[GitHub] [beam] iemejia commented on pull request #12056: [BEAM-9999] Add Apex runner deprecation note.
iemejia commented on pull request #12056: URL: https://github.com/apache/beam/pull/12056#issuecomment-647959367 Thanks @tysonjh! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] iemejia merged pull request #12056: [BEAM-9999] Add Apex runner deprecation note.
iemejia merged pull request #12056: URL: https://github.com/apache/beam/pull/12056
[GitHub] [beam] purbanow commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK
purbanow commented on a change in pull request #11794: URL: https://github.com/apache/beam/pull/11794#discussion_r444031088

File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java

Commented hunk (@@ -447,6 +513,346 @@, after `public void populateDisplayData(DisplayData.Builder builder)`):

```java
/** Implementation of {@link #write()}. */
@AutoValue
public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
  @Nullable
  abstract SerializableFunction getDataSourceProviderFn();

  @Nullable
  abstract String getTable();

  @Nullable
  abstract String getStorageIntegrationName();

  @Nullable
  abstract String getStagingBucketName();

  @Nullable
  abstract String getQuery();

  @Nullable
  abstract String getFileNameTemplate();

  @Nullable
  abstract WriteDisposition getWriteDisposition();

  @Nullable
  abstract UserDataMapper getUserDataMapper();

  @Nullable
  abstract SnowflakeService getSnowflakeService();

  abstract Builder<T> toBuilder();

  @AutoValue.Builder
  abstract static class Builder<T> {
    abstract Builder<T> setDataSourceProviderFn(SerializableFunction dataSourceProviderFn);

    abstract Builder<T> setTable(String table);

    abstract Builder<T> setStorageIntegrationName(String storageIntegrationName);

    abstract Builder<T> setStagingBucketName(String stagingBucketName);

    abstract Builder<T> setQuery(String query);

    abstract Builder<T> setFileNameTemplate(String fileNameTemplate);

    abstract Builder<T> setUserDataMapper(UserDataMapper userDataMapper);

    abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition);

    abstract Builder<T> setSnowflakeService(SnowflakeService snowflakeService);

    abstract Write<T> build();
  }

  /**
   * Setting information about Snowflake server.
   *
   * @param config - An instance of {@link DataSourceConfiguration}.
   */
  public Write<T> withDataSourceConfiguration(final DataSourceConfiguration config) {
    return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config));
  }

  /**
   * Setting function that will provide {@link DataSourceConfiguration} in runtime.
   *
   * @param dataSourceProviderFn a {@link SerializableFunction}.
   */
  public Write<T> withDataSourceProviderFn(SerializableFunction dataSourceProviderFn) {
    return toBuilder().setDataSourceProviderFn(dataSourceProviderFn).build();
  }

  /**
   * A table name to be written in Snowflake.
   *
   * @param table - String with the name of the table.
   */
  public Write<T> withTable(String table) {
    return toBuilder().setTable(table).build();
  }

  /**
   * Name of the cloud bucket (GCS by now) to use as tmp location of CSVs during COPY statement.
   *
   * @param stagingBucketName - String with the name of the bucket.
   */
  public Write<T> withStagingBucketName(String stagingBucketName) {
    return toBuilder().setStagingBucketName(stagingBucketName).build();
  }

  /**
   * Name of the Storage Integration in Snowflake to be used. See
   * https://docs.snowflake.com/en/sql-reference/sql/create-storage-integration.html for
   * reference.
   *
   * @param integrationName - String with the name of the Storage Integration.
   */
  public Write<T> withStorageIntegrationName(String integrationName) {
    return toBuilder().setStorageIntegrationName(integrationName).build();
  }

  /**
   * A query to be executed in Snowflake.
   *
   * @param query - String with query.
   */
  public Write<T> withQueryTransformation(String query) {
    return toBuilder().setQuery(query).build();
  }

  /**
   * A template name for files saved to GCP.
   *
   * @param fileNameTemplate - String with template name for files.
   */
  public Write<T> withFileNameTemplate(String fileNameTemplate) {
    return toBuilder().setFileNameTemplate(fileNameTemplate).build();
  }

  /**
   * User-defined function mapping user data into CSV lines.
   *
   * @param userDataMapper - an instance of {@link UserDataMapper}.
   */
  public Write<T> withUserDataMapper(UserDataMapper userDataMapper) {
    return toBuilder().setUserDataMapper(userDataMapper).build();
  }

  /**
   * A disposition to be used during writing to table phase.
   *
   * @param writeDisposition - an instance of {@link WriteDisposition}.
   */
  public Write<T> withWriteDisposition(WriteDisposition writeDisposition) {
    return toBuilder().setWriteDisposition(writeDisposition).build();
  }

  /**
   * A snowflake service which is supposed to be used. Note: Currently we have {@link
   * SnowflakeServiceImpl} with corresponding {@link FakeSnowflakeServiceImpl} used for testing.
   *
   * @p
```
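The `with*` methods in the quoted hunk all follow the AutoValue `toBuilder()` pattern: each call copies the current configuration into a builder, sets one field, and builds a new immutable instance, so partially configured transforms can be safely shared. A minimal plain-Java sketch of that pattern (hand-rolled here instead of AutoValue; `WriteConfig` and its fields are illustrative stand-ins, not Beam's actual classes):

```java
// Sketch of the immutable toBuilder() pattern used by the Write transform above.
final class WriteConfig {
  private final String table;
  private final String stagingBucketName;

  private WriteConfig(String table, String stagingBucketName) {
    this.table = table;
    this.stagingBucketName = stagingBucketName;
  }

  static WriteConfig create() {
    return new WriteConfig(null, null);
  }

  String table() { return table; }

  String stagingBucketName() { return stagingBucketName; }

  Builder toBuilder() { return new Builder(table, stagingBucketName); }

  // Each with* method returns a NEW instance; the original is never mutated.
  WriteConfig withTable(String t) { return toBuilder().setTable(t).build(); }

  WriteConfig withStagingBucketName(String b) { return toBuilder().setStagingBucketName(b).build(); }

  static final class Builder {
    private String table;
    private String stagingBucketName;

    Builder(String table, String stagingBucketName) {
      this.table = table;
      this.stagingBucketName = stagingBucketName;
    }

    Builder setTable(String t) { this.table = t; return this; }

    Builder setStagingBucketName(String b) { this.stagingBucketName = b; return this; }

    WriteConfig build() { return new WriteConfig(table, stagingBucketName); }
  }
}
```

Because every `with*` call produces a fresh instance, a base configuration can be reused to derive several variants without cross-contamination.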
[GitHub] [beam] mxm commented on pull request #11916: [BEAM-10189] Add ReadModifyWriteState user state to python sdk
mxm commented on pull request #11916: URL: https://github.com/apache/beam/pull/11916#issuecomment-647981883 Thanks for the stacktrace, that helped figure out what's going on here. The issue is only present in batch mode, where Flink does not use its own memory backend but uses Beam's `InMemoryBagUserStateFactory`. We have to adapt the implementation to return the same cache token for all `InMemorySingleKeyBagState`, see: https://github.com/apache/beam/blob/63d51f5c4f89a4c881243d9d43be2ac138b1254b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/InMemoryBagUserStateFactory.java#L60 https://github.com/apache/beam/blob/63d51f5c4f89a4c881243d9d43be2ac138b1254b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/InMemoryBagUserStateFactory.java#L116
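The fix described here, handing out one shared cache token instead of minting a fresh token per state cell, can be sketched in plain Java (class and method names are illustrative; the real logic lives in Beam's `InMemoryBagUserStateFactory`):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Sketch: a factory that creates one state handler per state cell but reuses
// a single cache token for all of them, so an SDK harness that supports only
// one user-state cache token can still cache every cell.
final class SingleTokenStateFactory {
  // Generated once for the factory, not once per state cell.
  private final String cacheToken = UUID.randomUUID().toString();
  private final List<String> stateIds = new ArrayList<>();

  // Registers a state cell and returns the shared token.
  String forUserState(String stateId) {
    stateIds.add(stateId);
    return cacheToken; // same token for every state cell
  }

  int registeredCells() {
    return stateIds.size();
  }
}
```

With a per-cell token, only the most recently registered cell would remain cacheable; with the shared token, all cells stay valid under the one-token-per-handler constraint the comment describes.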
[GitHub] [beam] purbanow commented on pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK
purbanow commented on pull request #11794: URL: https://github.com/apache/beam/pull/11794#issuecomment-647995341 @RyanSkraba Thanks a lot for your CR. We're going to add your CSV concern to our feature improvement list.
[GitHub] [beam] purbanow commented on a change in pull request #11794: [BEAM-9894] Add batch SnowflakeIO.Write to Java SDK
purbanow commented on a change in pull request #11794: URL: https://github.com/apache/beam/pull/11794#discussion_r444055571

File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java (same hunk, @@ -447,6 +513,346 @@, quoted above)
[GitHub] [beam] mxm opened a new pull request #12062: [BEAM-10305] Let InMemoryBagUserStateFactory only use a single cache token
mxm opened a new pull request #12062: URL: https://github.com/apache/beam/pull/12062 When the state cache is enabled in the Python SDK, the batch mode of the Flink Runner currently only allows a single user state cell, because a new cache token is generated for each state cell and the caching code in the Python SDK Harness only supports one cache token per user state handler. Theoretically, multiple cache tokens would work, but they would just add to the payload. We should make sure to send only a single cache token in batch mode (which is already the case in streaming).

Post-Commit Tests Status (on master branch): [build-status badge table for the Go/Java/Python SDKs across Dataflow, Flink, Samza, and Spark omitted]
[GitHub] [beam] mxm commented on pull request #11916: [BEAM-10189] Add ReadModifyWriteState user state to python sdk
mxm commented on pull request #11916: URL: https://github.com/apache/beam/pull/11916#issuecomment-647999424 I've created a PR which fixes the problem: https://github.com/apache/beam/pull/12062
[GitHub] [beam] kamilwu commented on pull request #12038: [BEAM-5853] Trigger autodeploy on commit ID and image tag mismatch
kamilwu commented on pull request #12038: URL: https://github.com/apache/beam/pull/12038#issuecomment-648005962 Thanks @damgad
[GitHub] [beam] kamilwu merged pull request #12038: [BEAM-5853] Trigger autodeploy on commit ID and image tag mismatch
kamilwu merged pull request #12038: URL: https://github.com/apache/beam/pull/12038
[GitHub] [beam] davidak09 opened a new pull request #12063: [BEAM-10294] using SparkMetricsContainerStepMap for readable metrics presentation in Spark history server UI
davidak09 opened a new pull request #12063: URL: https://github.com/apache/beam/pull/12063 Beam metrics in the Spark history server are not readable because they are currently rendered as a protobuf formatted as JSON, where the metric's value is stored in a `bytes` field. This change uses the already existing `SparkMetricsContainerStepMap`, which overrides the `toString()` method by calling `SparkBeamMetric.renderAll()`.

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
- [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).

Post-Commit Tests Status (on master branch): [build-status badge table for the Go/Java/Python SDKs across Dataflow, Flink, Samza, and Spark omitted]
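The approach this PR describes, making metrics readable by overriding `toString()` to render the metric values instead of dumping the raw serialized form, can be sketched in plain Java (class and method names here are illustrative stand-ins for `SparkMetricsContainerStepMap` and `SparkBeamMetric.renderAll()`):

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Sketch: a metrics holder whose toString() renders human-readable
// "name=value" pairs, mimicking the idea of delegating to a renderAll()-style
// method rather than showing protobuf bytes in the history server UI.
final class ReadableMetrics {
  // TreeMap keeps the rendered output deterministically sorted by name.
  private final Map<String, Long> metrics = new TreeMap<>();

  void put(String name, long value) {
    metrics.put(name, value);
  }

  @Override
  public String toString() {
    return metrics.entrySet().stream()
        .map(e -> e.getKey() + "=" + e.getValue())
        .collect(Collectors.joining(", "));
  }
}
```

Any UI (or accumulator display) that falls back on `toString()` then shows the rendered pairs directly, which is the effect the PR is after.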
[GitHub] [beam] davidak09 commented on pull request #12063: [BEAM-10294] using SparkMetricsContainerStepMap for readable metrics presentation in Spark history server UI
davidak09 commented on pull request #12063: URL: https://github.com/apache/beam/pull/12063#issuecomment-648008709 R: @dmvk
[GitHub] [beam] davidak09 commented on a change in pull request #12063: [BEAM-10294] using SparkMetricsContainerStepMap for readable metrics presentation in Spark history server UI
davidak09 commented on a change in pull request #12063: URL: https://github.com/apache/beam/pull/12063#discussion_r444071819

File path: runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/MetricsAccumulator.java

```diff
@@ -87,13 +86,13 @@ public static MetricsContainerStepMapAccumulator getInstance() { } }
-  private static Optional<MetricsContainerStepMap> recoverValueFromCheckpoint(
+  private static Optional<SparkMetricsContainerStepMap> recoverValueFromCheckpoint(
       JavaSparkContext jsc, CheckpointDir checkpointDir) {
     try {
       Path beamCheckpointPath = checkpointDir.getBeamCheckpointDir();
       checkpointFilePath = new Path(beamCheckpointPath, ACCUMULATOR_CHECKPOINT_FILENAME);
       fileSystem = checkpointFilePath.getFileSystem(jsc.hadoopConfiguration());
-      MetricsContainerStepMap recoveredValue =
+      SparkMetricsContainerStepMap recoveredValue =
```

Review comment: I'm not sure about this, i.e. whether it's backward compatible.

File path: runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java

```diff
@@ -51,11 +51,12 @@
     for (MetricResult<DistributionResult> metricResult : metricQueryResults.getDistributions()) {
       DistributionResult result = metricResult.getAttempted();
-      metrics.put(renderName(metricResult) + ".count", result.getCount());
-      metrics.put(renderName(metricResult) + ".sum", result.getSum());
-      metrics.put(renderName(metricResult) + ".min", result.getMin());
-      metrics.put(renderName(metricResult) + ".max", result.getMax());
-      metrics.put(renderName(metricResult) + ".mean", result.getMean());
+      String name = renderName(metricResult);
+      metrics.put(name + ".count", result.getCount());
+      metrics.put(name + ".sum", result.getSum());
+      metrics.put(name + ".min", result.getMin());
+      metrics.put(name + ".max", result.getMax());
+      metrics.put(name + ".mean", result.getMean());
```

Review comment: I'd personally prefer a single metric, possibly with a `.distribution` suffix, which could include all five stats (count, sum, min, max, mean); it would definitely be more readable in the Spark UI.
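The reviewer's suggestion of a single `.distribution` metric carrying all five statistics could look roughly like this (a plain-Java sketch; the method name, parameter list, and the rendered string format are assumptions, not the actual `SparkBeamMetric` API):

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Sketch of collapsing count/sum/min/max/mean into a single metric entry
// with a ".distribution" suffix, as suggested in the review.
final class DistributionRendering {
  static Map<String, String> renderSingle(
      String name, long count, long sum, long min, long max, double mean) {
    Map<String, String> metrics = new HashMap<>();
    // One entry instead of five; Locale.ROOT keeps the decimal point stable.
    metrics.put(
        name + ".distribution",
        String.format(
            Locale.ROOT, "count=%d, sum=%d, min=%d, max=%d, mean=%.2f",
            count, sum, min, max, mean));
    return metrics;
  }
}
```

One entry per distribution keeps the Spark UI listing compact, at the cost of consumers having to parse the combined value if they want a single statistic.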
[GitHub] [beam] JozoVilcek closed pull request #12037: [BEAM-10284] Add option to pass configuration into ParquetIO.Sink
JozoVilcek closed pull request #12037: URL: https://github.com/apache/beam/pull/12037
[GitHub] [beam] JozoVilcek opened a new pull request #12064: [BEAM-10284] Add option to pass configuration into ParquetIO.Sink
JozoVilcek opened a new pull request #12064: URL: https://github.com/apache/beam/pull/12064 Add option to pass configuration into ParquetIO.Sink.

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
- [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).

Post-Commit Tests Status (on master branch): [build-status badge table for the Go/Java/Python SDKs across Dataflow, Flink, Samza, and Spark omitted]
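A Hadoop `Configuration`, which `ParquetIO.Sink` consumes, is not `Serializable`, so a common way to pass configuration into a Beam sink is to snapshot it into a serializable map and rebuild the native object on the worker. A rough, illustrative sketch of that pattern in plain Java (no Hadoop dependency; `ConfigSnapshot` is a hypothetical stand-in, not the PR's actual implementation):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Sketch: carry configuration across serialization as a plain Map snapshot,
// the usual workaround when the native config object is not Serializable.
final class ConfigSnapshot implements Serializable {
  private final Map<String, String> entries = new HashMap<>();

  static ConfigSnapshot of(Map<String, String> config) {
    ConfigSnapshot s = new ConfigSnapshot();
    s.entries.putAll(config);
    return s;
  }

  // On the worker, rebuild the native configuration from this map.
  Map<String, String> toMap() {
    return new HashMap<>(entries);
  }

  // Round-trip through Java serialization, as a runner would when shipping the sink.
  static ConfigSnapshot roundTrip(ConfigSnapshot in) {
    try {
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
        oos.writeObject(in);
      }
      try (ObjectInputStream ois =
          new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
        return (ConfigSnapshot) ois.readObject();
      }
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}
```

The snapshot survives the serialize/deserialize cycle intact, which is what lets user-supplied settings reach the writers on the workers.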
[GitHub] [beam] kamilwu commented on pull request #12048: [BEAM-10258] Fix Dataflow-based Jenkins jobs
kamilwu commented on pull request #12048: URL: https://github.com/apache/beam/pull/12048#issuecomment-648041139 Run Seed Job This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] kamilwu commented on pull request #12048: [BEAM-10258] Fix Dataflow-based Jenkins jobs
kamilwu commented on pull request #12048: URL: https://github.com/apache/beam/pull/12048#issuecomment-648044828 Run PubsubIO Performance Test Python
[GitHub] [beam] nielm commented on a change in pull request #12011: [BEAM-9269] Set Spanner commit deadline using GRPC api not interceptor.
nielm commented on a change in pull request #12011: URL: https://github.com/apache/beam/pull/12011#discussion_r444112445

## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerAccessor.java ##

```diff
@@ -63,23 +65,19 @@ static SpannerAccessor create(SpannerConfig spannerConfig) {
     ValueProvider<Duration> commitDeadline = spannerConfig.getCommitDeadline();
     if (commitDeadline != null && commitDeadline.get().getMillis() > 0) {
-      // In Spanner API version 1.21 or above, we can set the deadline / total Timeout on an API
-      // call using the following code:
-      //
-      // UnaryCallSettings.Builder commitSettings =
-      //     builder.getSpannerStubSettingsBuilder().commitSettings();
-      // RetrySettings.Builder commitRetrySettings = commitSettings.getRetrySettings().toBuilder()
-      // commitSettings.setRetrySettings(
-      //     commitRetrySettings.setTotalTimeout(
-      //         Duration.ofMillis(getCommitDeadlineMillis().get()))
-      //     .build());
-      //
-      // However, at the time of this commit, the Spanner API is only at v1.6.0, where the only
-      // method to set a deadline is with GRPC Interceptors, so we have to use that...
-      SpannerInterceptorProvider interceptorProvider =
-          SpannerInterceptorProvider.createDefault()
-              .with(new CommitDeadlineSettingInterceptor(commitDeadline.get()));
-      builder.setInterceptorProvider(interceptorProvider);
+      // Set the GRPC deadline on the Commit API call.
+      UnaryCallSettings.Builder commitSettings =
+          builder.getSpannerStubSettingsBuilder().commitSettings();
+      RetrySettings.Builder commitRetrySettings = commitSettings.getRetrySettings().toBuilder();
+      commitSettings.setRetrySettings(
+          commitRetrySettings
```

Review comment: It can be done, but it does it silently in the background. This mechanism allows us to have a counter for the number of timeouts, which is useful for diagnosing slowness...
[GitHub] [beam] mxm opened a new pull request #12065: [BEAM-10306] Add latency measurements to Python Flink load test
mxm opened a new pull request #12065: URL: https://github.com/apache/beam/pull/12065 There are currently no latency metrics in the load tests, which makes it impossible to monitor latency regressions. [Post-Commit Tests Status badge table (Go/Java/Python on Dataflow, Flink, Samza, Spark) omitted]
[GitHub] [beam] mxm commented on pull request #12065: [BEAM-10306] Add latency measurements to Python Flink load test
mxm commented on pull request #12065: URL: https://github.com/apache/beam/pull/12065#issuecomment-648048351 Run Seed Job
[GitHub] [beam] mxm commented on pull request #12065: [BEAM-10306] Add latency measurements to Python Flink load test
mxm commented on pull request #12065: URL: https://github.com/apache/beam/pull/12065#issuecomment-648055510 Run Python Load Tests ParDo Flink Streaming
[GitHub] [beam] mxm commented on pull request #12065: [BEAM-10306] Add latency measurements to Python Flink load test
mxm commented on pull request #12065: URL: https://github.com/apache/beam/pull/12065#issuecomment-648056258 Run Python2_PVR_Flink PreCommit
[GitHub] [beam] kamilwu commented on pull request #12048: [BEAM-10258] Fix Dataflow-based Jenkins jobs
kamilwu commented on pull request #12048: URL: https://github.com/apache/beam/pull/12048#issuecomment-648058018
[GitHub] [beam] kamilwu commented on pull request #12048: [BEAM-10258] Fix Dataflow-based Jenkins jobs
kamilwu commented on pull request #12048: URL: https://github.com/apache/beam/pull/12048#issuecomment-648059459 Run Seed Job
[GitHub] [beam] kamilwu commented on pull request #12048: [BEAM-10258] Fix Dataflow-based Jenkins jobs
kamilwu commented on pull request #12048: URL: https://github.com/apache/beam/pull/12048#issuecomment-648062312
[GitHub] [beam] JozoVilcek commented on pull request #12064: [BEAM-10284] Add option to pass configuration into ParquetIO.Sink
JozoVilcek commented on pull request #12064: URL: https://github.com/apache/beam/pull/12064#issuecomment-648085218 R: @iemejia
[GitHub] [beam] mxm commented on pull request #12065: [BEAM-10306] Add latency measurements to Python Flink load test
mxm commented on pull request #12065: URL: https://github.com/apache/beam/pull/12065#issuecomment-648086665 Run Python Load Tests ParDo Flink Streaming
[GitHub] [beam] kamilwu commented on pull request #12048: [BEAM-10258] Fix Dataflow-based Jenkins jobs
kamilwu commented on pull request #12048: URL: https://github.com/apache/beam/pull/12048#issuecomment-648089617 Run Python PreCommit
[GitHub] [beam] mxm commented on pull request #12062: [BEAM-10305] Let InMemoryBagUserStateFactory only use a single cache token
mxm commented on pull request #12062: URL: https://github.com/apache/beam/pull/12062#issuecomment-648094003 Run Java PreCommit
[GitHub] [beam] mxm commented on pull request #12065: [BEAM-10306] Add latency measurements to Python Flink load test
mxm commented on pull request #12065: URL: https://github.com/apache/beam/pull/12065#issuecomment-648096370 Run Python Load Tests ParDo Flink Streaming
[GitHub] [beam] TobKed commented on pull request #11877: [BEAM-10184] Build python wheels on GitHub Actions for Linux/MacOS
TobKed commented on pull request #11877: URL: https://github.com/apache/beam/pull/11877#issuecomment-648097757

> @TobKed - please ping me if this is ready to be merged.

@aaltay I think it is ready; however, I have some questions before it can be merged:

1. Who can and should set up secrets for the GitHub repository? I described the necessary secret variables in the PR description (**Before merging**) ([docs](https://help.github.com/en/actions/configuring-and-managing-workflows/creating-and-storing-encrypted-secrets))
2. Who can and should set up versioning and lifecycle management for the GCS bucket?
3. What do you think about not storing `GCP_BUCKET` as a secret but hardcoding it, as it is in the `beam-wheels` repository? The staging bucket is publicly available, and not using a secret will allow showing the full path to the files in the GitHub Actions logs. It will also make it easier to update/modify the sh scripts for the release process. I propose to use the `beam-wheels-staging` bucket, as in `beam-wheels`. WDYT?
[GitHub] [beam] mxm commented on pull request #12065: [BEAM-10306] Add latency measurements to Python Flink load test
mxm commented on pull request #12065: URL: https://github.com/apache/beam/pull/12065#issuecomment-648117962 Run Python Load Tests ParDo Flink Streaming
[GitHub] [beam] mxm commented on pull request #12065: [BEAM-10306] Add latency measurements to Python Flink load test
mxm commented on pull request #12065: URL: https://github.com/apache/beam/pull/12065#issuecomment-648120617 Run Python Load Tests ParDo Flink Streaming
[GitHub] [beam] mwalenia merged pull request #12048: [BEAM-10258] Fix Dataflow-based Jenkins jobs
mwalenia merged pull request #12048: URL: https://github.com/apache/beam/pull/12048
[GitHub] [beam] mxm commented on pull request #12065: [BEAM-10306] Add latency measurements to Python Flink load test
mxm commented on pull request #12065: URL: https://github.com/apache/beam/pull/12065#issuecomment-648148297
[GitHub] [beam] mxm commented on pull request #12065: [BEAM-10306] Add latency measurements to Python Flink load test
mxm commented on pull request #12065: URL: https://github.com/apache/beam/pull/12065#issuecomment-648156395 Run Seed Job
[GitHub] [beam] mxm commented on pull request #12065: [BEAM-10306] Add latency measurements to Python Flink load test
mxm commented on pull request #12065: URL: https://github.com/apache/beam/pull/12065#issuecomment-648162367 Run Python Load Tests ParDo Flink Streaming
[GitHub] [beam] angoenka merged pull request #12044: [BEAM-9707] Remove hardcoded unified harness image
angoenka merged pull request #12044: URL: https://github.com/apache/beam/pull/12044
[GitHub] [beam] mxm commented on pull request #12065: [BEAM-10306] Add latency measurements to Python Flink load test
mxm commented on pull request #12065: URL: https://github.com/apache/beam/pull/12065#issuecomment-648206600 Run Python PreCommit
[GitHub] [beam] lukecwik commented on pull request #12055: [BEAM-9272] Remove obsolete portable SDF urns
lukecwik commented on pull request #12055: URL: https://github.com/apache/beam/pull/12055#issuecomment-648243445 Run Java PreCommit
[GitHub] [beam] lukecwik commented on a change in pull request #12062: [BEAM-10305] Let InMemoryBagUserStateFactory only use a single cache token
lukecwik commented on a change in pull request #12062: URL: https://github.com/apache/beam/pull/12062#discussion_r444325702 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/InMemoryBagUserStateFactory.java ## @@ -41,9 +41,12 @@ public class InMemoryBagUserStateFactory Review comment: You should update the comment related to error handling to make sure that users discard the factory every time they want a new cache token.
[GitHub] [beam] iemejia commented on pull request #12064: [BEAM-10284] Add option to pass configuration into ParquetIO.Sink
iemejia commented on pull request #12064: URL: https://github.com/apache/beam/pull/12064#issuecomment-648252297 Hi, the PR looks perfect, but I have a question: why do you need to pass a `Configuration` object, or in other words, what are you trying to configure? Something FileSystem related or Parquet related? Keep in mind that we are trying to keep the ParquetIO public API somewhat 'isolated' from the Hadoop API to make evolution 'easier' for non-Hadoop cases (https://issues.apache.org/jira/browse/PARQUET-1126). If what you want to achieve is Parquet configuration (`parquet.avro.add-list-element-records`, `parquet.avro.write-old-list-structure`), I am even inclined to pass those in a Map. But if what you want is FileSystem configuration, I am wondering why the configuration on HDFS does not make it; did you try that, if that's the case?
[GitHub] [beam] y1chi commented on pull request #12062: [BEAM-10305] Let InMemoryBagUserStateFactory only use a single cache token
y1chi commented on pull request #12062: URL: https://github.com/apache/beam/pull/12062#issuecomment-648258540 IIUC, this changes all state handlers to use the same cache token value, but doesn't reduce the number of user state cache tokens in the process bundle request to 1?
[GitHub] [beam] acrites commented on a change in pull request #11993: Change time granularity to seconds in ParDoTest TestStream timer test…
acrites commented on a change in pull request #11993: URL: https://github.com/apache/beam/pull/11993#discussion_r444339102

## File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ##

```diff
@@ -3808,15 +3817,24 @@ public void onTimer(
 TestStream<KV<String, Integer>> stream =
     TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
         // See GlobalWindow,
-        // END_OF_GLOBAL_WINDOW is TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(1))
-        .advanceWatermarkTo(BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(1)))
+        // END_OF_GLOBAL_WINDOW is TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(1)),
```

Review comment: Yichi and I played around with some flag settings in Dataflow, and I think we can change the time resolution for tests to milliseconds, which should make this test pass. However, I'm a little worried that we then aren't testing the same thing as prod, so we might end up missing some other bugs that come up. Otherwise, I don't really know a good way around this, since `BoundedWindow.TIMESTAMP_MAX_VALUE` is not a round number of seconds.
[GitHub] [beam] y1chi commented on a change in pull request #11993: Change time granularity to seconds in ParDoTest TestStream timer test…
y1chi commented on a change in pull request #11993: URL: https://github.com/apache/beam/pull/11993#discussion_r444343155

## File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ##

```diff
@@ -3808,15 +3817,24 @@ public void onTimer(
 TestStream<KV<String, Integer>> stream =
     TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
         // See GlobalWindow,
-        // END_OF_GLOBAL_WINDOW is TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(1))
-        .advanceWatermarkTo(BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(1)))
+        // END_OF_GLOBAL_WINDOW is TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(1)),
```

Review comment: @acrites and I chatted about some of the tests. There is an option to alter the windmill `wm_work_set_watermark_resolution_usec` and `dataflow_watermark_resolution_usec` flags so that `AdvanceWatermark` and `AdvanceProcessingTime` can deal with milliseconds, but it requires additional test framework plumbing. Should we keep this test as it was and try to unsickbay it after we can set the windmill flag values?
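The round-number problem discussed in this thread can be checked with a small standalone sketch (assuming, per the comments above, that `BoundedWindow.TIMESTAMP_MAX_VALUE` in the Beam Java SDK is `Long.MAX_VALUE` microseconds truncated to milliseconds):

```go
package main

import "fmt"

func main() {
	// Assumption (from the thread): BoundedWindow.TIMESTAMP_MAX_VALUE is
	// Long.MAX_VALUE microseconds truncated down to milliseconds.
	const longMaxValue int64 = 9223372036854775807
	maxMillis := longMaxValue / 1000 // 9223372036854775 ms

	// A watermark tracked at whole-second resolution cannot represent this
	// value exactly: 775 ms are left over after truncating to seconds.
	fmt.Println("max millis:", maxMillis)
	fmt.Println("millis past the second:", maxMillis%1000) // 775
}
```

Since the leftover is non-zero, rounding the watermark to seconds moves it, which is why the test cannot simply advance the watermark to this exact timestamp on a second-resolution runner.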
[GitHub] [beam] lostluck commented on a change in pull request #12061: [BEAM-4634] Add options to allow BigQuery StandardSQL queries.
lostluck commented on a change in pull request #12061: URL: https://github.com/apache/beam/pull/12061#discussion_r444345197

## File path: sdks/go/pkg/beam/io/bigqueryio/bigquery.go ##

```diff
@@ -90,21 +90,27 @@ func Read(s beam.Scope, project, table string, t reflect.Type) beam.PCollection
 	// TODO(herohde) 7/13/2017: using * is probably too inefficient. We could infer
 	// a focused query from the type.
-	return query(s, project, fmt.Sprintf("SELECT * from [%v]", table), t)
+	return query(s, project, fmt.Sprintf("SELECT * from [%v]", table), t, nil)
+}
+
+// QueryOptions represents additional options for executing a query.
+type QueryOptions struct {
```

Review comment: I'm less familiar with BigQuery and its option set, but consider switching this to a variadic list of functional options, a la https://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis. In particular, this avoids a breaking compile change when the (admittedly few) users of this function update their Beam version. It doesn't remove the need for a QueryOptions struct to serialize the options, but it does allow call sites to avoid passing empty options every time. I don't feel super strongly about this, as this package will likely need updating when we fix module versioning, but it's a suggestion worth discussing.
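For reference, here is a minimal self-contained sketch of the functional-options pattern being suggested (all names here, such as `QueryOption`, `UseStandardSQL`, and `WithLocation`, are hypothetical and not the actual bigqueryio API):

```go
package main

import "fmt"

// queryOptions holds settings for a query; in this sketch the struct stays
// unexported so call sites can only touch it through functional options.
type queryOptions struct {
	UseStandardSQL bool
	Location       string
}

// QueryOption is a functional option that mutates queryOptions.
type QueryOption func(*queryOptions)

// UseStandardSQL is a hypothetical option enabling the StandardSQL dialect.
func UseStandardSQL() QueryOption {
	return func(o *queryOptions) { o.UseStandardSQL = true }
}

// WithLocation is a hypothetical option setting the query location.
func WithLocation(loc string) QueryOption {
	return func(o *queryOptions) { o.Location = loc }
}

// Query takes a variadic list of options, so existing call sites that pass
// no options keep compiling unchanged when new options are added later.
func Query(q string, opts ...QueryOption) queryOptions {
	var o queryOptions // zero value = legacy defaults
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	fmt.Printf("%+v\n", Query("SELECT 1")) // defaults, no options needed
	fmt.Printf("%+v\n", Query("SELECT 1", UseStandardSQL(), WithLocation("EU")))
}
```

The key property the review highlights: adding a new `QueryOption` constructor later is backwards compatible, whereas adding a parameter to `Query` itself would break every existing caller.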
[GitHub] [beam] lukecwik merged pull request #12055: [BEAM-9272] Remove obsolete portable SDF urns
lukecwik merged pull request #12055: URL: https://github.com/apache/beam/pull/12055
[GitHub] [beam] lukecwik commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
lukecwik commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r444354311

## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadViaSDF.java ##

@@ -0,0 +1,861 @@ [New file; the diff context consists of the Apache license header and the import block of `ReadViaSDF.java`, omitted here, followed by the class Javadoc:]

A {@link PTransform} that takes a PCollection of {@link KafkaSourceDescription} as input and outputs a PCollection of {@link KafkaRecord}. The core implementation is based on {@code SplittableDoFn}. For more details about the concept of {@code SplittableDoFn}, please refer to the blog post (https://beam.apache.org/blog/splittable-do-fn/) and design doc (https://s.apache.org/beam-fn-api). The major difference from {@link KafkaIO.Read} is that {@link ReadViaSDF} doesn't require source descriptions (e.g., {@link KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link KafkaIO.Read#getStartReadTime()}, etc.) during pipeline construction time. Inst
[GitHub] [beam] aaltay commented on pull request #11877: [BEAM-10184] Build python wheels on GitHub Actions for Linux/MacOS
aaltay commented on pull request #11877: URL: https://github.com/apache/beam/pull/11877#issuecomment-648280381

> > @TobKed - please ping me if this is ready to be merged.
>
> @aaltay I think it is ready; however, I have some questions before it can be merged:
>
> 1. Who can and should set up secrets for the GitHub repository? I described the necessary secret variables in the PR description (**Before merging**) ([docs](https://help.github.com/en/actions/configuring-and-managing-workflows/creating-and-storing-encrypted-secrets))

I do not have access to the "Settings" tab. Could you work with @tysonjh and infra on this one?

> 2. Who can and should set up versioning and lifecycle management for the GCS bucket?

I can give you access to the apache-beam-testing project to update things, or I can do it. Let me know.

> 3. What do you think about not storing `GCP_BUCKET` as a secret but hardcoding it, as it is in the `beam-wheels` repository? The staging bucket is publicly available, and not using a secret will allow showing the full path to the files in the GitHub Actions logs. It will also make it easier to update/modify the sh scripts for the release process. I propose to use the `beam-wheels-staging` bucket, as in `beam-wheels`.

Sounds good to me.

> WDYT?

@tysonjh could help.
[GitHub] [beam] lukecwik commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
lukecwik commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r444354311 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadViaSDF.java ## @@ -0,0 +1,861 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.kafka; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; + +import com.google.auto.value.AutoValue; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg; +import org.apache.beam.sdk.io.kafka.KafkaSourceDescription.Schemas; +import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.TimestampPolicyContext; +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.options.ExperimentalOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker; +import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; +import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress; +import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator; +import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.Manual; +import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing; +import 
org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.WallTime; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.utils.AppInfoParser; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link PTransform} that takes a PCollection of {@link KafkaSourceDescription} as input and + * outputs a PCollection of {@link KafkaRecord}. The core implementation is based on {@code + * SplittableDoFn}. For more details about the concept of {@code SplittableDoFn}, please refer to + * the <a href="https://beam.apache.org/blog/splittable-do-fn/">blog post</a> and <a href="https://s.apache.org/beam-fn-api">design doc</a>. The major difference from {@link + * KafkaIO.Read} is that {@link ReadViaSDF} doesn't require source descriptions (e.g., {@link + * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link + * KafkaIO.Read#getStartReadTime()}, etc.) during pipeline construction time. Inst
[GitHub] [beam] piotr-szuberski commented on a change in pull request #12023: [BEAM-10135] Add Python wrapper for Jdbc Write external transform
piotr-szuberski commented on a change in pull request #12023: URL: https://github.com/apache/beam/pull/12023#discussion_r444378444 ## File path: sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py ## @@ -0,0 +1,165 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file + +from __future__ import absolute_import + +import logging +import typing +import unittest + +from past.builtins import unicode + +import apache_beam as beam +from apache_beam import coders +from apache_beam.io.external.jdbc import WriteToJdbc +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.testing.test_pipeline import TestPipeline + +# pylint: disable=wrong-import-order, wrong-import-position +try: + import psycopg2 +except ImportError: + unittest.skip("no psycopg2 installed") + # pylint: enable=wrong-import-order, wrong-import-position + +JdbcTestRow = typing.NamedTuple( +"JdbcTestRow", +[ +("f_id", int), +("f_real", float), +("f_string", unicode), +], +) + +coders.registry.register_coder(JdbcTestRow, coders.RowCoder) + +ROW_COUNT = 10 + + +class JdbcExternalTransformTest(unittest.TestCase): + """Tests that exercise the cross-language JdbcIO Transform (implemented in java). 
+ + To run with the local expansion service and flink job server you need to build it, + e.g. via command: + ./gradlew :sdks:java:io:expansion-service:shadowJar + ./gradlew :runners:flink:1.10:job-server:shadowJar + and have flink1.10 master cluster running + + If on development branch, it may be necessary to build java sdk docker image and tag + it with the latest released version. + ./gradlew :sdks:java:container:docker + docker tag apache/beam_java_sdk:x.xx.x.dev apache/beam_java_sdk:y.yy.y.dev + + python setup.py nosetests \ + --tests=apache_beam.io.external.jdbc_test:JdbcExternalTransformTest.test_read_and_write \ + --test-pipeline-options=" +--driver_class_name=org.postgresql.Driver +--jdbc_url=jdbc:postgresql://localhost:5432/postgres Review comment: I used https://github.com/testcontainers/testcontainers-python and now postgres instance is instantiated automatically. Now the test can be run without any params This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] piotr-szuberski commented on a change in pull request #12023: [BEAM-10135] Add Python wrapper for Jdbc Write external transform
piotr-szuberski commented on a change in pull request #12023: URL: https://github.com/apache/beam/pull/12023#discussion_r444378882 ## File path: sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py ## @@ -0,0 +1,165 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file + +from __future__ import absolute_import + +import logging +import typing +import unittest + +from past.builtins import unicode + +import apache_beam as beam +from apache_beam import coders +from apache_beam.io.external.jdbc import WriteToJdbc +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.testing.test_pipeline import TestPipeline + +# pylint: disable=wrong-import-order, wrong-import-position +try: + import psycopg2 +except ImportError: + unittest.skip("no psycopg2 installed") + # pylint: enable=wrong-import-order, wrong-import-position + +JdbcTestRow = typing.NamedTuple( +"JdbcTestRow", +[ +("f_id", int), +("f_real", float), +("f_string", unicode), +], +) + +coders.registry.register_coder(JdbcTestRow, coders.RowCoder) + +ROW_COUNT = 10 + + +class JdbcExternalTransformTest(unittest.TestCase): + """Tests that exercise the cross-language JdbcIO Transform (implemented in java). 
+ + To run with the local expansion service and flink job server you need to build it, + e.g. via command: + ./gradlew :sdks:java:io:expansion-service:shadowJar + ./gradlew :runners:flink:1.10:job-server:shadowJar + and have flink1.10 master cluster running + + If on development branch, it may be necessary to build java sdk docker image and tag + it with the latest released version. + ./gradlew :sdks:java:container:docker + docker tag apache/beam_java_sdk:x.xx.x.dev apache/beam_java_sdk:y.yy.y.dev Review comment: Ok, I rebased and now it uses 2.23.0 version. I think this info indeed is not that necessary This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] aromanenko-dev commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
aromanenko-dev commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r444380726 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadViaSDF.java ## @@ -0,0 +1,861 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.kafka; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; + +import com.google.auto.value.AutoValue; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg; +import org.apache.beam.sdk.io.kafka.KafkaSourceDescription.Schemas; +import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.TimestampPolicyContext; +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.options.ExperimentalOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker; +import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; +import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress; +import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator; +import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.Manual; +import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing; +import 
org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.WallTime; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.utils.AppInfoParser; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link PTransform} that takes a PCollection of {@link KafkaSourceDescription} as input and + * outputs a PCollection of {@link KafkaRecord}. The core implementation is based on {@code + * SplittableDoFn}. For more details about the concept of {@code SplittableDoFn}, please refer to + * the <a href="https://beam.apache.org/blog/splittable-do-fn/">blog post</a> and <a href="https://s.apache.org/beam-fn-api">design doc</a>. The major difference from {@link + * KafkaIO.Read} is that {@link ReadViaSDF} doesn't require source descriptions (e.g., {@link + * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link + * KafkaIO.Read#getStartReadTime()}, etc.) during pipeline construction time
[GitHub] [beam] lostluck merged pull request #12045: [BEAM-9615] Generate Go Schema Protos
lostluck merged pull request #12045: URL: https://github.com/apache/beam/pull/12045 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] ZijieSong946 commented on a change in pull request #12054: [Work in progress] TIME type
ZijieSong946 commented on a change in pull request #12054: URL: https://github.com/apache/beam/pull/12054#discussion_r444382490 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java ## @@ -315,7 +315,7 @@ private static Expression castOutput(Expression value, FieldType toType) { private static Expression castOutputTime(Expression value, FieldType toType) { Expression valueDateTime = value; -// First, convert to millis (except for DATE type) +// First, convert to millis (except for DATE/TIME type) Review comment: Got it. Thanks for figuring it out. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] piotr-szuberski commented on pull request #12022: [BEAM-10135] Add Jdbc Write external transform
piotr-szuberski commented on pull request #12022: URL: https://github.com/apache/beam/pull/12022#issuecomment-648314418 @TheNeuralBit I've made the requested changes. If you have further suggestions, go ahead. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] piotr-szuberski edited a comment on pull request #12022: [BEAM-10135] Add Jdbc Write external transform
piotr-szuberski edited a comment on pull request #12022: URL: https://github.com/apache/beam/pull/12022#issuecomment-648314418 @TheNeuralBit If you have some suggestions, please let me know :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] piotr-szuberski commented on pull request #12023: [BEAM-10135] Add Python wrapper for Jdbc Write external transform
piotr-szuberski commented on pull request #12023: URL: https://github.com/apache/beam/pull/12023#issuecomment-648316956 @TheNeuralBit I've made the requested changes. If you have further suggestions - go ahead :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] lostluck commented on pull request #12059: Adding a helper function for making custom coders.
lostluck commented on pull request #12059: URL: https://github.com/apache/beam/pull/12059#issuecomment-648317098 1. Not while I'm in the middle of modifying all the coder code for Schemas :D 2. As you've described, there's a reason for the two packages: one is for Beam Model coders, and the other is for *not* Beam Model coders. That signal is valuable, though I agree it's a bit subtle and not called out very well. It's the kind of thing I'd have called out in a document walking through the Go SDK, but didn't in my talk. The value is that it's harder to accidentally use a non-standard coder when one must use a standard one. By being separate, they also serve as an example of how to write some of these. I do agree that the runtime folder is probably not the right place for the coderx package, though. It's also a bit moot since it's not a package users will interact with. If we were to get rid of the coderx package, I'd rather the coders move to the main beam package or similar, since they're invoked in inferCoders IIRC. These are good things to think about, and the whole SDK could use a clear overhaul, but we should have a total plan for that so we can apply it evenly to all facets of the SDK at once, rather than one at a time. Note that this would probably need to wait until a Beam v3 or similar, as disrupting so many packages would break folks who are importing them from outside the SDK for some reason. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
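The coder packages being discussed ultimately wrap an encode/decode function pair for a concrete type. As a hedged, stdlib-only sketch of what such a pair looks like (the helper names `encVarInt`/`decVarInt` are illustrative, not the Go SDK's actual API):

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// encVarInt encodes an int64 as a zig-zag varint, the kind of
// encoder half a custom coder supplies. Name is illustrative.
func encVarInt(v int64) []byte {
	buf := make([]byte, binary.MaxVarintLen64)
	n := binary.PutVarint(buf, v)
	return buf[:n]
}

// decVarInt is the matching decoder half; it must round-trip
// exactly with encVarInt for the coder to be usable.
func decVarInt(data []byte) (int64, error) {
	v, n := binary.Varint(data)
	if n <= 0 {
		return 0, fmt.Errorf("invalid varint encoding")
	}
	return v, nil
}

func main() {
	enc := encVarInt(-42)
	dec, err := decVarInt(enc)
	// Round-trip: decode(encode(v)) == v, and re-encoding is stable.
	fmt.Println(dec, err, bytes.Equal(enc, encVarInt(dec)))
	// → -42 <nil> true
}
```

In the SDK, such pairs are registered against a concrete type so coder inference can pick them up; keeping the Beam-model pairs in a separate package is what provides the "standard vs. custom" signal lostluck describes.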
[GitHub] [beam] piotr-szuberski edited a comment on pull request #12023: [BEAM-10135] Add Python wrapper for Jdbc Write external transform
piotr-szuberski edited a comment on pull request #12023: URL: https://github.com/apache/beam/pull/12023#issuecomment-648316956 @TheNeuralBit I've made the requested changes. If you have further suggestions - go ahead. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] udim commented on pull request #11570: [BEAM-10047] Merge the stages 'Gather and Sort' and 'Create Batches'
udim commented on pull request #11570: URL: https://github.com/apache/beam/pull/11570#issuecomment-648324954 Is this ready to merge? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] aaltay commented on pull request #12057: Use bz2 to compress pickler output for better compression.
aaltay commented on pull request #12057: URL: https://github.com/apache/beam/pull/12057#issuecomment-648328051 Note: Do not merge before the 2.23 cut date. It would be good to merge for 2.24 and have the full 6-week cycle to see if benchmarks are affected. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r34591 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ## @@ -295,21 +301,32 @@ /** * Creates an uninitialized {@link Read} {@link PTransform}. Before use, basic Kafka configuration * should set with {@link Read#withBootstrapServers(String)} and {@link Read#withTopics(List)}. - * Other optional settings include key and value {@link Deserializer}s, custom timestamp and + * Other optional settings include key and value {@link Deserializer}s, custom timestamp, + watermark functions. */ public static Read read() { return new AutoValue_KafkaIO_Read.Builder() .setTopics(new ArrayList<>()) .setTopicPartitions(new ArrayList<>()) -.setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN) -.setConsumerConfig(Read.DEFAULT_CONSUMER_PROPERTIES) +.setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN) +.setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES) .setMaxNumRecords(Long.MAX_VALUE) .setCommitOffsetsInFinalizeEnabled(false) .setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime()) .build(); } + /** + * Creates an uninitialized {@link ReadViaSDF} {@link PTransform}. Different from {@link Read}, + * setting up {@code topics} and {@code bootstrapServers} is not required during construction + * time. But the {@code bootstrapServers} still can be configured {@link + * ReadViaSDF#withBootstrapServers(String)}. Please refer to {@link ReadViaSDF} for more details. + */ + public static > + ReadViaSDF readAll() { +return ReadViaSDF.read(); Review comment: The `WatermarkEstimatorT` is used when defining `createWatermarkEstimatorFn` and when `@NewWatermarkEstimator` is called. I understand that we can always use `WatermarkEstimator` as the type, but I thought it would be better to make the type explicit. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] bamnet commented on a change in pull request #12061: [BEAM-4634] Add options to allow BigQuery StandardSQL queries.
bamnet commented on a change in pull request #12061: URL: https://github.com/apache/beam/pull/12061#discussion_r81248 ## File path: sdks/go/pkg/beam/io/bigqueryio/bigquery.go ## @@ -90,21 +90,27 @@ func Read(s beam.Scope, project, table string, t reflect.Type) beam.PCollection // TODO(herohde) 7/13/2017: using * is probably too inefficient. We could infer // a focused query from the type. - return query(s, project, fmt.Sprintf("SELECT * from [%v]", table), t) + return query(s, project, fmt.Sprintf("SELECT * from [%v]", table), t, nil) +} + +// QueryOptions represents additional options for executing a query. +type QueryOptions struct { Review comment: Done. I agree, this is a cleaner design. I couldn't think of the phrase "variadic list of functional options" to save my life. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
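The "variadic list of functional options" design adopted here is a common Go API pattern: instead of a public options struct, callers pass zero or more option functions that each mutate a private config. A minimal, self-contained sketch of the pattern (the names `QueryOption`, `UseStandardSQL`, and `buildQuery` are hypothetical stand-ins, not bigqueryio's actual API):

```go
package main

import "fmt"

// queryConfig holds settings a query helper might accept.
// It stays unexported so the option functions are the only way to set it.
type queryConfig struct {
	useStandardSQL bool
}

// QueryOption is a functional option: a function that mutates the config.
type QueryOption func(*queryConfig)

// UseStandardSQL is a hypothetical option mirroring the PR's
// StandardSQL toggle for BigQuery queries.
func UseStandardSQL() QueryOption {
	return func(c *queryConfig) { c.useStandardSQL = true }
}

// buildQuery applies each option in order, then uses the resulting config.
func buildQuery(q string, opts ...QueryOption) string {
	cfg := queryConfig{}
	for _, opt := range opts {
		opt(&cfg)
	}
	if cfg.useStandardSQL {
		// BigQuery treats a leading #standardSQL directive as a dialect hint.
		return "#standardSQL\n" + q
	}
	return q
}

func main() {
	fmt.Println(buildQuery("SELECT 1"))                   // plain query, no options
	fmt.Println(buildQuery("SELECT 1", UseStandardSQL())) // option applied
}
```

The appeal over a `QueryOptions` struct parameter is that existing call sites (`buildQuery(q)`) keep compiling unchanged when new options are added, and each option documents itself as a named function.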
[GitHub] [beam] kennknowles commented on pull request #11639: [BEAM-4440] Throw exception when file to stage is not found, instead of logging a warning
kennknowles commented on pull request #11639: URL: https://github.com/apache/beam/pull/11639#issuecomment-648400772 Ah there's also a hidden type error in my code. The rabbitmq test hung and timed out with no logs available unfortunately. Hard to file a meaningful bug there. Not sure if it was really that test. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] chamikaramj commented on pull request #12060: [BEAM-10218] Add FnApiRunner to cross-language validate runner test
chamikaramj commented on pull request #12060: URL: https://github.com/apache/beam/pull/12060#issuecomment-648401612 Run Python PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] ibzib commented on pull request #12002: [BEAM-9872] Moved Spark validates tests to shared file
ibzib commented on pull request #12002: URL: https://github.com/apache/beam/pull/12002#issuecomment-648405326 Run Python Spark ValidatesRunner This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] pabloem merged pull request #12004: Enabling custom retry strategy for BQ streaming inserts.
pabloem merged pull request #12004: URL: https://github.com/apache/beam/pull/12004 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] damondouglas opened a new pull request #12066: [BEAM-9679] Add Side Output to Core Transform Go SDK katas
damondouglas opened a new pull request #12066: URL: https://github.com/apache/beam/pull/12066 This pull request adds a Side Output lesson to the Go SDK katas. I would like to request the following reviewers: (R: @lostluck ) (R: @henryken ) If accepted by both reviewers, please wait until the [Stepik course](https://stepik.org/course/70387) is updated before finally merging this PR. Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [x] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier). 
Post-Commit Tests Status (on master branch): the standard Beam PR-template badge table, linking the Go, Java, and Python post-commit and ValidatesRunner builds on ci-beam.apache.org for the Dataflow, Flink, Samza, and Spark runners.
[GitHub] [beam] ibzib commented on pull request #12002: [BEAM-9872] Moved Spark validates tests to shared file
ibzib commented on pull request #12002: URL: https://github.com/apache/beam/pull/12002#issuecomment-648443422 run seed job This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] ibzib commented on pull request #12002: [BEAM-9872] Moved Spark validates tests to shared file
ibzib commented on pull request #12002: URL: https://github.com/apache/beam/pull/12002#issuecomment-648445565 Run Python Spark ValidatesRunner
[GitHub] [beam] TheNeuralBit opened a new pull request #12067: [BEAM-10308] Make component ID assignments consistent across PipelineContext instances
TheNeuralBit opened a new pull request #12067: URL: https://github.com/apache/beam/pull/12067 This PR creates a global cache for component ID assignments made in PipelineContext instances. See BEAM-10308 and BEAM-10143 for motivation.
[GitHub] [beam] youngoli commented on pull request #12059: Adding a helper function for making custom coders.
youngoli commented on pull request #12059: URL: https://github.com/apache/beam/pull/12059#issuecomment-648449493 > If we were to get rid of the coderx package, I'd rather the coders move to the main beam package or similar, since they're invoked in inferCoders IIRC? My only objection is that we'd likely want to use those coders in packages further down in the SDK, but importing `beam` from those is a common cause of import cycles in my experience, so I'd prefer putting them in a subpackage instead (and we could always forward them to `beam` if we really want). I think having something like `graph/coder/custom` would be better. It's consolidated in the same directory tree as the builtin coders, but clearly separate since it's in a subdirectory, and the package name is still clear (e.g. `custom.NewIntCoder()`). Agreed with everything else, including waiting until a major version update before going through with this.
[GitHub] [beam] youngoli merged pull request #12059: Adding a helper function for making custom coders.
youngoli merged pull request #12059: URL: https://github.com/apache/beam/pull/12059
[GitHub] [beam] TheNeuralBit commented on pull request #12067: [BEAM-10308] Make component ID assignments consistent across PipelineContext instances
TheNeuralBit commented on pull request #12067: URL: https://github.com/apache/beam/pull/12067#issuecomment-648451444 R: @chamikaramj
[GitHub] [beam] TheNeuralBit commented on pull request #12067: [BEAM-10308] Make component ID assignments consistent across PipelineContext instances
TheNeuralBit commented on pull request #12067: URL: https://github.com/apache/beam/pull/12067#issuecomment-648451338 One shortcoming of this approach is that component ids are stored in a _global_ cache. I think it would be better if the cache could be scoped to a pipeline, but that would be more difficult to implement. I don't _think_ the global cache would cause any issues except for an extreme case where thousands of pipelines are being launched from a single Python process and we consume a huge amount of memory.
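The trade-off TheNeuralBit describes (a process-global cache vs. one scoped per pipeline) can be sketched in a few lines of Python. This is a hypothetical illustration, not Beam's actual `PipelineContext` code; the names `ComponentIdMap`, `get_id`, and `id_map_for` are invented.

```python
import collections
import itertools

class ComponentIdMap:
    """Assigns stable, unique IDs to components within one scope."""

    def __init__(self):
        self._ids = {}
        self._counters = collections.defaultdict(itertools.count)

    def get_id(self, obj, prefix):
        # Reuse the ID if this object was seen before, so repeated
        # translations of the same pipeline stay consistent.
        if obj not in self._ids:
            self._ids[obj] = "%s_%d" % (prefix, next(self._counters[prefix]))
        return self._ids[obj]

# Global scope (the approach in the PR): assignments accumulate for the
# life of the process, so memory grows with every pipeline launched.
GLOBAL_IDS = ComponentIdMap()

# Pipeline scope (the alternative mentioned above): one map per pipeline,
# keyed here by id(pipeline) purely for illustration.
def id_map_for(pipeline, _cache={}):
    return _cache.setdefault(id(pipeline), ComponentIdMap())
```

With the pipeline-scoped variant, two pipelines get independent maps, while repeated lookups within one pipeline stay consistent, which is the consistency property the PR is after.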
[GitHub] [beam] saavan-google-intern commented on a change in pull request #12009: [BEAM-10258] Support type hint annotations on PTransform's expand()
saavan-google-intern commented on a change in pull request #12009: URL: https://github.com/apache/beam/pull/12009#discussion_r444541219 ## File path: sdks/python/apache_beam/pvalue.py ## @@ -158,6 +159,20 @@ def __ne__(self, other): def __hash__(self): return hash((self.tag, self.producer)) + class PCollectionTypeConstraint(SequenceTypeConstraint): Review comment: This PR uses the built-in Generic[T] instead of a new TypeConstraint so this is no longer relevant.
[GitHub] [beam] saavan-google-intern commented on a change in pull request #12009: [BEAM-10258] Support type hint annotations on PTransform's expand()
saavan-google-intern commented on a change in pull request #12009: URL: https://github.com/apache/beam/pull/12009#discussion_r444541691 ## File path: sdks/python/apache_beam/pvalue.py ## @@ -425,8 +441,7 @@ def _from_runtime_iterable(it, options): def _view_options(self): return { -'data': self._data, -# For non-fn-api runners. +'data': self._data, # For non-fn-api runners. Review comment: Some of the YAPF changes are fixed but a few whitespace changes remain that I'll look into soon.
[GitHub] [beam] saavan-google-intern commented on pull request #12009: [BEAM-10258] Support type hint annotations on PTransform's expand()
saavan-google-intern commented on pull request #12009: URL: https://github.com/apache/beam/pull/12009#issuecomment-648460175 Pushed tests, can we test?
[GitHub] [beam] lukecwik commented on a change in pull request #11911: [BEAM-10186] Sends an empty response to the runner instead of failing
lukecwik commented on a change in pull request #11911: URL: https://github.com/apache/beam/pull/11911#discussion_r444541845 ## File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java ## @@ -260,6 +260,46 @@ BundleProcessor get( } } + @Test + public void testTrySplitBeforeBundleDoesNotFail() { +ProcessBundleHandler handler = +new ProcessBundleHandler( +PipelineOptionsFactory.create(), +null, +beamFnDataClient, +null /* beamFnStateClient */, +null /* finalizeBundleHandler */, +ImmutableMap.of(), +new BundleProcessorCache()); + +handler.trySplit( +BeamFnApi.InstructionRequest.newBuilder() +.setInstructionId("999L") +.setProcessBundleSplit( + BeamFnApi.ProcessBundleSplitRequest.newBuilder().setInstructionId("unknown-id")) +.build()); + } + + @Test + public void testProgressBeforeBundleDoesNotFail() throws Exception { +ProcessBundleHandler handler = +new ProcessBundleHandler( +PipelineOptionsFactory.create(), +null, +beamFnDataClient, +null /* beamFnStateClient */, +null /* finalizeBundleHandler */, +ImmutableMap.of(), +new BundleProcessorCache()); + +handler.progress( Review comment: validate the contents of the response ## File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java ## @@ -260,6 +260,46 @@ BundleProcessor get( } } + @Test + public void testTrySplitBeforeBundleDoesNotFail() { +ProcessBundleHandler handler = +new ProcessBundleHandler( +PipelineOptionsFactory.create(), +null, +beamFnDataClient, +null /* beamFnStateClient */, +null /* finalizeBundleHandler */, +ImmutableMap.of(), +new BundleProcessorCache()); + +handler.trySplit( Review comment: validate the contents of the response
[GitHub] [beam] saavan-google-intern edited a comment on pull request #12009: [BEAM-10258] Support type hint annotations on PTransform's expand()
saavan-google-intern edited a comment on pull request #12009: URL: https://github.com/apache/beam/pull/12009#issuecomment-643904266 R: @udim CC: @robertwb This PR is not ready for merge, but I'm making it available so we can track and discuss its progress. What do you think of the current implementation? It works on some trivial pipelines that I've run on my machine but I want to confirm the solution's design before I debug deeper and write real tests.
[GitHub] [beam] lukecwik commented on a change in pull request #11911: [BEAM-10186] Sends an empty response to the runner instead of failing
lukecwik commented on a change in pull request #11911: URL: https://github.com/apache/beam/pull/11911#discussion_r444542853 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java ## @@ -341,53 +341,52 @@ private void createRunnerAndConsumersForPTransformRecursively( throws Exception { BundleProcessor bundleProcessor = bundleProcessorCache.find(request.getProcessBundleProgress().getInstructionId()); -if (bundleProcessor == null) { Review comment: nit, you could remove the level of nesting by using a guard statement (here and below):
```
if (bundleProcessor == null) {
  return BeamFnApi.InstructionResponse.newBuilder()
      .setProcessBundleProgress(
          BeamFnApi.ProcessBundleProgressResponse.getDefaultInstance());
}
```
[GitHub] [beam] amaliujia commented on a change in pull request #11975: [BEAM-9198] BeamSQL aggregation analytics functionality
amaliujia commented on a change in pull request #11975: URL: https://github.com/apache/beam/pull/11975#discussion_r444547096 ## File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamAnalyticFunctionsExperimentTest.java ## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql; + +import java.util.Iterator; +import java.util.List; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.junit.Ignore; +import org.junit.Test; + +/** + * A simple Analytic Functions experiment for BeamSQL created in order to understand the query + * processing workflow of BeamSQL and Calcite. 
+ */ +public class BeamAnalyticFunctionsExperimentTest extends BeamSqlDslBase { + + /** + * Table schema and data taken from + * https://cloud.google.com/bigquery/docs/reference/standard-sql/analytic-function-concepts#produce_table + * + * Compute a cumulative sum query taken from + * https://cloud.google.com/bigquery/docs/reference/standard-sql/analytic-function-concepts#compute_a_cumulative_sum + */ + @Test + public void testOverCumulativeSum() throws Exception { +pipeline.enableAbandonedNodeEnforcement(false); +Schema schema = +Schema.builder() +.addStringField("item") +.addStringField("category") +.addInt32Field("purchases") +.build(); +PCollection<Row> inputRows = +pipeline +.apply( +Create.of( +TestUtils.rowsBuilderOf(schema) +.addRows( +"kale", "vegetable", 23, +"orange", "fruit", 2, +"cabbage", "vegetable", 9, +"apple", "fruit", 8, +"leek", "vegetable", 2, +"lettuce", "vegetable", 10) +.getRows())) +.setRowSchema(schema); +String sql = +"SELECT item, purchases, category, sum(purchases) over " ++ "(PARTITION BY category ORDER BY purchases ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)" ++ " as total_purchases FROM PCOLLECTION"; Review comment: Thanks. Supporting PartitionBy and OrderBy is huge!
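For readers unfamiliar with window frames, the expected result of the cumulative-sum query in the test above can be made concrete with a plain-Python sketch of the `SUM(purchases) OVER (PARTITION BY category ORDER BY purchases ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)` semantics. This is an illustration of the SQL semantics only, not BeamSQL code.

```python
from itertools import groupby

# The table from the BigQuery analytic-function example used in the test.
rows = [
    ("kale", "vegetable", 23), ("orange", "fruit", 2),
    ("cabbage", "vegetable", 9), ("apple", "fruit", 8),
    ("leek", "vegetable", 2), ("lettuce", "vegetable", 10),
]

def cumulative_sum(rows):
    out = []
    # PARTITION BY category: group rows by their category column.
    for category, group in groupby(sorted(rows, key=lambda r: r[1]),
                                   key=lambda r: r[1]):
        total = 0
        # ORDER BY purchases, then a running sum over the frame
        # "UNBOUNDED PRECEDING .. CURRENT ROW".
        for item, _, purchases in sorted(group, key=lambda r: r[2]):
            total += purchases
            out.append((item, purchases, category, total))
    return out
```

Running this yields, for example, `("leek", 2, "vegetable", 2)`, `("cabbage", 9, "vegetable", 11)`, `("lettuce", 10, "vegetable", 21)`, and `("kale", 23, "vegetable", 44)`, matching the cumulative sums in the linked BigQuery example.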
[GitHub] [beam] lostluck commented on a change in pull request #12061: [BEAM-4634] Add options to allow BigQuery StandardSQL queries.
lostluck commented on a change in pull request #12061: URL: https://github.com/apache/beam/pull/12061#discussion_r444548615 ## File path: sdks/go/pkg/beam/io/bigqueryio/bigquery.go ## @@ -93,18 +93,37 @@ func Read(s beam.Scope, project, table string, t reflect.Type) beam.PCollection return query(s, project, fmt.Sprintf("SELECT * from [%v]", table), t) } +// QueryOptions represents additional options for executing a query. +type QueryOptions struct { + // UseStandardSQL enables BigQuery's Standard SQL dialect when executing a query. + UseStandardSQL bool +} + +// UseStandardSQL enables BigQuery's Standard SQL dialect when executing a query. +func UseStandardSQL(qo *QueryOptions) error { + qo.UseStandardSQL = true + return nil +} Review comment: Part of the functional options pattern is to have the user-level functions return functions. This allows for closures over user configuration for more complicated options later on. Probably doesn't matter in this case, but otherwise, e.g.:
```suggestion
func UseStandardSQL() func(qo *QueryOptions) error {
	return func(qo *QueryOptions) error {
		qo.UseStandardSQL = true
		return nil
	}
}
```
I'd probably drop the "Use" so it's just StandardSQL. When there are more of these, the repeated Use prefix will get tiresome and reduce readability.
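The closure-returning style lostluck describes is Go's "functional options" pattern. As a language-neutral illustration, the same idea can be sketched in Python (the names `QueryOptions`, `standard_sql`, and `query` are hypothetical, not the Beam SDK's API):

```python
class QueryOptions:
    """Private options struct that public option functions mutate."""
    def __init__(self):
        self.use_standard_sql = False

def standard_sql():
    # Returning a closure, rather than mutating directly, leaves room to
    # capture user configuration for more complicated options later on.
    def apply(qo):
        qo.use_standard_sql = True
    return apply

def query(sql, *options):
    qo = QueryOptions()
    # Each option is a function applied to the options struct.
    for opt in options:
        opt(qo)
    return qo  # a real implementation would go execute the query
```

A caller then writes `query("SELECT 1", standard_sql())`, and new options can be added later without changing the `query` signature, which is the main appeal of the pattern.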
[GitHub] [beam] amaliujia commented on a change in pull request #11975: [BEAM-9198] BeamSQL aggregation analytics functionality
amaliujia commented on a change in pull request #11975: URL: https://github.com/apache/beam/pull/11975#discussion_r444548154 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java ## @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.sql.impl.rel; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; +import org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter; +import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelFieldCollation; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.AggregateCall; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Window; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLiteral; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; + +public class BeamWindowRel extends Window implements BeamRelNode { + public BeamWindowRel( + RelOptCluster cluster, + RelTraitSet traitSet, + RelNode input, + List<RexLiteral> constants, + RelDataType rowType, + List<Group> groups) { +super(cluster, traitSet, input, constants, rowType, groups); + } + + @Override + public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() { +Schema outputSchema = CalciteUtils.toSchema(getRowType()); +final List<FieldAggregation> analyticFields = Lists.newArrayList(); +this.groups.stream() +.forEach( +anAnalyticGroup -> { + List<Integer> partitionKeysDef = anAnalyticGroup.keys.toList(); + List<Integer> orderByKeys = Lists.newArrayList(); + List<Boolean> orderByDirections = Lists.newArrayList(); + List<Boolean> orderByNullDirections = Lists.newArrayList(); Review comment: Add a test, or add a checkArgument to disallow NULLS FIRST / NULLS LAST in ORDER BY (either one works for me). For NULL handling, it's up to you; you can leave it for future PRs. ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java ## @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.ex
[GitHub] [beam] ihji commented on pull request #12060: [BEAM-10218] Add FnApiRunner to cross-language validate runner test
ihji commented on pull request #12060: URL: https://github.com/apache/beam/pull/12060#issuecomment-648474919 R: @pabloem Adding Pablo to the loop since Robert is on vacation.
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r444564026 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadViaSDF.java ## @@ -0,0 +1,861 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.kafka; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; + +import com.google.auto.value.AutoValue; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg; +import org.apache.beam.sdk.io.kafka.KafkaSourceDescription.Schemas; +import org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.TimestampPolicyContext; +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.options.ExperimentalOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker; +import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; +import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress; +import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator; +import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.Manual; +import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing; +import 
org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.WallTime; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.utils.AppInfoParser; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link PTransform} that takes a PCollection of {@link KafkaSourceDescription} as input and + * outputs a PCollection of {@link KafkaRecord}. The core implementation is based on {@code + * SplittableDoFn}. For more details about the concept of {@code SplittableDoFn}, please refer to + * the <a href="https://beam.apache.org/blog/splittable-do-fn/">blog post</a> and <a href="https://s.apache.org/beam-fn-api">design doc</a>. The major difference from {@link + * KafkaIO.Read} is, {@link ReadViaSDF} doesn't require source descriptions (e.g., {@link + * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link + * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction time. Inst
[GitHub] [beam] bamnet commented on a change in pull request #12061: [BEAM-4634] Add options to allow BigQuery StandardSQL queries.
bamnet commented on a change in pull request #12061: URL: https://github.com/apache/beam/pull/12061#discussion_r444565916

## File path: sdks/go/pkg/beam/io/bigqueryio/bigquery.go

@@ -93,18 +93,37 @@ func Read(s beam.Scope, project, table string, t reflect.Type) beam.PCollection
 	return query(s, project, fmt.Sprintf("SELECT * from [%v]", table), t)
 }

+// QueryOptions represents additional options for executing a query.
+type QueryOptions struct {
+	// UseStandardSQL enables BigQuery's Standard SQL dialect when executing a query.
+	UseStandardSQL bool
+}
+
+// UseStandardSQL enables BigQuery's Standard SQL dialect when executing a query.
+func UseStandardSQL(qo *QueryOptions) error {
+	qo.UseStandardSQL = true
+	return nil
+}

Review comment: Done. Swapped to returning a function. I left the `Use` prefix to match the Python and Java BigQuery IO packages and the base BQ library. Other config options, like flatten results and validation, don't follow this pattern, so I expect this to be a one-off (feels like a legacy decision from the BigQuery API design). Happy to change tho, LMK! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] ibzib merged pull request #12002: [BEAM-9872] Moved Spark validates tests to shared file
ibzib merged pull request #12002: URL: https://github.com/apache/beam/pull/12002
[GitHub] [beam] udim commented on pull request #12009: [BEAM-10258] Support type hint annotations on PTransform's expand()
udim commented on pull request #12009: URL: https://github.com/apache/beam/pull/12009#issuecomment-648498242 Let's see
[GitHub] [beam] lostluck commented on a change in pull request #12061: [BEAM-4634] Add options to allow BigQuery StandardSQL queries.
lostluck commented on a change in pull request #12061: URL: https://github.com/apache/beam/pull/12061#discussion_r444579505 Review comment: Works for me! Consistency with the deeper BigQuery options certainly trumps local verbosity. Thanks!
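The review thread above settles on the functional-options pattern: `UseStandardSQL` returns a function that mutates a `QueryOptions` value, rather than mutating it directly. A stand-alone sketch of how such options compose (the names mirror the diff, but this is a simplified illustration, not the actual `bigqueryio` package):

```go
package main

import "fmt"

// QueryOptions mirrors the struct from the diff above (simplified).
type QueryOptions struct {
	UseStandardSQL bool
}

// QueryOption is the functional-option type the reviewers converged on:
// each option mutates the config and may report an error.
type QueryOption func(*QueryOptions) error

// UseStandardSQL enables BigQuery's Standard SQL dialect when executing a query.
func UseStandardSQL() QueryOption {
	return func(qo *QueryOptions) error {
		qo.UseStandardSQL = true
		return nil
	}
}

// applyOptions folds a variadic list of options into a QueryOptions value,
// stopping at the first error.
func applyOptions(opts ...QueryOption) (QueryOptions, error) {
	var qo QueryOptions
	for _, opt := range opts {
		if err := opt(&qo); err != nil {
			return qo, err
		}
	}
	return qo, nil
}

func main() {
	qo, err := applyOptions(UseStandardSQL())
	fmt.Println(qo.UseStandardSQL, err)
}
```

The appeal of this shape, as the thread notes, is that new options can be added later without changing the signature of `Query`-style entry points that accept `opts ...QueryOption`.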
[GitHub] [beam] lostluck merged pull request #12061: [BEAM-4634] Add options to allow BigQuery StandardSQL queries.
lostluck merged pull request #12061: URL: https://github.com/apache/beam/pull/12061
[GitHub] [beam] lostluck commented on pull request #12061: [BEAM-4634] Add options to allow BigQuery StandardSQL queries.
lostluck commented on pull request #12061: URL: https://github.com/apache/beam/pull/12061#issuecomment-648510341 Congratulations! You're now an Apache Beam contributor.
[GitHub] [beam] chamikaramj commented on pull request #12011: [BEAM-9269] Set Spanner commit deadline using GRPC api not interceptor.
chamikaramj commented on pull request #12011: URL: https://github.com/apache/beam/pull/12011#issuecomment-648514485 Retest this please
[GitHub] [beam] chamikaramj commented on pull request #12010: [BEAM-10259] Use ref-counted connection to Spanner to prevent multiple connections.
chamikaramj commented on pull request #12010: URL: https://github.com/apache/beam/pull/12010#issuecomment-648516228 Retest this please. I'll wait for Allen's review here. Thanks.
[GitHub] [beam] amaliujia opened a new pull request #12068: [Proof of Concept] Support SQL UDTVF
amaliujia opened a new pull request #12068: URL: https://github.com/apache/beam/pull/12068
[GitHub] [beam] amaliujia commented on pull request #12068: [Do Not Merge][Proof of Concept] Support SQL UDTVF
amaliujia commented on pull request #12068: URL: https://github.com/apache/beam/pull/12068#issuecomment-648521028 CC: @ibzib
[GitHub] [beam] davidyan74 commented on pull request #12047: [BEAM-10291] Adding full thread dump upon lull detection
davidyan74 commented on pull request #12047: URL: https://github.com/apache/beam/pull/12047#issuecomment-648522058 The original lull logging code does not have test coverage. I'll look into adding tests that cover both the original code and the added code in this PR. It will take me a while though.
[GitHub] [beam] davidyan74 commented on pull request #12047: [BEAM-10291] Adding full thread dump upon lull detection
davidyan74 commented on pull request #12047: URL: https://github.com/apache/beam/pull/12047#issuecomment-648522264 The tests have been pending for a long time now. Is there anything wrong with them?
[GitHub] [beam] tvalentyn commented on pull request #12047: [BEAM-10291] Adding full thread dump upon lull detection
tvalentyn commented on pull request #12047: URL: https://github.com/apache/beam/pull/12047#issuecomment-648526151 Sometimes Jenkins has glitches... not sure what is wrong in this case. Thank you; tests would be helpful, since postcommit tests will probably not exercise this codepath, as it requires a lull to happen. So if somebody adds a typo in that code, it may go undetected until we have to debug a slow customer job.
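PR #12047 above adds a full thread dump when a lull (a long stretch with no progress) is detected, precisely because such dumps are the main debugging signal for a stuck worker. The Beam change itself is not in Go, but purely as an illustration of the idea, the Go runtime can produce the equivalent all-goroutines dump with `runtime.Stack`:

```go
package main

import (
	"fmt"
	"runtime"
	"time"
)

// fullThreadDump returns stack traces for every goroutine in the process,
// the Go analogue of the "full thread dump" the PR logs on lull detection.
func fullThreadDump() string {
	buf := make([]byte, 1<<20) // 1 MiB buffer; grow it if the dump is truncated
	n := runtime.Stack(buf, true) // all=true captures every goroutine
	return string(buf[:n])
}

func main() {
	// A stand-in "stuck" worker so the dump has something interesting to show.
	go func() { time.Sleep(time.Hour) }()
	time.Sleep(10 * time.Millisecond)
	fmt.Println(fullThreadDump())
}
```

In a real lull detector one would log this dump (rate-limited, as dumps are large) only after the processing watermark or element counter has been stalled past a threshold.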