[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.
[ https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172553=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172553 ] ASF GitHub Bot logged work on BEAM-6181: Author: ASF GitHub Bot Created on: 06/Dec/18 01:30 Start Date: 06/Dec/18 01:30 Worklog Time Spent: 10m Work Description: ajamato commented on a change in pull request #7202: [BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow Runner. URL: https://github.com/apache/beam/pull/7202#discussion_r239301167 ## File path: runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java ## @@ -423,6 +434,187 @@ public void close() {} contains(new CounterHamcrestMatchers.CounterUpdateIntegerValueMatcher(finalCounterValue))); } + @Test(timeout = ReadOperation.DEFAULT_PROGRESS_UPDATE_PERIOD_MS * 10) + public void testExtractCounterUpdatesReturnsValidProgressTrackerCounterUpdatesIfPresent() + throws Exception { +final String stepName = "fakeStepNameWithUserMetrics"; +final String namespace = "sdk/whatever"; +final String name = "someCounter"; +final int counterValue = 42; +final int finalCounterValue = 77; +final CountDownLatch progressSentLatch = new CountDownLatch(1); +final CountDownLatch processBundleLatch = new CountDownLatch(1); + +final BeamFnApi.Metrics.User.MetricName metricName = +BeamFnApi.Metrics.User.MetricName.newBuilder() +.setNamespace(namespace) +.setName(name) +.build(); + +final BeamFnApi.Metrics deprecatedMetrics = +BeamFnApi.Metrics.newBuilder() +.putPtransforms(GRPC_READ_ID, FAKE_ELEMENT_COUNT_METRICS) +.putPtransforms( +stepName, +BeamFnApi.Metrics.PTransform.newBuilder() +.addUser( +BeamFnApi.Metrics.User.newBuilder() +.setMetricName(metricName) +.setCounterData( +BeamFnApi.Metrics.User.CounterData.newBuilder() +.setValue(finalCounterValue))) +.build()) +.build(); + +final int expectedCounterValue = 5; +final BeamFnApi.MonitoringInfo expectedMonitoringInfo = 
+BeamFnApi.MonitoringInfo.newBuilder() +.setUrn("beam:metric:user:ExpectedCounter") +.setType("beam:metrics:sum_int_64") +.putLabels("PTRANSFORM", "ExpectedPTransform") +.setMetric( +BeamFnApi.Metric.newBuilder() +.setCounterData( +BeamFnApi.CounterData.newBuilder() +.setInt64Value(expectedCounterValue) +.build()) +.build()) +.build(); + +InstructionRequestHandler instructionRequestHandler = +new InstructionRequestHandler() { + @Override + public CompletionStage handle(InstructionRequest request) { +switch (request.getRequestCase()) { + case REGISTER: +return CompletableFuture.completedFuture(responseFor(request).build()); + case PROCESS_BUNDLE: +return MoreFutures.supplyAsync( +() -> { + processBundleLatch.await(); + return responseFor(request) + .setProcessBundle( + BeamFnApi.ProcessBundleResponse.newBuilder() + .setMetrics(deprecatedMetrics) + .addMonitoringInfos(expectedMonitoringInfo)) + .build(); +}); + case PROCESS_BUNDLE_PROGRESS: +progressSentLatch.countDown(); +return CompletableFuture.completedFuture( +responseFor(request) +.setProcessBundleProgress( + BeamFnApi.ProcessBundleProgressResponse.newBuilder() +.setMetrics(deprecatedMetrics) +.addMonitoringInfos(expectedMonitoringInfo)) +.build()); + default: +throw new RuntimeException("Reached unexpected code path"); +} + } + + @Override + public void close() {} +}; + +Map stepContextMap = new HashMap<>(); +NameContext nc = +new NameContext() { + @Nullable + @Override + public String stageName() { +return "ExpectedStage"; + } + + @Nullable + @Override + public String originalName() { +return
[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.
[ https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172552=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172552 ] ASF GitHub Bot logged work on BEAM-6181: Author: ASF GitHub Bot Created on: 06/Dec/18 01:30 Start Date: 06/Dec/18 01:30 Worklog Time Spent: 10m Work Description: ajamato commented on a change in pull request #7202: [BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow Runner. URL: https://github.com/apache/beam/pull/7202#discussion_r239301128 ## File path: runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java ## @@ -423,6 +434,187 @@ public void close() {} contains(new CounterHamcrestMatchers.CounterUpdateIntegerValueMatcher(finalCounterValue))); } + @Test(timeout = ReadOperation.DEFAULT_PROGRESS_UPDATE_PERIOD_MS * 10) + public void testExtractCounterUpdatesReturnsValidProgressTrackerCounterUpdatesIfPresent() + throws Exception { +final String stepName = "fakeStepNameWithUserMetrics"; +final String namespace = "sdk/whatever"; +final String name = "someCounter"; +final int counterValue = 42; +final int finalCounterValue = 77; +final CountDownLatch progressSentLatch = new CountDownLatch(1); +final CountDownLatch processBundleLatch = new CountDownLatch(1); + +final BeamFnApi.Metrics.User.MetricName metricName = +BeamFnApi.Metrics.User.MetricName.newBuilder() +.setNamespace(namespace) +.setName(name) +.build(); + +final BeamFnApi.Metrics deprecatedMetrics = +BeamFnApi.Metrics.newBuilder() +.putPtransforms(GRPC_READ_ID, FAKE_ELEMENT_COUNT_METRICS) +.putPtransforms( +stepName, +BeamFnApi.Metrics.PTransform.newBuilder() +.addUser( +BeamFnApi.Metrics.User.newBuilder() +.setMetricName(metricName) +.setCounterData( +BeamFnApi.Metrics.User.CounterData.newBuilder() +.setValue(finalCounterValue))) +.build()) +.build(); + +final int expectedCounterValue = 5; +final BeamFnApi.MonitoringInfo expectedMonitoringInfo = 
+BeamFnApi.MonitoringInfo.newBuilder() +.setUrn("beam:metric:user:ExpectedCounter") +.setType("beam:metrics:sum_int_64") +.putLabels("PTRANSFORM", "ExpectedPTransform") +.setMetric( +BeamFnApi.Metric.newBuilder() +.setCounterData( +BeamFnApi.CounterData.newBuilder() +.setInt64Value(expectedCounterValue) +.build()) +.build()) +.build(); + +InstructionRequestHandler instructionRequestHandler = +new InstructionRequestHandler() { + @Override + public CompletionStage handle(InstructionRequest request) { +switch (request.getRequestCase()) { + case REGISTER: +return CompletableFuture.completedFuture(responseFor(request).build()); + case PROCESS_BUNDLE: +return MoreFutures.supplyAsync( +() -> { + processBundleLatch.await(); + return responseFor(request) + .setProcessBundle( + BeamFnApi.ProcessBundleResponse.newBuilder() + .setMetrics(deprecatedMetrics) + .addMonitoringInfos(expectedMonitoringInfo)) + .build(); +}); + case PROCESS_BUNDLE_PROGRESS: +progressSentLatch.countDown(); +return CompletableFuture.completedFuture( +responseFor(request) +.setProcessBundleProgress( + BeamFnApi.ProcessBundleProgressResponse.newBuilder() +.setMetrics(deprecatedMetrics) +.addMonitoringInfos(expectedMonitoringInfo)) +.build()); + default: +throw new RuntimeException("Reached unexpected code path"); +} + } + + @Override + public void close() {} +}; + +Map stepContextMap = new HashMap<>(); +NameContext nc = +new NameContext() { Review comment: Do you need to override this and make a new class? Or is it possible to just instantiate an existing class and pass in these strings? This is an
[jira] [Work logged] (BEAM-3836) Java SDK harness should understand a BundleSplitRequest and respond with a BundleSplit before bundle finishes
[ https://issues.apache.org/jira/browse/BEAM-3836?focusedWorklogId=172543=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172543 ] ASF GitHub Bot logged work on BEAM-3836: Author: ASF GitHub Bot Created on: 06/Dec/18 00:45 Start Date: 06/Dec/18 00:45 Worklog Time Spent: 10m Work Description: lukecwik commented on issue #7200: [BEAM-3836] Add support for checkpointing to the Java SDK harness. This is towards a general splitting solution over the Fn API. URL: https://github.com/apache/beam/pull/7200#issuecomment-444706319 R: @robertwb @iemejia @swegner CC: @boyuanzz This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172543) Time Spent: 10m Remaining Estimate: 0h > Java SDK harness should understand a BundleSplitRequest and respond with a > BundleSplit before bundle finishes > - > > Key: BEAM-3836 > URL: https://issues.apache.org/jira/browse/BEAM-3836 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-harness >Reporter: Eugene Kirpichov >Assignee: Luke Cwik >Priority: Major > Time Spent: 10m > Remaining Estimate: 0h > > This, like BEAM-3835, is more necessary for LS than for checkpointing, but is > technically also part of the SDF splitting protocol. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (BEAM-3836) Java SDK harness should understand a BundleSplitRequest and respond with a BundleSplit before bundle finishes
[ https://issues.apache.org/jira/browse/BEAM-3836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Cwik reassigned BEAM-3836: --- Assignee: Luke Cwik (was: Eugene Kirpichov) > Java SDK harness should understand a BundleSplitRequest and respond with a > BundleSplit before bundle finishes > - > > Key: BEAM-3836 > URL: https://issues.apache.org/jira/browse/BEAM-3836 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-harness >Reporter: Eugene Kirpichov >Assignee: Luke Cwik >Priority: Major > > This, like BEAM-3835, is more necessary for LS than for checkpointing, but is > technically also part of the SDF splitting protocol. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5321) Finish Python 3 porting for transforms module
[ https://issues.apache.org/jira/browse/BEAM-5321?focusedWorklogId=172538=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172538 ] ASF GitHub Bot logged work on BEAM-5321: Author: ASF GitHub Bot Created on: 06/Dec/18 00:12 Start Date: 06/Dec/18 00:12 Worklog Time Spent: 10m Work Description: RobbeSneyders commented on a change in pull request #7104: [BEAM-5321] Port transforms package to Python 3 URL: https://github.com/apache/beam/pull/7104#discussion_r239287015 ## File path: sdks/python/setup.py ## @@ -102,16 +102,17 @@ def get_version(): cythonize = lambda *args, **kwargs: [] REQUIRED_PACKAGES_PY2_ONLY = [ -'avro>=1.8.1,<2.0.0' +'avro>=1.8.1,<2.0.0', +'dill>=0.2.6,<=0.2.8.2', ] REQUIRED_PACKAGES_PY3_ONLY = [ -'avro-python3>=1.8.1,<2.0.0' +'avro-python3>=1.8.1,<2.0.0', Review comment: This doesn't seem to work for the dill requirement. We can clean this up when we clean up the dill dependency. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172538) Time Spent: 2.5h (was: 2h 20m) > Finish Python 3 porting for transforms module > - > > Key: BEAM-5321 > URL: https://issues.apache.org/jira/browse/BEAM-5321 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Robbe >Assignee: Robbe >Priority: Major > Fix For: Not applicable > > Time Spent: 2.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.
[ https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172530=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172530 ] ASF GitHub Bot logged work on BEAM-6181: Author: ASF GitHub Bot Created on: 06/Dec/18 00:03 Start Date: 06/Dec/18 00:03 Worklog Time Spent: 10m Work Description: Ardagan commented on a change in pull request #7202: [BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow Runner. URL: https://github.com/apache/beam/pull/7202#discussion_r239285290 ## File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java ## @@ -307,10 +344,17 @@ void updateProgress() { grpcWriteOperation.abortWait(); } -BeamFnApi.Metrics metrics = MoreFutures.get(bundleProcessOperation.getMetrics()); +//TODO: Replace getProcessBundleProgress with getMonitoringInfos when Metrics is deprecated. Review comment: I was thinking about the proper way to handle this and concluded that a TODO that explains when the code gets deprecated was the best approach. 1. Creating a JIRA along the lines of "Cleanup Dataflow Runner Harness" for when Metrics are deprecated doesn't cover the proper workload, so it is not the best idea in my opinion. 2. Assigning a username is not beneficial either, since we do not know when we will completely deprecate Metrics, so it doesn't really differ from a regular TODO. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172530) Time Spent: 3h 20m (was: 3h 10m) > Utilize MetricInfo for reporting user metrics in Portable Dataflow Java > Runner. 
> --- > > Key: BEAM-6181 > URL: https://issues.apache.org/jira/browse/BEAM-6181 > Project: Beam > Issue Type: Bug > Components: java-fn-execution >Reporter: Mikhail Gryzykhin >Assignee: Mikhail Gryzykhin >Priority: Major > Time Spent: 3h 20m > Remaining Estimate: 0h > > New approach to report metrics in FnApi is to utilize MetricInfo structures. > This approach is implemented in Python SDK and work is ongoing in Java SDK. > This task includes plumbing User metrics reported via MetricInfos through > Dataflow Java Runner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.
[ https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172529=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172529 ] ASF GitHub Bot logged work on BEAM-6181: Author: ASF GitHub Bot Created on: 06/Dec/18 00:02 Start Date: 06/Dec/18 00:02 Worklog Time Spent: 10m Work Description: Ardagan commented on a change in pull request #7202: [BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow Runner. URL: https://github.com/apache/beam/pull/7202#discussion_r239285290 ## File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java ## @@ -307,10 +344,17 @@ void updateProgress() { grpcWriteOperation.abortWait(); } -BeamFnApi.Metrics metrics = MoreFutures.get(bundleProcessOperation.getMetrics()); +//TODO: Replace getProcessBundleProgress with getMonitoringInfos when Metrics is deprecated. Review comment: I was thinking about the proper way to handle this and concluded that a TODO that clears up when the code gets deprecated was the best approach. 1. Creating a JIRA along the lines of "Cleanup Dataflow Runner Harness" for when Metrics are deprecated doesn't cover the proper workload, so it is not the best idea in my opinion. 2. Assigning a username is not beneficial either, since we do not know when we will completely deprecate Metrics, so it doesn't really differ from a regular TODO. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172529) Time Spent: 3h 10m (was: 3h) > Utilize MetricInfo for reporting user metrics in Portable Dataflow Java > Runner. 
> --- > > Key: BEAM-6181 > URL: https://issues.apache.org/jira/browse/BEAM-6181 > Project: Beam > Issue Type: Bug > Components: java-fn-execution >Reporter: Mikhail Gryzykhin >Assignee: Mikhail Gryzykhin >Priority: Major > Time Spent: 3h 10m > Remaining Estimate: 0h > > New approach to report metrics in FnApi is to utilize MetricInfo structures. > This approach is implemented in Python SDK and work is ongoing in Java SDK. > This task includes plumbing User metrics reported via MetricInfos through > Dataflow Java Runner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5321) Finish Python 3 porting for transforms module
[ https://issues.apache.org/jira/browse/BEAM-5321?focusedWorklogId=172511=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172511 ] ASF GitHub Bot logged work on BEAM-5321: Author: ASF GitHub Bot Created on: 05/Dec/18 22:54 Start Date: 05/Dec/18 22:54 Worklog Time Spent: 10m Work Description: RobbeSneyders commented on issue #7104: [BEAM-5321] Port transforms package to Python 3 URL: https://github.com/apache/beam/pull/7104#issuecomment-444682962 Seems like adding `--process-dependency-links` to the tox `install_command` worked. Although I don't know why this is only necessary for Jenkins. https://github.com/apache/beam/blob/01b75994709b1964f2c555d542b19ecf20f223ec/sdks/python/tox.ini#L44 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172511) Time Spent: 2h 20m (was: 2h 10m) > Finish Python 3 porting for transforms module > - > > Key: BEAM-5321 > URL: https://issues.apache.org/jira/browse/BEAM-5321 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Robbe >Assignee: Robbe >Priority: Major > Fix For: Not applicable > > Time Spent: 2h 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.
[ https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172510=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172510 ] ASF GitHub Bot logged work on BEAM-6181: Author: ASF GitHub Bot Created on: 05/Dec/18 22:52 Start Date: 05/Dec/18 22:52 Worklog Time Spent: 10m Work Description: Ardagan commented on a change in pull request #7202: [BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow Runner. URL: https://github.com/apache/beam/pull/7202#discussion_r239268924 ## File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java ## @@ -351,7 +398,106 @@ void updateProgress() { } } -private void updateMetrics(BeamFnApi.Metrics metrics) { +// Keeping this as static class for this iteration. Will extract to separate file and generalize +// when more counter types are added. +// todomigryz: define counter transformer factory +// that can provide respective counter transformer for different type of counters. +// (ie RowCountCounterTranformer, MSecCounterTransformer, UserCounterTransformer, etc) +private static class MonitoringInfoToCounterUpdateTransformer { + + private final Map transformIdMapping; + + public MonitoringInfoToCounterUpdateTransformer( + final Map transformIdMapping) { +this.transformIdMapping = transformIdMapping; + } + + // todo: search code for "beam:metrics"... and replace them with relevant enums from + // proto after rebasing above https://github.com/apache/beam/pull/6799 that + // introduces relevant proto entries. + final String BEAM_METRICS_USER_PREFIX = "beam:metric:user"; + + private CounterUpdate monitoringInfoToCounterUpdate(MonitoringInfo monitoringInfo) { +long value = monitoringInfo.getMetric().getCounterData().getInt64Value(); +String urn = monitoringInfo.getUrn(); + +String type = monitoringInfo.getType(); + +// todo: run MonitoringInfo through validation process. 
+// refer to https://github.com/apache/beam/pull/6799 + +if (urn.startsWith(BEAM_METRICS_USER_PREFIX)) { + if (!type.equals("beam:metrics:sum_int_64")) { +return null; + } + + final String ptransform = monitoringInfo.getLabelsMap().get("PTRANSFORM"); + if (ptransform == null) { +return null; + } + + DataflowStepContext stepContext = transformIdMapping.get(ptransform); + + CounterStructuredNameAndMetadata name = new CounterStructuredNameAndMetadata(); + + String nameWithNamespace = + monitoringInfo + .getUrn() + .substring(BEAM_METRICS_USER_PREFIX.length()) + .replace("^:", ""); + + final int lastColonIndex = nameWithNamespace.lastIndexOf(':'); + String counterName = nameWithNamespace.substring(lastColonIndex + 1); + String counterNamespace = nameWithNamespace.substring(0, lastColonIndex); + + name.setName( + new CounterStructuredName() + .setOrigin(Origin.USER.toString()) + // Workaround for bug in python sdk that missed colon after ...metric:user. + .setName(counterName) + .setOriginalStepName( + stepContext == null ? null : stepContext.getNameContext().originalName()) + .setExecutionStepName( + stepContext == null ? null : stepContext.getNameContext().systemName()) + .setOriginNamespace(counterNamespace)) + .setMetadata(new CounterMetadata().setKind("SUM")); + + return new CounterUpdate() Review comment: That is a valid point. But I'd prefer to implement this in next PR when I add other type of counters. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172510) Time Spent: 3h (was: 2h 50m) > Utilize MetricInfo for reporting user metrics in Portable Dataflow Java > Runner. 
> --- > > Key: BEAM-6181 > URL: https://issues.apache.org/jira/browse/BEAM-6181 > Project: Beam > Issue Type: Bug > Components: java-fn-execution >Reporter: Mikhail Gryzykhin >Assignee: Mikhail Gryzykhin >
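A note on the URN parsing quoted in the diff above: in Java, `String.replace` treats its first argument as literal text, so `.replace("^:", "")` looks for the two characters `^:` rather than a colon at the start of the string. The following is only a minimal sketch of one way to split a user-metric URN into namespace and name, assuming the `beam:metric:user` prefix from the diff. The class `UserMetricUrn` and its fields are hypothetical names for illustration and are not part of Beam or of the pull request.

```java
import java.util.Optional;

/** Hypothetical helper (not part of Beam): splits a user-metric URN into namespace and name. */
final class UserMetricUrn {
  // Prefix taken from the quoted diff.
  static final String PREFIX = "beam:metric:user";

  final String namespace;
  final String name;

  private UserMetricUrn(String namespace, String name) {
    this.namespace = namespace;
    this.name = name;
  }

  /** Returns empty when the URN is not a user metric or carries no name after the prefix. */
  static Optional<UserMetricUrn> parse(String urn) {
    if (urn == null || !urn.startsWith(PREFIX)) {
      return Optional.empty();
    }
    String rest = urn.substring(PREFIX.length());
    // Strip one leading colon explicitly; a URN that omits it is still accepted.
    if (rest.startsWith(":")) {
      rest = rest.substring(1);
    }
    if (rest.isEmpty()) {
      return Optional.empty();
    }
    int lastColon = rest.lastIndexOf(':');
    // With no colon left, the whole remainder is the name and the namespace is empty.
    String namespace = lastColon < 0 ? "" : rest.substring(0, lastColon);
    String name = rest.substring(lastColon + 1);
    return Optional.of(new UserMetricUrn(namespace, name));
  }
}
```

Handling the no-colon case separately avoids calling `substring(0, -1)`, which would throw when `lastIndexOf` finds nothing.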
[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.
[ https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172502=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172502 ] ASF GitHub Bot logged work on BEAM-6181: Author: ASF GitHub Bot Created on: 05/Dec/18 22:04 Start Date: 05/Dec/18 22:04 Worklog Time Spent: 10m Work Description: ajamato commented on a change in pull request #7202: [BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow Runner. URL: https://github.com/apache/beam/pull/7202#discussion_r238894508 ## File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java ## @@ -351,7 +395,104 @@ void updateProgress() { } } -private void updateMetrics(BeamFnApi.Metrics metrics) { +// todomigryz: define counter transformer factory (read wiki for proper name) +// that can provide respective counter transformer for different type of counters. +// (ie RowCountCounterTranformer, MSecCounterTransformer, UserCounterTransformer, etc) +private static class MonitoringInfoToCounterUpdateTransformer { + + private final Map transformIdMapping; + + public MonitoringInfoToCounterUpdateTransformer( + final Map transformIdMapping) { +this.transformIdMapping = transformIdMapping; + } + + // todo: search code for "beam:metrics"... and replace them with relevant enums from + // proto after rebasing above https://github.com/apache/beam/pull/6799 that + // introduces relevant proto entries. + final String BEAM_METRICS_USER_PREFIX = "beam:metric:user"; + + private CounterUpdate monitoringInfoToCounterUpdate(MonitoringInfo monitoringInfo) { +long value = monitoringInfo.getMetric().getCounterData().getInt64Value(); +String urn = monitoringInfo.getUrn(); + +String type = monitoringInfo.getType(); + +// todo: run MonitoringInfo through validation process. 
+// refer to https://github.com/apache/beam/pull/6799 + +if (urn.startsWith(BEAM_METRICS_USER_PREFIX)) { + if (!type.equals("beam:metrics:sum_int_64")) { +return null; + } + + final String ptransform = monitoringInfo.getLabelsMap().get("PTRANSFORM"); + if (ptransform == null) { +return null; + } + + DataflowStepContext stepContext = transformIdMapping.get(ptransform); + + CounterStructuredNameAndMetadata name = new CounterStructuredNameAndMetadata(); + + String nameWithNamespace = + monitoringInfo + .getUrn() + .substring(BEAM_METRICS_USER_PREFIX.length()) + .replace("^:", ""); + + final int lastColonIndex = nameWithNamespace.lastIndexOf(':'); + String counterName = nameWithNamespace.substring(lastColonIndex + 1); + String counterNamespace = nameWithNamespace.substring(0, lastColonIndex); + + name.setName( + new CounterStructuredName() + .setOrigin(Origin.USER.toString()) + // Workaround for bug in python sdk that missed colon after ...metric:user. + .setName(counterName) + .setOriginalStepName( + stepContext == null ? null : stepContext.getNameContext().originalName()) Review comment: I don't think we should ever set this to null. If we are unable to map to a stepContext, we should drop the metric and log an error/warning. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172502) > Utilize MetricInfo for reporting user metrics in Portable Dataflow Java > Runner. > --- > > Key: BEAM-6181 > URL: https://issues.apache.org/jira/browse/BEAM-6181 > Project: Beam > Issue Type: Bug > Components: java-fn-execution >Reporter: Mikhail Gryzykhin >Assignee: Mikhail Gryzykhin >Priority: Major > Time Spent: 2.5h > Remaining Estimate: 0h > > New approach to report metrics in FnApi is to utilize MetricInfo structures. 
> This approach is implemented in Python SDK and work is ongoing in Java SDK. > This task includes plumbing User metrics reported via MetricInfos through > Dataflow Java Runner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
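The review comment in this message asks that a metric be dropped, with a logged warning, when its PTransform cannot be mapped to a step context, instead of emitting a counter with null step names. A rough sketch of that behaviour follows. It assumes a plain `Map<String, String>` as a stand-in for the real mapping to `DataflowStepContext`, and `StepNameResolver` is a hypothetical name that does not appear in the pull request.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.logging.Logger;

/** Hypothetical stand-in for the step lookup; the real code maps to DataflowStepContext. */
final class StepNameResolver {
  private static final Logger LOG = Logger.getLogger(StepNameResolver.class.getName());

  // Assumption for this sketch: PTransform id -> original step name.
  private final Map<String, String> originalNameByTransformId;

  StepNameResolver(Map<String, String> originalNameByTransformId) {
    this.originalNameByTransformId = new HashMap<>(originalNameByTransformId);
  }

  /** Returns the step name, or empty (after logging a warning) so the caller drops the metric. */
  Optional<String> resolve(String ptransformId, String urn) {
    String originalName = originalNameByTransformId.get(ptransformId);
    if (originalName == null) {
      LOG.warning("Dropping metric " + urn + ": no step context for PTransform " + ptransformId);
      return Optional.empty();
    }
    return Optional.of(originalName);
  }
}
```

Returning `Optional.empty()` lets the caller skip the counter update entirely, so a null step name never reaches the reported structure.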
[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.
[ https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172491=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172491 ] ASF GitHub Bot logged work on BEAM-6181: Author: ASF GitHub Bot Created on: 05/Dec/18 22:04 Start Date: 05/Dec/18 22:04 Worklog Time Spent: 10m Work Description: ajamato commented on a change in pull request #7202: [BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow Runner. URL: https://github.com/apache/beam/pull/7202#discussion_r238894238 ## File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java ## @@ -351,7 +395,104 @@ void updateProgress() { } } -private void updateMetrics(BeamFnApi.Metrics metrics) { +// todomigryz: define counter transformer factory (read wiki for proper name) +// that can provide respective counter transformer for different type of counters. +// (ie RowCountCounterTranformer, MSecCounterTransformer, UserCounterTransformer, etc) +private static class MonitoringInfoToCounterUpdateTransformer { + + private final Map transformIdMapping; + + public MonitoringInfoToCounterUpdateTransformer( + final Map transformIdMapping) { +this.transformIdMapping = transformIdMapping; + } + + // todo: search code for "beam:metrics"... and replace them with relevant enums from + // proto after rebasing above https://github.com/apache/beam/pull/6799 that + // introduces relevant proto entries. + final String BEAM_METRICS_USER_PREFIX = "beam:metric:user"; + + private CounterUpdate monitoringInfoToCounterUpdate(MonitoringInfo monitoringInfo) { +long value = monitoringInfo.getMetric().getCounterData().getInt64Value(); +String urn = monitoringInfo.getUrn(); + +String type = monitoringInfo.getType(); + +// todo: run MonitoringInfo through validation process. 
+// refer to https://github.com/apache/beam/pull/6799 + +if (urn.startsWith(BEAM_METRICS_USER_PREFIX)) { Review comment: An architectural suggestion: use polymorphism instead of a switch statement, with a class/object responsible for each URN. I think much of it can be common, though, like label extraction/conversion. Let's come up with something that supports some default behaviour and lets you override it for specific URN needs. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172491) Time Spent: 1.5h (was: 1h 20m) > Utilize MetricInfo for reporting user metrics in Portable Dataflow Java > Runner. > --- > > Key: BEAM-6181 > URL: https://issues.apache.org/jira/browse/BEAM-6181 > Project: Beam > Issue Type: Bug > Components: java-fn-execution >Reporter: Mikhail Gryzykhin >Assignee: Mikhail Gryzykhin >Priority: Major > Time Spent: 1.5h > Remaining Estimate: 0h > > New approach to report metrics in FnApi is to utilize MetricInfo structures. > This approach is implemented in Python SDK and work is ongoing in Java SDK. > This task includes plumbing User metrics reported via MetricInfos through > Dataflow Java Runner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
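The suggestion in this message, one object per URN with shared default behaviour, could look roughly like the sketch below. It is an illustration under stated assumptions and not the design that was eventually merged: `SimpleMonitoringInfo` is a simplified, hypothetical stand-in for `BeamFnApi.MonitoringInfo`, the result is a plain `String` in place of a `CounterUpdate`, and all class names are invented for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Simplified, hypothetical stand-in for BeamFnApi.MonitoringInfo. */
final class SimpleMonitoringInfo {
  final String urn;
  final String type;
  final Map<String, String> labels;
  final long value;

  SimpleMonitoringInfo(String urn, String type, Map<String, String> labels, long value) {
    this.urn = urn;
    this.type = type;
    this.labels = labels;
    this.value = value;
  }
}

/** Shared behaviour lives here; subclasses supply only what is URN-specific. */
abstract class CounterTransformer {
  abstract boolean handles(SimpleMonitoringInfo info);

  /** A String stands in for the CounterUpdate the real code would build. */
  abstract Optional<String> transform(SimpleMonitoringInfo info);

  /** Default label extraction, common to every URN unless overridden. */
  Optional<String> ptransformLabel(SimpleMonitoringInfo info) {
    return Optional.ofNullable(info.labels.get("PTRANSFORM"));
  }
}

/** Handles user counters of the sum_int_64 type, as in the quoted diff. */
final class UserCounterTransformer extends CounterTransformer {
  @Override
  boolean handles(SimpleMonitoringInfo info) {
    return info.urn.startsWith("beam:metric:user") && "beam:metrics:sum_int_64".equals(info.type);
  }

  @Override
  Optional<String> transform(SimpleMonitoringInfo info) {
    // A missing PTRANSFORM label yields empty, so the metric is skipped.
    return ptransformLabel(info).map(step -> step + " " + info.urn + " SUM " + info.value);
  }
}

/** Dispatches to the first transformer that claims the MonitoringInfo. */
final class CounterTransformerRegistry {
  private final List<CounterTransformer> transformers;

  CounterTransformerRegistry(CounterTransformer... transformers) {
    this.transformers = Arrays.asList(transformers);
  }

  Optional<String> transform(SimpleMonitoringInfo info) {
    for (CounterTransformer transformer : transformers) {
      if (transformer.handles(info)) {
        return transformer.transform(info);
      }
    }
    return Optional.empty();
  }
}
```

Supporting another counter type then means adding one subclass and registering it, while the dispatch loop and the shared label handling stay unchanged.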
[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.
[ https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172498=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172498 ] ASF GitHub Bot logged work on BEAM-6181: Author: ASF GitHub Bot Created on: 05/Dec/18 22:04 Start Date: 05/Dec/18 22:04 Worklog Time Spent: 10m Work Description: ajamato commented on a change in pull request #7202: [BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow Runner. URL: https://github.com/apache/beam/pull/7202#discussion_r239254784 ## File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java ## @@ -351,7 +398,106 @@ void updateProgress() { } } -private void updateMetrics(BeamFnApi.Metrics metrics) { +// Keeping this as static class for this iteration. Will extract to separate file and generalize +// when more counter types are added. +// todomigryz: define counter transformer factory +// that can provide respective counter transformer for different type of counters. +// (ie RowCountCounterTranformer, MSecCounterTransformer, UserCounterTransformer, etc) +private static class MonitoringInfoToCounterUpdateTransformer { + + private final Map transformIdMapping; + + public MonitoringInfoToCounterUpdateTransformer( + final Map transformIdMapping) { +this.transformIdMapping = transformIdMapping; + } + + // todo: search code for "beam:metrics"... and replace them with relevant enums from + // proto after rebasing above https://github.com/apache/beam/pull/6799 that + // introduces relevant proto entries. + final String BEAM_METRICS_USER_PREFIX = "beam:metric:user"; + + private CounterUpdate monitoringInfoToCounterUpdate(MonitoringInfo monitoringInfo) { +long value = monitoringInfo.getMetric().getCounterData().getInt64Value(); +String urn = monitoringInfo.getUrn(); + +String type = monitoringInfo.getType(); + +// todo: run MonitoringInfo through validation process. 
+// refer to https://github.com/apache/beam/pull/6799 + +if (urn.startsWith(BEAM_METRICS_USER_PREFIX)) { + if (!type.equals("beam:metrics:sum_int_64")) { +return null; + } + + final String ptransform = monitoringInfo.getLabelsMap().get("PTRANSFORM"); + if (ptransform == null) { +return null; + } + + DataflowStepContext stepContext = transformIdMapping.get(ptransform); + + CounterStructuredNameAndMetadata name = new CounterStructuredNameAndMetadata(); + + String nameWithNamespace = + monitoringInfo + .getUrn() + .substring(BEAM_METRICS_USER_PREFIX.length()) + .replace("^:", ""); + + final int lastColonIndex = nameWithNamespace.lastIndexOf(':'); + String counterName = nameWithNamespace.substring(lastColonIndex + 1); + String counterNamespace = nameWithNamespace.substring(0, lastColonIndex); + + name.setName( + new CounterStructuredName() + .setOrigin(Origin.USER.toString()) + // Workaround for bug in python sdk that missed colon after ...metric:user. + .setName(counterName) + .setOriginalStepName( + stepContext == null ? null : stepContext.getNameContext().originalName()) + .setExecutionStepName( + stepContext == null ? null : stepContext.getNameContext().systemName()) + .setOriginNamespace(counterNamespace)) + .setMetadata(new CounterMetadata().setKind("SUM")); + + return new CounterUpdate() + .setStructuredNameAndMetadata(name) + .setCumulative(false) + .setInteger(DataflowCounterUpdateExtractor.longToSplitInt(value)); +} +return null; + } +} + +private void updateMetrics(List monitoringInfos) { Review comment: please add docstrings to all of these. There are many methods with too similar names. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172498) Time Spent: 2h 20m (was: 2h 10m) > Utilize MetricInfo for reporting user metrics in Portable Dataflow Java > Runner. > --- > > Key: BEAM-6181 >
[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.
[ https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172496=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172496 ] ASF GitHub Bot logged work on BEAM-6181: Author: ASF GitHub Bot Created on: 05/Dec/18 22:04 Start Date: 05/Dec/18 22:04 Worklog Time Spent: 10m Work Description: ajamato commented on a change in pull request #7202: [BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow Runner. URL: https://github.com/apache/beam/pull/7202#discussion_r239247446 ## File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java ## @@ -307,10 +347,17 @@ void updateProgress() { grpcWriteOperation.abortWait(); } -BeamFnApi.Metrics metrics = MoreFutures.get(bundleProcessOperation.getMetrics()); +//TODO: Replace getProcessBundleProgress with getMonitoringInfos when Metrics is deprecated. +ProcessBundleProgressResponse processBundleProgressResponse = +MoreFutures.get(bundleProcessOperation.getProcessBundleProgress()); +updateMetrics(processBundleProgressResponse.getMonitoringInfosList()); -updateMetrics(metrics); +// Supporting deprecated metrics until all supported runners are migrated to using +// MonitoringInfos +Metrics metrics = processBundleProgressResponse.getMetrics(); +updateMetricsDeprecated(metrics); +// TODO: change this to utilize MonitroingInfos Review comment: please add username and/or jira number This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172496) > Utilize MetricInfo for reporting user metrics in Portable Dataflow Java > Runner. 
> --- > > Key: BEAM-6181 > URL: https://issues.apache.org/jira/browse/BEAM-6181 > Project: Beam > Issue Type: Bug > Components: java-fn-execution > Reporter: Mikhail Gryzykhin > Assignee: Mikhail Gryzykhin > Priority: Major > Time Spent: 2h 10m > Remaining Estimate: 0h > > The new approach to reporting metrics in FnApi is to use MetricInfo structures. > This approach is implemented in the Python SDK, and work is ongoing in the Java SDK. > This task includes plumbing user metrics reported via MetricInfos through > the Dataflow Java Runner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.
[ https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172493=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172493 ] ASF GitHub Bot logged work on BEAM-6181: Author: ASF GitHub Bot Created on: 05/Dec/18 22:04 Start Date: 05/Dec/18 22:04 Worklog Time Spent: 10m Work Description: ajamato commented on a change in pull request #7202: [BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow Runner. URL: https://github.com/apache/beam/pull/7202#discussion_r239240671 ## File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkItemStatusClient.java ## @@ -263,20 +263,35 @@ private synchronized WorkItemStatus createStatusUpdate(boolean isFinal) { return status; } + // todo this method should return List instead of setting it to WorkitemStatus Review comment: Please add a username and/or jira Issue Time Tracking --- Worklog Id: (was: 172493) Time Spent: 1h 50m (was: 1h 40m)
[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.
[ https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172504=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172504 ] ASF GitHub Bot logged work on BEAM-6181: Author: ASF GitHub Bot Created on: 05/Dec/18 22:07 Start Date: 05/Dec/18 22:07 Worklog Time Spent: 10m Work Description: Ardagan commented on a change in pull request #7202: [BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow Runner. URL: https://github.com/apache/beam/pull/7202#discussion_r239256276 ## File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java ## @@ -351,7 +395,104 @@ void updateProgress() { } } -private void updateMetrics(BeamFnApi.Metrics metrics) { +// todomigryz: define counter transformer factory (read wiki for proper name) +// that can provide respective counter transformer for different type of counters. +// (ie RowCountCounterTranformer, MSecCounterTransformer, UserCounterTransformer, etc) +private static class MonitoringInfoToCounterUpdateTransformer { + + private final Map transformIdMapping; + + public MonitoringInfoToCounterUpdateTransformer( + final Map transformIdMapping) { +this.transformIdMapping = transformIdMapping; + } + + // todo: search code for "beam:metrics"... and replace them with relevant enums from + // proto after rebasing above https://github.com/apache/beam/pull/6799 that + // introduces relevant proto entries. + final String BEAM_METRICS_USER_PREFIX = "beam:metric:user"; + + private CounterUpdate monitoringInfoToCounterUpdate(MonitoringInfo monitoringInfo) { +long value = monitoringInfo.getMetric().getCounterData().getInt64Value(); +String urn = monitoringInfo.getUrn(); + +String type = monitoringInfo.getType(); + +// todo: run MonitoringInfo through validation process. +// refer to https://github.com/apache/beam/pull/6799 + +if (urn.startsWith(BEAM_METRICS_USER_PREFIX)) { Review comment: Agree with this one. 
I decided to keep it this way for now. Will implement polymorphism when adding support for another counter type. Issue Time Tracking --- Worklog Id: (was: 172504) Time Spent: 2h 50m (was: 2h 40m)
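One possible shape for the polymorphism mentioned in this reply is sketched below. It is only an illustration under stated assumptions: `Info` is a simplified stand-in for `MonitoringInfo`, a plain `String` stands in for `CounterUpdate`, and `CounterTransformer`, `UserCounterTransformer`, and `TransformerRegistry` are hypothetical names, not Beam classes and not necessarily the design the PR later adopted.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Simplified stand-in for MonitoringInfo (assumption: only URN, type and value matter here). */
final class Info {
  final String urn;
  final String type;
  final long value;

  Info(String urn, String type, long value) {
    this.urn = urn;
    this.type = type;
    this.value = value;
  }
}

/** Hypothetical interface: one implementation per counter type. */
interface CounterTransformer {
  /** Returns true if this transformer knows how to handle the given info. */
  boolean supports(Info info);

  /** Converts the info; a String stands in for CounterUpdate in this sketch. */
  String transform(Info info);
}

/** Handles user counters, using the URN prefix and type string from the quoted diff. */
final class UserCounterTransformer implements CounterTransformer {
  @Override
  public boolean supports(Info info) {
    return info.urn.startsWith("beam:metric:user")
        && "beam:metrics:sum_int_64".equals(info.type);
  }

  @Override
  public String transform(Info info) {
    return "USER SUM " + info.value;
  }
}

/** Dispatches to the first transformer that supports the info. */
final class TransformerRegistry {
  private final List<CounterTransformer> transformers;

  TransformerRegistry(CounterTransformer... transformers) {
    this.transformers = Arrays.asList(transformers);
  }

  Optional<String> transform(Info info) {
    for (CounterTransformer transformer : transformers) {
      if (transformer.supports(info)) {
        return Optional.of(transformer.transform(info));
      }
    }
    // Unknown counter types are skipped rather than treated as errors.
    return Optional.empty();
  }
}
```

With this layout, supporting another counter type would mean adding one more `CounterTransformer` implementation and registering it, rather than growing the `if (urn.startsWith(...))` chain.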
[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.
[ https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172499=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172499 ] ASF GitHub Bot logged work on BEAM-6181: Author: ASF GitHub Bot Created on: 05/Dec/18 22:04 Start Date: 05/Dec/18 22:04 Worklog Time Spent: 10m Work Description: ajamato commented on a change in pull request #7202: [BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow Runner. URL: https://github.com/apache/beam/pull/7202#discussion_r239253828 ## File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java ## @@ -351,7 +398,106 @@ void updateProgress() { } } -private void updateMetrics(BeamFnApi.Metrics metrics) { +// Keeping this as static class for this iteration. Will extract to separate file and generalize +// when more counter types are added. +// todomigryz: define counter transformer factory +// that can provide respective counter transformer for different type of counters. +// (ie RowCountCounterTranformer, MSecCounterTransformer, UserCounterTransformer, etc) +private static class MonitoringInfoToCounterUpdateTransformer { + + private final Map transformIdMapping; + + public MonitoringInfoToCounterUpdateTransformer( + final Map transformIdMapping) { +this.transformIdMapping = transformIdMapping; + } + + // todo: search code for "beam:metrics"... and replace them with relevant enums from + // proto after rebasing above https://github.com/apache/beam/pull/6799 that + // introduces relevant proto entries. + final String BEAM_METRICS_USER_PREFIX = "beam:metric:user"; + + private CounterUpdate monitoringInfoToCounterUpdate(MonitoringInfo monitoringInfo) { +long value = monitoringInfo.getMetric().getCounterData().getInt64Value(); +String urn = monitoringInfo.getUrn(); + +String type = monitoringInfo.getType(); + +// todo: run MonitoringInfo through validation process. 
+// refer to https://github.com/apache/beam/pull/6799 + +if (urn.startsWith(BEAM_METRICS_USER_PREFIX)) { + if (!type.equals("beam:metrics:sum_int_64")) { +return null; + } + + final String ptransform = monitoringInfo.getLabelsMap().get("PTRANSFORM"); + if (ptransform == null) { +return null; + } + + DataflowStepContext stepContext = transformIdMapping.get(ptransform); + + CounterStructuredNameAndMetadata name = new CounterStructuredNameAndMetadata(); + + String nameWithNamespace = Review comment: Please put this in a helper method for readability, make a method to parse the string This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172499) Time Spent: 2h 20m (was: 2h 10m) > Utilize MetricInfo for reporting user metrics in Portable Dataflow Java > Runner. > --- > > Key: BEAM-6181 > URL: https://issues.apache.org/jira/browse/BEAM-6181 > Project: Beam > Issue Type: Bug > Components: java-fn-execution >Reporter: Mikhail Gryzykhin >Assignee: Mikhail Gryzykhin >Priority: Major > Time Spent: 2h 20m > Remaining Estimate: 0h > > New approach to report metrics in FnApi is to utilize MetricInfo structures. > This approach is implemented in Python SDK and work is ongoing in Java SDK. > This tasks includes plumbing User metrics reported via MetricInfos through > Dataflow Java Runner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
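The helper method requested in this review comment could look like the following minimal sketch. It assumes, as the quoted diff implies, that a user metric URN has the shape `beam:metric:user:<namespace>:<name>`, with the counter name after the last colon. `UserMetricUrnParser`, `ParsedName`, and `parse` are hypothetical names, not part of the Beam codebase. One detail worth noting: `String.replace` treats its first argument as literal text, not as a regular expression, so the quoted `.replace("^:", "")` would only remove a literal `^:` and not a leading colon. The sketch uses an explicit prefix check instead.

```java
import java.util.Optional;

/** Hypothetical helper for splitting a user metric URN; not part of the Beam codebase. */
final class UserMetricUrnParser {
  // Prefix taken from the quoted diff.
  static final String BEAM_METRICS_USER_PREFIX = "beam:metric:user";

  /** Namespace and name of a user counter. */
  static final class ParsedName {
    final String namespace;
    final String name;

    ParsedName(String namespace, String name) {
      this.namespace = namespace;
      this.name = name;
    }
  }

  private UserMetricUrnParser() {}

  /**
   * Splits a user metric URN into namespace and counter name.
   *
   * @return empty if the URN is not a user metric URN or has no namespace or name part
   */
  static Optional<ParsedName> parse(String urn) {
    if (urn == null || !urn.startsWith(BEAM_METRICS_USER_PREFIX)) {
      return Optional.empty();
    }
    String rest = urn.substring(BEAM_METRICS_USER_PREFIX.length());
    // Tolerate both "...user:ns:name" and "...userns:name"; the diff mentions a
    // Python SDK bug that omitted the colon after the prefix.
    if (rest.startsWith(":")) {
      rest = rest.substring(1);
    }
    int lastColon = rest.lastIndexOf(':');
    if (lastColon < 0 || lastColon == rest.length() - 1) {
      // No namespace separator, or an empty counter name.
      return Optional.empty();
    }
    return Optional.of(
        new ParsedName(rest.substring(0, lastColon), rest.substring(lastColon + 1)));
  }
}
```

Returning an empty `Optional` for malformed URNs is a design choice of this sketch; the quoted code would instead throw from `substring(0, lastColonIndex)` when no colon is present.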
[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.
[ https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172490=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172490 ] ASF GitHub Bot logged work on BEAM-6181: Author: ASF GitHub Bot Created on: 05/Dec/18 22:04 Start Date: 05/Dec/18 22:04 Worklog Time Spent: 10m Work Description: ajamato commented on a change in pull request #7202: [BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow Runner. URL: https://github.com/apache/beam/pull/7202#discussion_r238893122 ## File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java ## @@ -307,10 +344,17 @@ void updateProgress() { grpcWriteOperation.abortWait(); } -BeamFnApi.Metrics metrics = MoreFutures.get(bundleProcessOperation.getMetrics()); +//TODO: Replace getProcessBundleProgress with getMonitoringInfos when Metrics is deprecated. Review comment: please add username and/or jira number Issue Time Tracking --- Worklog Id: (was: 172490) Time Spent: 1h 20m (was: 1h 10m)
[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.
[ https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172494=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172494 ] ASF GitHub Bot logged work on BEAM-6181: Author: ASF GitHub Bot Created on: 05/Dec/18 22:04 Start Date: 05/Dec/18 22:04 Worklog Time Spent: 10m Work Description: ajamato commented on a change in pull request #7202: [BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow Runner. URL: https://github.com/apache/beam/pull/7202#discussion_r239241898 ## File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkItemStatusClient.java ## @@ -263,20 +263,35 @@ private synchronized WorkItemStatus createStatusUpdate(boolean isFinal) { return status; } + // todo this method should return List instead of setting it to WorkitemStatus @VisibleForTesting synchronized void populateCounterUpdates(WorkItemStatus status) { if (worker == null) { return; } boolean isFinalUpdate = Boolean.TRUE.equals(status.getCompleted()); -ImmutableList.Builder counterUpdatesBuilder = ImmutableList.builder(); -counterUpdatesBuilder.addAll(extractCounters(worker.getOutputCounters())); -counterUpdatesBuilder.addAll(extractMetrics(isFinalUpdate)); -counterUpdatesBuilder.addAll(extractMsecCounters(isFinalUpdate)); -counterUpdatesBuilder.addAll(worker.extractMetricUpdates()); -ImmutableList counterUpdates = counterUpdatesBuilder.build(); +ImmutableList.Builder counterUpdatesListBuilder = ImmutableList.builder(); +Iterable newCounterUpdates; + +// Output counters +newCounterUpdates = extractCounters(worker.getOutputCounters()); +counterUpdatesListBuilder.addAll(newCounterUpdates); + +// User metrics reported in Worker +newCounterUpdates = extractMetrics(isFinalUpdate); Review comment: Please revert to the previous style, there isn't much value to expanding these to two lines each. thanks for the code comments though This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172494) Time Spent: 2h (was: 1h 50m) > Utilize MetricInfo for reporting user metrics in Portable Dataflow Java > Runner. > --- > > Key: BEAM-6181 > URL: https://issues.apache.org/jira/browse/BEAM-6181 > Project: Beam > Issue Type: Bug > Components: java-fn-execution >Reporter: Mikhail Gryzykhin >Assignee: Mikhail Gryzykhin >Priority: Major > Time Spent: 2h > Remaining Estimate: 0h > > New approach to report metrics in FnApi is to utilize MetricInfo structures. > This approach is implemented in Python SDK and work is ongoing in Java SDK. > This tasks includes plumbing User metrics reported via MetricInfos through > Dataflow Java Runner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.
[ https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172503=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172503 ] ASF GitHub Bot logged work on BEAM-6181: Author: ASF GitHub Bot Created on: 05/Dec/18 22:04 Start Date: 05/Dec/18 22:04 Worklog Time Spent: 10m Work Description: ajamato commented on a change in pull request #7202: [BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow Runner. URL: https://github.com/apache/beam/pull/7202#discussion_r239245981 ## File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java ## @@ -307,10 +347,17 @@ void updateProgress() { grpcWriteOperation.abortWait(); } -BeamFnApi.Metrics metrics = MoreFutures.get(bundleProcessOperation.getMetrics()); +//TODO: Replace getProcessBundleProgress with getMonitoringInfos when Metrics is deprecated. +ProcessBundleProgressResponse processBundleProgressResponse = +MoreFutures.get(bundleProcessOperation.getProcessBundleProgress()); +updateMetrics(processBundleProgressResponse.getMonitoringInfosList()); -updateMetrics(metrics); +// Supporting deprecated metrics until all supported runners are migrated to using +// MonitoringInfos +Metrics metrics = processBundleProgressResponse.getMetrics(); +updateMetricsDeprecated(metrics); +// TODO: change this to utilize MonitroingInfos Review comment: sp: MonitroingInfos This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172503) Time Spent: 2h 40m (was: 2.5h) > Utilize MetricInfo for reporting user metrics in Portable Dataflow Java > Runner. 
[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.
[ https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172497=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172497 ] ASF GitHub Bot logged work on BEAM-6181: Author: ASF GitHub Bot Created on: 05/Dec/18 22:04 Start Date: 05/Dec/18 22:04 Worklog Time Spent: 10m Work Description: ajamato commented on a change in pull request #7202: [BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow Runner. URL: https://github.com/apache/beam/pull/7202#discussion_r239252005 ## File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java ## @@ -351,7 +398,106 @@ void updateProgress() { } } -private void updateMetrics(BeamFnApi.Metrics metrics) { +// Keeping this as static class for this iteration. Will extract to separate file and generalize +// when more counter types are added. +// todomigryz: define counter transformer factory +// that can provide respective counter transformer for different type of counters. +// (ie RowCountCounterTranformer, MSecCounterTransformer, UserCounterTransformer, etc) +private static class MonitoringInfoToCounterUpdateTransformer { + + private final Map transformIdMapping; + + public MonitoringInfoToCounterUpdateTransformer( + final Map transformIdMapping) { +this.transformIdMapping = transformIdMapping; + } + + // todo: search code for "beam:metrics"... and replace them with relevant enums from + // proto after rebasing above https://github.com/apache/beam/pull/6799 that + // introduces relevant proto entries. + final String BEAM_METRICS_USER_PREFIX = "beam:metric:user"; + + private CounterUpdate monitoringInfoToCounterUpdate(MonitoringInfo monitoringInfo) { +long value = monitoringInfo.getMetric().getCounterData().getInt64Value(); +String urn = monitoringInfo.getUrn(); + +String type = monitoringInfo.getType(); + +// todo: run MonitoringInfo through validation process. 
Review comment: Add username and/or jira number This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172497) > Utilize MetricInfo for reporting user metrics in Portable Dataflow Java > Runner. > --- > > Key: BEAM-6181 > URL: https://issues.apache.org/jira/browse/BEAM-6181 > Project: Beam > Issue Type: Bug > Components: java-fn-execution >Reporter: Mikhail Gryzykhin >Assignee: Mikhail Gryzykhin >Priority: Major > Time Spent: 2h 10m > Remaining Estimate: 0h > > New approach to report metrics in FnApi is to utilize MetricInfo structures. > This approach is implemented in Python SDK and work is ongoing in Java SDK. > This tasks includes plumbing User metrics reported via MetricInfos through > Dataflow Java Runner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.
[ https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172489=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172489 ] ASF GitHub Bot logged work on BEAM-6181: Author: ASF GitHub Bot Created on: 05/Dec/18 22:04 Start Date: 05/Dec/18 22:04 Worklog Time Spent: 10m Work Description: ajamato commented on a change in pull request #7202: [BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow Runner. URL: https://github.com/apache/beam/pull/7202#discussion_r238892994 ## File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java ## @@ -177,21 +199,24 @@ public ReadOperation getReadOperation() throws Exception { throw new IllegalStateException(String.format("ReadOperation not found in %s", operations)); } - private static interface ProgressTracker { + private interface ProgressTracker { @Nullable -public NativeReader.Progress getWorkerProgress() throws Exception; +public Progress getWorkerProgress() throws Exception; /** * Returns an metric updates accumulated since the last call to {@link #extractMetricUpdates()}. */ +@Deprecated public MetricUpdates extractMetricUpdates(); +public List extractCounterUpdates(); Review comment: please add a comment describing what these are This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172489) Time Spent: 1h 10m (was: 1h) > Utilize MetricInfo for reporting user metrics in Portable Dataflow Java > Runner. 
[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.
[ https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172492=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172492 ] ASF GitHub Bot logged work on BEAM-6181: Author: ASF GitHub Bot Created on: 05/Dec/18 22:04 Start Date: 05/Dec/18 22:04 Worklog Time Spent: 10m Work Description: ajamato commented on a change in pull request #7202: [BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow Runner. URL: https://github.com/apache/beam/pull/7202#discussion_r239254570 ## File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java ## @@ -351,7 +398,106 @@ void updateProgress() { } } -private void updateMetrics(BeamFnApi.Metrics metrics) { +// Keeping this as static class for this iteration. Will extract to separate file and generalize +// when more counter types are added. +// todomigryz: define counter transformer factory +// that can provide respective counter transformer for different type of counters. +// (ie RowCountCounterTranformer, MSecCounterTransformer, UserCounterTransformer, etc) +private static class MonitoringInfoToCounterUpdateTransformer { + + private final Map transformIdMapping; + + public MonitoringInfoToCounterUpdateTransformer( + final Map transformIdMapping) { +this.transformIdMapping = transformIdMapping; + } + + // todo: search code for "beam:metrics"... and replace them with relevant enums from + // proto after rebasing above https://github.com/apache/beam/pull/6799 that + // introduces relevant proto entries. + final String BEAM_METRICS_USER_PREFIX = "beam:metric:user"; + + private CounterUpdate monitoringInfoToCounterUpdate(MonitoringInfo monitoringInfo) { +long value = monitoringInfo.getMetric().getCounterData().getInt64Value(); +String urn = monitoringInfo.getUrn(); + +String type = monitoringInfo.getType(); + +// todo: run MonitoringInfo through validation process. 
+// refer to https://github.com/apache/beam/pull/6799 + +if (urn.startsWith(BEAM_METRICS_USER_PREFIX)) { + if (!type.equals("beam:metrics:sum_int_64")) { +return null; + } + + final String ptransform = monitoringInfo.getLabelsMap().get("PTRANSFORM"); + if (ptransform == null) { +return null; + } + + DataflowStepContext stepContext = transformIdMapping.get(ptransform); + + CounterStructuredNameAndMetadata name = new CounterStructuredNameAndMetadata(); + + String nameWithNamespace = + monitoringInfo + .getUrn() + .substring(BEAM_METRICS_USER_PREFIX.length()) + .replace("^:", ""); + + final int lastColonIndex = nameWithNamespace.lastIndexOf(':'); + String counterName = nameWithNamespace.substring(lastColonIndex + 1); + String counterNamespace = nameWithNamespace.substring(0, lastColonIndex); + + name.setName( + new CounterStructuredName() + .setOrigin(Origin.USER.toString()) + // Workaround for bug in python sdk that missed colon after ...metric:user. + .setName(counterName) + .setOriginalStepName( + stepContext == null ? null : stepContext.getNameContext().originalName()) + .setExecutionStepName( + stepContext == null ? null : stepContext.getNameContext().systemName()) + .setOriginNamespace(counterNamespace)) + .setMetadata(new CounterMetadata().setKind("SUM")); + + return new CounterUpdate() Review comment: This is okay for now, since you are just unpacking the user counter metric. but I want to see this share more code for different metrics later on. Any label extraction can be shared for example. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172492) Time Spent: 1h 40m (was: 1.5h) > Utilize MetricInfo for reporting user metrics in Portable Dataflow Java > Runner. 
> --- > > Key: BEAM-6181 > URL: https://issues.apache.org/jira/browse/BEAM-6181 > Project: Beam > Issue Type: Bug > Components: java-fn-execution
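The shared label extraction suggested in this comment could be factored out roughly as follows. This is a sketch under assumptions: the `PTRANSFORM` label key comes from the quoted diff, a plain `Map<String, String>` stands in for the result of `getLabelsMap()`, and `MonitoringInfoLabels`, `label`, and `ptransform` are hypothetical names rather than existing Beam helpers.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical shared helper for reading labels; not part of the Beam codebase. */
final class MonitoringInfoLabels {
  // Label key used in the quoted diff.
  static final String PTRANSFORM = "PTRANSFORM";

  private MonitoringInfoLabels() {}

  /** Returns the value of the given label, or empty if the label is absent. */
  static Optional<String> label(Map<String, String> labels, String key) {
    return Optional.ofNullable(labels.get(key));
  }

  /** Convenience accessor for the PTRANSFORM label. */
  static Optional<String> ptransform(Map<String, String> labels) {
    return label(labels, PTRANSFORM);
  }
}
```

Each per-metric transformer could then call `MonitoringInfoLabels.ptransform(...)` and skip the metric when the result is empty, instead of repeating the `get("PTRANSFORM")` and null check inline.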
[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.
[ https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172501=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172501 ] ASF GitHub Bot logged work on BEAM-6181: Author: ASF GitHub Bot Created on: 05/Dec/18 22:04 Start Date: 05/Dec/18 22:04 Worklog Time Spent: 10m Work Description: ajamato commented on a change in pull request #7202: [BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow Runner. URL: https://github.com/apache/beam/pull/7202#discussion_r239254686 ## File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java ## @@ -351,7 +398,106 @@ void updateProgress() { } } -private void updateMetrics(BeamFnApi.Metrics metrics) { +// Keeping this as static class for this iteration. Will extract to separate file and generalize +// when more counter types are added. +// todomigryz: define counter transformer factory +// that can provide respective counter transformer for different type of counters. +// (ie RowCountCounterTranformer, MSecCounterTransformer, UserCounterTransformer, etc) +private static class MonitoringInfoToCounterUpdateTransformer { + + private final Map transformIdMapping; + + public MonitoringInfoToCounterUpdateTransformer( + final Map transformIdMapping) { +this.transformIdMapping = transformIdMapping; + } + + // todo: search code for "beam:metrics"... and replace them with relevant enums from + // proto after rebasing above https://github.com/apache/beam/pull/6799 that + // introduces relevant proto entries. + final String BEAM_METRICS_USER_PREFIX = "beam:metric:user"; + + private CounterUpdate monitoringInfoToCounterUpdate(MonitoringInfo monitoringInfo) { +long value = monitoringInfo.getMetric().getCounterData().getInt64Value(); +String urn = monitoringInfo.getUrn(); + +String type = monitoringInfo.getType(); + +// todo: run MonitoringInfo through validation process. 
+// refer to https://github.com/apache/beam/pull/6799 + +if (urn.startsWith(BEAM_METRICS_USER_PREFIX)) { + if (!type.equals("beam:metrics:sum_int_64")) { +return null; + } + + final String ptransform = monitoringInfo.getLabelsMap().get("PTRANSFORM"); + if (ptransform == null) { +return null; + } + + DataflowStepContext stepContext = transformIdMapping.get(ptransform); + + CounterStructuredNameAndMetadata name = new CounterStructuredNameAndMetadata(); + + String nameWithNamespace = + monitoringInfo + .getUrn() + .substring(BEAM_METRICS_USER_PREFIX.length()) + .replace("^:", ""); + + final int lastColonIndex = nameWithNamespace.lastIndexOf(':'); + String counterName = nameWithNamespace.substring(lastColonIndex + 1); + String counterNamespace = nameWithNamespace.substring(0, lastColonIndex); + + name.setName( + new CounterStructuredName() + .setOrigin(Origin.USER.toString()) + // Workaround for bug in python sdk that missed colon after ...metric:user. + .setName(counterName) + .setOriginalStepName( + stepContext == null ? null : stepContext.getNameContext().originalName()) + .setExecutionStepName( + stepContext == null ? null : stepContext.getNameContext().systemName()) + .setOriginNamespace(counterNamespace)) + .setMetadata(new CounterMetadata().setKind("SUM")); + + return new CounterUpdate() + .setStructuredNameAndMetadata(name) + .setCumulative(false) + .setInteger(DataflowCounterUpdateExtractor.longToSplitInt(value)); +} +return null; + } +} + +private void updateMetrics(List monitoringInfos) { + final MonitoringInfoToCounterUpdateTransformer monitoringInfoToCounterUpdateTransformer = + new MonitoringInfoToCounterUpdateTransformer( + bundleProcessOperation.getPtransformIdToUserStepContext()); + String miDump = Review comment: rm debug code This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172501) Time Spent: 2.5h (was: 2h 20m) > Utilize MetricInfo for
[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.
[ https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172495=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172495 ] ASF GitHub Bot logged work on BEAM-6181: Author: ASF GitHub Bot Created on: 05/Dec/18 22:04 Start Date: 05/Dec/18 22:04 Worklog Time Spent: 10m Work Description: ajamato commented on a change in pull request #7202: [BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow Runner. URL: https://github.com/apache/beam/pull/7202#discussion_r239254849 ## File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java ## @@ -351,7 +398,106 @@ void updateProgress() { } } -private void updateMetrics(BeamFnApi.Metrics metrics) { +// Keeping this as static class for this iteration. Will extract to separate file and generalize +// when more counter types are added. +// todomigryz: define counter transformer factory +// that can provide respective counter transformer for different type of counters. +// (ie RowCountCounterTranformer, MSecCounterTransformer, UserCounterTransformer, etc) +private static class MonitoringInfoToCounterUpdateTransformer { + + private final Map transformIdMapping; + + public MonitoringInfoToCounterUpdateTransformer( + final Map transformIdMapping) { +this.transformIdMapping = transformIdMapping; + } + + // todo: search code for "beam:metrics"... and replace them with relevant enums from + // proto after rebasing above https://github.com/apache/beam/pull/6799 that + // introduces relevant proto entries. + final String BEAM_METRICS_USER_PREFIX = "beam:metric:user"; + + private CounterUpdate monitoringInfoToCounterUpdate(MonitoringInfo monitoringInfo) { +long value = monitoringInfo.getMetric().getCounterData().getInt64Value(); +String urn = monitoringInfo.getUrn(); + +String type = monitoringInfo.getType(); + +// todo: run MonitoringInfo through validation process. 
+// refer to https://github.com/apache/beam/pull/6799 + +if (urn.startsWith(BEAM_METRICS_USER_PREFIX)) { + if (!type.equals("beam:metrics:sum_int_64")) { +return null; + } + + final String ptransform = monitoringInfo.getLabelsMap().get("PTRANSFORM"); + if (ptransform == null) { +return null; + } + + DataflowStepContext stepContext = transformIdMapping.get(ptransform); + + CounterStructuredNameAndMetadata name = new CounterStructuredNameAndMetadata(); + + String nameWithNamespace = + monitoringInfo + .getUrn() + .substring(BEAM_METRICS_USER_PREFIX.length()) + .replace("^:", ""); + + final int lastColonIndex = nameWithNamespace.lastIndexOf(':'); + String counterName = nameWithNamespace.substring(lastColonIndex + 1); + String counterNamespace = nameWithNamespace.substring(0, lastColonIndex); + + name.setName( + new CounterStructuredName() + .setOrigin(Origin.USER.toString()) + // Workaround for bug in python sdk that missed colon after ...metric:user. + .setName(counterName) + .setOriginalStepName( + stepContext == null ? null : stepContext.getNameContext().originalName()) + .setExecutionStepName( + stepContext == null ? 
null : stepContext.getNameContext().systemName()) + .setOriginNamespace(counterNamespace)) + .setMetadata(new CounterMetadata().setKind("SUM")); + + return new CounterUpdate() + .setStructuredNameAndMetadata(name) + .setCumulative(false) + .setInteger(DataflowCounterUpdateExtractor.longToSplitInt(value)); +} +return null; + } +} + +private void updateMetrics(List monitoringInfos) { + final MonitoringInfoToCounterUpdateTransformer monitoringInfoToCounterUpdateTransformer = + new MonitoringInfoToCounterUpdateTransformer( + bundleProcessOperation.getPtransformIdToUserStepContext()); + String miDump = + monitoringInfos.stream().map(Objects::toString).collect(Collectors.joining("\n")); + + counterUpdates = + monitoringInfos + .stream() + .map(monitoringInfoToCounterUpdateTransformer::monitoringInfoToCounterUpdate) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + return; +} + +private String getStackTrace() { Review comment: rm debug code
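The URN handling in the diff above can be sketched in isolation. This is a minimal illustration in Python, not the PR's Java code: `split_user_metric_urn` is a hypothetical helper, and the prefix is copied from the `BEAM_METRICS_USER_PREFIX` constant in the diff. One detail worth noting: Java's `String.replace`, like Python's `str.replace`, matches its first argument literally, so `.replace("^:", "")` does not strip a leading colon. The sketch uses an anchored regex for that step instead.

```python
import re

# Prefix copied from the constant in the diff; everything else here is illustrative.
BEAM_METRICS_USER_PREFIX = "beam:metric:user"


def split_user_metric_urn(urn):
    """Return (namespace, name) for a user-counter URN, or None for other URNs."""
    if not urn.startswith(BEAM_METRICS_USER_PREFIX):
        return None
    rest = urn[len(BEAM_METRICS_USER_PREFIX):]
    # Strip a single leading colon if present. A literal replace of the
    # two-character text "^:" would leave the colon in place.
    rest = re.sub(r"^:", "", rest)
    # Split on the last colon; the namespace is empty when there is none.
    namespace, _, name = rest.rpartition(":")
    return namespace, name
```

With the test URN from this PR, `beam:metric:user:ExpectedCounter`, the helper yields an empty namespace and the name `ExpectedCounter`.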
[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.
[ https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172500=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172500 ] ASF GitHub Bot logged work on BEAM-6181: Author: ASF GitHub Bot Created on: 05/Dec/18 22:04 Start Date: 05/Dec/18 22:04 Worklog Time Spent: 10m Work Description: ajamato commented on a change in pull request #7202: [BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow Runner. URL: https://github.com/apache/beam/pull/7202#discussion_r239255236 ## File path: runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java ## @@ -423,6 +423,131 @@ public void close() {} contains(new CounterHamcrestMatchers.CounterUpdateIntegerValueMatcher(finalCounterValue))); } + @Test(timeout = ReadOperation.DEFAULT_PROGRESS_UPDATE_PERIOD_MS * 10) + public void testExtractCounterUpdatesReturnsValidProgressTrackerCounterUpdatesIfPresent() + throws Exception { +final String stepName = "fakeStepNameWithUserMetrics"; +final String namespace = "sdk/whatever"; +final String name = "someCounter"; +final int counterValue = 42; +final int finalCounterValue = 77; +final CountDownLatch progressSentLatch = new CountDownLatch(1); +final CountDownLatch processBundleLatch = new CountDownLatch(1); + +final BeamFnApi.Metrics.User.MetricName metricName = +BeamFnApi.Metrics.User.MetricName.newBuilder() +.setNamespace(namespace) +.setName(name) +.build(); + +final BeamFnApi.Metrics deprecatedMetrics = +BeamFnApi.Metrics.newBuilder() +.putPtransforms(GRPC_READ_ID, FAKE_ELEMENT_COUNT_METRICS) +.putPtransforms( +stepName, +BeamFnApi.Metrics.PTransform.newBuilder() +.addUser( +BeamFnApi.Metrics.User.newBuilder() +.setMetricName(metricName) +.setCounterData( +BeamFnApi.Metrics.User.CounterData.newBuilder() +.setValue(finalCounterValue))) +.build()) +.build(); + +final int expectedCounterValue = 5; +final BeamFnApi.MonitoringInfo expectedMonitoringInfo = 
+BeamFnApi.MonitoringInfo.newBuilder() +.setUrn("beam:metric:user:ExpectedCounter") +.setType("beam:metrics:sum_int_64") +.putLabels("PTRANSFORM", "ExpectedPTransform") +.setMetric( +BeamFnApi.Metric.newBuilder() +.setCounterData( +BeamFnApi.CounterData.newBuilder() +.setInt64Value(expectedCounterValue) +.build()) +.build()) +.build(); + +InstructionRequestHandler instructionRequestHandler = +new InstructionRequestHandler() { + @Override + public CompletionStage handle(InstructionRequest request) { +switch (request.getRequestCase()) { + case REGISTER: +return CompletableFuture.completedFuture(responseFor(request).build()); + case PROCESS_BUNDLE: +return MoreFutures.supplyAsync( +() -> { + processBundleLatch.await(); + return responseFor(request) + .setProcessBundle( + BeamFnApi.ProcessBundleResponse.newBuilder() + .setMetrics(deprecatedMetrics) + .addMonitoringInfos(expectedMonitoringInfo)) + .build(); +}); + case PROCESS_BUNDLE_PROGRESS: +progressSentLatch.countDown(); +return CompletableFuture.completedFuture( +responseFor(request) +.setProcessBundleProgress( + BeamFnApi.ProcessBundleProgressResponse.newBuilder() +.setMetrics(deprecatedMetrics) +.addMonitoringInfos(expectedMonitoringInfo)) +.build()); + default: +throw new RuntimeException("Reached unexpected code path"); +} + } + + @Override + public void close() {} +}; + +RegisterAndProcessBundleOperation processOperation = +new RegisterAndProcessBundleOperation( +IdGenerators.decrementingLongs(), +instructionRequestHandler, +mockBeamFnStateDelegator, +REGISTER_REQUEST, +ImmutableMap.of(), +ImmutableMap.of(), +
[jira] [Work logged] (BEAM-5321) Finish Python 3 porting for transforms module
[ https://issues.apache.org/jira/browse/BEAM-5321?focusedWorklogId=172488=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172488 ] ASF GitHub Bot logged work on BEAM-5321: Author: ASF GitHub Bot Created on: 05/Dec/18 22:00 Start Date: 05/Dec/18 22:00 Worklog Time Spent: 10m Work Description: RobbeSneyders commented on issue #7104: [BEAM-5321] Port transforms package to Python 3 URL: https://github.com/apache/beam/pull/7104#issuecomment-444667493 Hi @tvalentyn, sorry for the wait. The solution implemented in this PR to install dill from a github command works on my local machine, but fails on Jenkins. It seems like the dependencies in setup.py are not processed correctly. I just tried to run this on a VM, which also worked. Any idea on why this might fail on Jenkins? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172488) Time Spent: 2h 10m (was: 2h) > Finish Python 3 porting for transforms module > - > > Key: BEAM-5321 > URL: https://issues.apache.org/jira/browse/BEAM-5321 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Robbe >Assignee: Robbe >Priority: Major > Fix For: Not applicable > > Time Spent: 2h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5993) Create SideInput Load test
[ https://issues.apache.org/jira/browse/BEAM-5993?focusedWorklogId=172475=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172475 ] ASF GitHub Bot logged work on BEAM-5993: Author: ASF GitHub Bot Created on: 05/Dec/18 20:59 Start Date: 05/Dec/18 20:59 Worklog Time Spent: 10m Work Description: pabloem commented on a change in pull request #7020: BEAM-5993 Create SideInput Load test URL: https://github.com/apache/beam/pull/7020#discussion_r239234878 ## File path: sdks/python/apache_beam/testing/load_tests/sideinput_test.py ## @@ -0,0 +1,158 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +To run test on DirectRunner + +python setup.py nosetests \ +--test-pipeline-options=" +--number_of_counter_operations=1000 +--input_options='{ +\"num_records\": 300, +\"key_size\": 5, +\"value_size\":15, +\"bundle_size_distribution_type\": \"const\", +\"bundle_size_distribution_param\": 1, +\"force_initial_num_bundles\": 0 +}' + " \ +--tests apache_beam.testing.load_tests.sideinput_test + +To run test on other runner (ex. Dataflow): + +python setup.py nosetests \ +--test-pipeline-options=" +--runner=TestDataflowRunner +--project=... +--staging_location=gs://... +--temp_location=gs://... 
+--sdk_location=./dist/apache-beam-x.x.x.dev0.tar.gz +--number_of_counter_operations=1000 +--input_options='{ +\"num_records\": 1, +\"key_size\": 1, +\"value_size\":1, +\"bundle_size_distribution_type\": \"const\", +\"bundle_size_distribution_param\": 1, +\"force_initial_num_bundles\": 0 +}' +" \ +--tests apache_beam.testing.load_tests.sideinput_test + +""" + +from __future__ import absolute_import + +import json +import logging +import unittest + +import apache_beam as beam +from apache_beam.pvalue import AsIter +from apache_beam.testing import synthetic_pipeline +from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime +from apache_beam.testing.test_pipeline import TestPipeline + + +class SideInputTest(unittest.TestCase): + def _parseTestPipelineOptions(self): +return { +'numRecords': self.inputOptions.get('num_records'), +'keySizeBytes': self.inputOptions.get('key_size'), +'valueSizeBytes': self.inputOptions.get('value_size'), +'bundleSizeDistribution': { +'type': self.inputOptions.get( +'bundle_size_distribution_type', 'const' +), +'param': self.inputOptions.get('bundle_size_distribution_param', 0) +}, +'forceNumInitialBundles': self.inputOptions.get( +'force_initial_num_bundles', 0 +) +} + + def _getSideInput(self): +side_input = self._parseTestPipelineOptions() +side_input['numRecords'] = side_input['numRecords'] +side_input['keySizeBytes'] = side_input['keySizeBytes'] +side_input['valueSizeBytes'] = side_input['valueSizeBytes'] +return side_input + + def _getPerElementDelaySec(self): +return self.syntheticStepOptions.get('per_element_delay_sec', 0) + + def _getPerBundleDelaySec(self): +return self.syntheticStepOptions.get('per_bundle_delay_sec', 0) + + def _getOutputRecordsPerInputRecords(self): +return self.syntheticStepOptions.get('output_records_per_input_records', 0) + + def setUp(self): +self.pipeline = TestPipeline() +self.inputOptions = json.loads(self.pipeline.get_option('input_options')) +self.iterations = 
self.pipeline.get_option('number_of_counter_operations') +if self.iterations is None: + self.iterations = 1 + + class Consume(beam.DoFn): +def process(self, element, *args, **kwargs): + values_list = [] + for i in element: +for _, value in i.iteritems(): + values_list.append(value) + yield values_list + + def testSideInput(self): +def join_fn(element, side_input, iterations): + list = [] + for i in range(iterations): +for key, value in side_input: + if i == iterations - 1: +
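The option mapping and value flattening in the test above can be exercised without a pipeline. The following is a rough standalone sketch, assuming the same JSON keys as the docstring's `--input_options` example; `parse_input_options` and `flatten_values` are hypothetical names, not part of the PR. It uses `dict.items()` because `iteritems()` exists only on Python 2.

```python
import json


def parse_input_options(raw_json):
    """Map the snake_case --input_options JSON onto synthetic-source keys."""
    opts = json.loads(raw_json)
    return {
        'numRecords': opts.get('num_records'),
        'keySizeBytes': opts.get('key_size'),
        'valueSizeBytes': opts.get('value_size'),
        'bundleSizeDistribution': {
            'type': opts.get('bundle_size_distribution_type', 'const'),
            'param': opts.get('bundle_size_distribution_param', 0),
        },
        'forceNumInitialBundles': opts.get('force_initial_num_bundles', 0),
    }


def flatten_values(dicts):
    """Collect every value from an iterable of dicts, in iteration order."""
    # items() is available on both Python 2 and Python 3.
    return [value for d in dicts for _, value in d.items()]
```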
[jira] [Work logged] (BEAM-6184) PortableRunner dependency missed in wordcount example maven artifact
[ https://issues.apache.org/jira/browse/BEAM-6184?focusedWorklogId=172473=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172473 ] ASF GitHub Bot logged work on BEAM-6184: Author: ASF GitHub Bot Created on: 05/Dec/18 20:34 Start Date: 05/Dec/18 20:34 Worklog Time Spent: 10m Work Description: HuangLED commented on issue #7213: [BEAM-6184] Add portable-runner dependency to example pom.xml URL: https://github.com/apache/beam/pull/7213#issuecomment-444636635 R: @youngoli This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172473) Time Spent: 20m (was: 10m) > PortableRunner dependency missed in wordcount example maven artifact > > > Key: BEAM-6184 > URL: https://issues.apache.org/jira/browse/BEAM-6184 > Project: Beam > Issue Type: Improvement > Components: build-system >Reporter: Ruoyun Huang >Assignee: Ruoyun Huang >Priority: Minor > Time Spent: 20m > Remaining Estimate: 0h > > > > more context: > https://lists.apache.org/thread.html/8dd60395424425f7502d62888c49014430d1d3b06c026606f3db28ab@%3Cuser.beam.apache.org%3E -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6159) Dataflow portable runner harness should use ExecutableStage to process bundle
[ https://issues.apache.org/jira/browse/BEAM-6159?focusedWorklogId=172466=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172466 ] ASF GitHub Bot logged work on BEAM-6159: Author: ASF GitHub Bot Created on: 05/Dec/18 20:15 Start Date: 05/Dec/18 20:15 Worklog Time Spent: 10m Work Description: boyuanzz commented on issue #7015: [BEAM-6159] Migrate dataflow portable worker using shared library to process bundle URL: https://github.com/apache/beam/pull/7015#issuecomment-444630350 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172466) Time Spent: 1h (was: 50m) > Dataflow portable runner harness should use ExecutableStage to process bundle > - > > Key: BEAM-6159 > URL: https://issues.apache.org/jira/browse/BEAM-6159 > Project: Beam > Issue Type: Task > Components: runner-dataflow >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Time Spent: 1h > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-6184) PortableRunner dependency missed in wordcount example maven artifact
Ruoyun Huang created BEAM-6184: -- Summary: PortableRunner dependency missed in wordcount example maven artifact Key: BEAM-6184 URL: https://issues.apache.org/jira/browse/BEAM-6184 Project: Beam Issue Type: Improvement Components: build-system Reporter: Ruoyun Huang Assignee: Ruoyun Huang more context: https://lists.apache.org/thread.html/8dd60395424425f7502d62888c49014430d1d3b06c026606f3db28ab@%3Cuser.beam.apache.org%3E -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6184) PortableRunner dependency missed in wordcount example maven artifact
[ https://issues.apache.org/jira/browse/BEAM-6184?focusedWorklogId=172463=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172463 ] ASF GitHub Bot logged work on BEAM-6184: Author: ASF GitHub Bot Created on: 05/Dec/18 19:59 Start Date: 05/Dec/18 19:59 Worklog Time Spent: 10m Work Description: HuangLED opened a new pull request #7213: [BEAM-6184] Add portable-runner dependency to example pom.xml URL: https://github.com/apache/beam/pull/7213 Add portable-runner to example pom.xml's default dependency list. Follow this checklist to help us incorporate your contribution quickly and easily: - [ X] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). It will help us expedite review of your Pull Request if you tag someone (e.g. `@username`) to look at it. 
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172463) Time Spent: 10m Remaining Estimate: 0h > PortableRunner dependency missed in wordcount example maven artifact >
[jira] [Work logged] (BEAM-6159) Dataflow portable runner harness should use ExecutableStage to process bundle
[ https://issues.apache.org/jira/browse/BEAM-6159?focusedWorklogId=172462=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172462 ] ASF GitHub Bot logged work on BEAM-6159: Author: ASF GitHub Bot Created on: 05/Dec/18 19:40 Start Date: 05/Dec/18 19:40 Worklog Time Spent: 10m Work Description: boyuanzz edited a comment on issue #7015: [BEAM-6159] Migrate dataflow portable worker using shared library to process bundle URL: https://github.com/apache/beam/pull/7015#issuecomment-444614614 The squashed commit https://github.com/apache/beam/pull/7015/commits/5c78520ed98576de76ff8d2ddc24c67ab70a6d21 addresses most of the comments above. Several issues remain and will be handled in follow-up PRs: 1. Map the output PCollection to the following OutputReceivers. 2. Set up the Gradle test task to track the new bundle processing with ExecutableStage. 3. Turn on the experiment path for the streaming worker. Please take another look, @lukecwik. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172462) Time Spent: 50m (was: 40m) > Dataflow portable runner harness should use ExecutableStage to process bundle > - > > Key: BEAM-6159 > URL: https://issues.apache.org/jira/browse/BEAM-6159 > Project: Beam > Issue Type: Task > Components: runner-dataflow >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Time Spent: 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6182) Use of conscrypt SSL results in stuck workflows in Dataflow
[ https://issues.apache.org/jira/browse/BEAM-6182?focusedWorklogId=172460=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172460 ] ASF GitHub Bot logged work on BEAM-6182: Author: ASF GitHub Bot Created on: 05/Dec/18 19:36 Start Date: 05/Dec/18 19:36 Worklog Time Spent: 10m Work Description: chamikaramj closed pull request #7210: [BEAM-6182] Cherry-pick: Disable conscrypt by default (#7203) URL: https://github.com/apache/beam/pull/7210 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java index aeeec3318206..13df96c85e88 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java @@ -57,16 +57,20 @@ public static DataflowWorkerHarnessOptions initializeGlobalStateAndPipelineOptio ExperimentContext ec = ExperimentContext.parseFrom(pipelineOptions); -if (!ec.isEnabled(Experiment.DisableConscryptSecurityProvider)) { +String experimentName = Experiment.EnableConscryptSecurityProvider.getName(); +if (ec.isEnabled(Experiment.EnableConscryptSecurityProvider)) { /* Enable fast SSL provider. */ LOG.info( - "Dataflow runner uses conscrypt by default for SSL. To disable this feature, " - + "pass pipeline option --experiment=disable_conscrypt_security_provider"); + "Dataflow runner is using conscrypt SSL. 
To disable this feature, " + + "remove the pipeline option --experiments={}", + experimentName); Security.insertProviderAt(new OpenSSLProvider(), 1); } else { LOG.info( - "Experiment disable_conscrypt_security_provider specified, disabling conscrypt " - + "SSL. Note this is the default Java behavior, but may have reduced performance."); + "Not using conscrypt SSL. Note this is the default Java behavior, but may " + + "have reduced performance. To use conscrypt SSL pass pipeline option " + + "--experiments={}", + experimentName); } return pipelineOptions; } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ExperimentContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ExperimentContext.java index 3ce1e0874c90..3bf1ca8360fb 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ExperimentContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ExperimentContext.java @@ -36,7 +36,11 @@ /** Enumeration of all known experiments. */ public enum Experiment { -DisableConscryptSecurityProvider("disable_conscrypt_security_provider"), +/** + * Use the Conscrypt OpenSSL Java Security Provider. This may improve the performance of SSL + * operations for some IO connectors. + */ +EnableConscryptSecurityProvider("enable_conscrypt_security_provider"), IntertransformIO("intertransform_io"), // Intertransform metrics for Shuffle IO (insights) SideInputIOMetrics("sideinput_io_metrics"); // Intertransform metrics for Side Input IO This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172460) Time Spent: 1.5h (was: 1h 20m) > Use of conscrypt SSL results in stuck workflows in Dataflow > --- > > Key: BEAM-6182 > URL: https://issues.apache.org/jira/browse/BEAM-6182 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Reporter: Ahmet Altay >Assignee: Tyler Akidau >Priority: Blocker > Fix For: 2.9.0 > > Time Spent: 1.5h > Remaining Estimate: 0h > > An experimental flag is being added to disable it for now with an option to >
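The change above flips conscrypt from opt-out to opt-in. The decision logic can be sketched as follows, in Python rather than the worker's Java; `conscrypt_enabled` is a hypothetical helper, and only the experiment names are taken from the diff.

```python
# Experiment name introduced by the diff; the old opt-out name is kept only
# to show that it no longer has any effect in this sketch.
ENABLE_CONSCRYPT = "enable_conscrypt_security_provider"
OLD_DISABLE_CONSCRYPT = "disable_conscrypt_security_provider"


def conscrypt_enabled(experiments):
    """Opt-in check: conscrypt is used only when the experiment is listed."""
    return ENABLE_CONSCRYPT in set(experiments or [])
```

Under this reading, a job that passes no experiments falls back to the default Java SSL provider, and a job that still passes the old opt-out name also gets the default provider.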
[jira] [Work logged] (BEAM-5514) BigQueryIO doesn't handle quotaExceeded errors properly
[ https://issues.apache.org/jira/browse/BEAM-5514?focusedWorklogId=172461=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172461 ] ASF GitHub Bot logged work on BEAM-5514: Author: ASF GitHub Bot Created on: 05/Dec/18 19:36 Start Date: 05/Dec/18 19:36 Worklog Time Spent: 10m Work Description: ihji commented on issue #7189: [BEAM-5514] BigQueryIO doesn't handle quotaExceeded errors properly URL: https://github.com/apache/beam/pull/7189#issuecomment-444615779 split into #7189 and #7212 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172461) Time Spent: 1.5h (was: 1h 20m) > BigQueryIO doesn't handle quotaExceeded errors properly > --- > > Key: BEAM-5514 > URL: https://issues.apache.org/jira/browse/BEAM-5514 > Project: Beam > Issue Type: Bug > Components: io-java-gcp >Reporter: Kevin Peterson >Assignee: Heejong Lee >Priority: Major > Time Spent: 1.5h > Remaining Estimate: 0h > > When exceeding a streaming quota for BigQuery insertAll requests, BigQuery > returns a 403 with reason "quotaExceeded". > The current implementation of BigQueryIO does not consider this to be a rate > limited exception, and therefore does not perform exponential backoff > properly, leading to repeated calls to BQ. > The actual error is in the > [ApiErrorExtractor|https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java#L739] > class, which is called from > [BigQueryServicesImpl|https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/util/src/main/java/com/google/cloud/hadoop/util/ApiErrorExtractor.java#L263] > to determine how to retry the failure. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
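The behavior the issue asks for can be sketched roughly as below. This is an illustration in Python, not the BigQueryIO or ApiErrorExtractor code: `is_rate_limited` and `backoff_delays` are hypothetical helpers, and the set of reasons treated as rate limiting is an assumption based on the issue text.

```python
# Assumption: a 403 with either reason should trigger backoff. "quotaExceeded"
# is the case described in the issue.
RATE_LIMIT_REASONS = frozenset(["rateLimitExceeded", "quotaExceeded"])


def is_rate_limited(status_code, reason):
    """Return True when an error response should be retried with backoff."""
    return status_code == 403 and reason in RATE_LIMIT_REASONS


def backoff_delays(initial_sec=1.0, factor=2.0, max_sec=60.0, retries=5):
    """Yield exponentially growing delays in seconds, capped at max_sec."""
    delay = initial_sec
    for _ in range(retries):
        yield min(delay, max_sec)
        delay *= factor
```

A caller would sleep for each yielded delay between retries of a rate-limited request. Real implementations usually add random jitter, which is left out here to keep the sketch deterministic.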
[jira] [Created] (BEAM-6183) BigQuery insertAll API request rate is not properly controlled
Heejong Lee created BEAM-6183: - Summary: BigQuery insertAll API request rate is not properly controlled Key: BEAM-6183 URL: https://issues.apache.org/jira/browse/BEAM-6183 Project: Beam Issue Type: Improvement Components: io-java-gcp Affects Versions: 2.8.0 Reporter: Heejong Lee Assignee: Chamikara Jayalath BigQuery insertAll API request rate is not properly controlled so it produces too many rate limit exceeded error messages in the worker log. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6174) Remove unnecesarry Kryo dependecny from euphoria
[ https://issues.apache.org/jira/browse/BEAM-6174?focusedWorklogId=172459=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172459 ] ASF GitHub Bot logged work on BEAM-6174: Author: ASF GitHub Bot Created on: 05/Dec/18 19:33 Start Date: 05/Dec/18 19:33 Worklog Time Spent: 10m Work Description: chamikaramj closed pull request #7211: [BEAM-6174] Kryo dependency removed. URL: https://github.com/apache/beam/pull/7211 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/java/extensions/euphoria/build.gradle b/sdks/java/extensions/euphoria/build.gradle index 75d4b357be9c..e6ad57382ee6 100644 --- a/sdks/java/extensions/euphoria/build.gradle +++ b/sdks/java/extensions/euphoria/build.gradle @@ -21,13 +21,8 @@ applyJavaNature() description = "Apache Beam :: SDKs :: Java :: Extensions :: Euphoria Java 8 DSL" -ext { -kryoVersion = '4.0.2' -} - dependencies { shadow project(path: ":beam-sdks-java-core", configuration: "shadow") -shadow "com.esotericsoftware:kryo:${kryoVersion}" shadow library.java.guava testCompile library.java.mockito_core testCompile project(path: ":beam-sdks-java-extensions-kryo") This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172459) Time Spent: 1h 10m (was: 1h) > Remove unnecesarry Kryo dependecny from euphoria > > > Key: BEAM-6174 > URL: https://issues.apache.org/jira/browse/BEAM-6174 > Project: Beam > Issue Type: Sub-task > Components: dsl-euphoria >Reporter: Vaclav Plajt >Assignee: Vaclav Plajt >Priority: Major > Fix For: 2.10.0 > > Time Spent: 1h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6159) Dataflow portable runner harness should use ExecutableStage to process bundle
[ https://issues.apache.org/jira/browse/BEAM-6159?focusedWorklogId=172458=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172458 ] ASF GitHub Bot logged work on BEAM-6159: Author: ASF GitHub Bot Created on: 05/Dec/18 19:33 Start Date: 05/Dec/18 19:33 Worklog Time Spent: 10m Work Description: boyuanzz commented on issue #7015: [BEAM-6159] Migrate dataflow portable worker using shared library to process bundle URL: https://github.com/apache/beam/pull/7015#issuecomment-444614614 The squashed commit https://github.com/apache/beam/pull/7015/commits/924bc07e9866c6dbd4b1cdcfb7d43838453defdf addresses most of the comments above. Several issues remain and will be handled in follow-up PRs: 1. Map output PCollection to following OutputReceivers. 2. Set up the Gradle test task to track the new bundle processing with ExecutableStage. 3. Turn on the experiment path for streaming worker. Please take another look @lukecwik This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172458) Time Spent: 40m (was: 0.5h) > Dataflow portable runner harness should use ExecutableStage to process bundle > - > > Key: BEAM-6159 > URL: https://issues.apache.org/jira/browse/BEAM-6159 > Project: Beam > Issue Type: Task > Components: runner-dataflow >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Time Spent: 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (BEAM-5462) get rid of .options deprecation warnings in tests
[ https://issues.apache.org/jira/browse/BEAM-5462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Heejong Lee resolved BEAM-5462. --- Resolution: Fixed Fix Version/s: 2.10.0 > get rid of .options deprecation warnings in tests > --- > > Key: BEAM-5462 > URL: https://issues.apache.org/jira/browse/BEAM-5462 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Udi Meiri >Assignee: Heejong Lee >Priority: Minor > Fix For: 2.10.0 > > Time Spent: 7h 10m > Remaining Estimate: 0h > > Messages look like: > {{/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/direct/direct_runner.py:360: > DeprecationWarning: options is deprecated since First stable release. > References to .options will not be supported}} > {{pipeline.replace_all(_get_transform_overrides(pipeline.options))}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6159) Dataflow portable runner harness should use ExecutableStage to process bundle
[ https://issues.apache.org/jira/browse/BEAM-6159?focusedWorklogId=172457=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172457 ] ASF GitHub Bot logged work on BEAM-6159: Author: ASF GitHub Bot Created on: 05/Dec/18 19:31 Start Date: 05/Dec/18 19:31 Worklog Time Spent: 10m Work Description: boyuanzz commented on a change in pull request #7015: [BEAM-6159] Migrate dataflow portable worker using shared library to process bundle URL: https://github.com/apache/beam/pull/7015#discussion_r239206538
## File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java ##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.fn.control;
+
+import com.google.common.collect.Iterables;
+import java.io.Closeable;
+import java.util.EnumMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey.TypeCase;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.OperationContext;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.ReceivingOperation;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
+import org.apache.beam.runners.fnexecution.control.RemoteBundle;
+import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ProcessRemoteBundleOperation extends ReceivingOperation {
+  private static final Logger LOG = LoggerFactory.getLogger(ProcessRemoteBundleOperation.class);
+  private final StageBundleFactory stageBundleFactory;
+  private RemoteBundle remoteBundle;
+  private StateRequestHandler stateRequestHandler;
+  private BundleProgressHandler progressHandler;
+
+  public ProcessRemoteBundleOperation(
+      OperationContext context, StageBundleFactory stageBundleFactory, OutputReceiver[] receivers) {
+    super(receivers, context);
+    this.stageBundleFactory = stageBundleFactory;
+    EnumMap handlerMap = new EnumMap<>(TypeCase.class);
+    stateRequestHandler = StateRequestHandlers.delegateBasedUponType(handlerMap);
+    progressHandler = BundleProgressHandler.ignored();
+  }
+
+  @Override
+  public void start() throws Exception {
+    try (Closeable scope = context.enterStart()) {
+      super.start();
+      OutputReceiverFactory receiverFactory =
Review comment: Fixed in this PR.
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172457) Time Spent: 0.5h (was: 20m) > Dataflow portable runner harness should use ExecutableStage to process bundle > - > > Key: BEAM-6159 > URL: https://issues.apache.org/jira/browse/BEAM-6159 > Project: Beam > Issue Type: Task > Components: runner-dataflow >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: Major > Time Spent: 0.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6183) BigQuery insertAll API request rate is not properly controlled
[ https://issues.apache.org/jira/browse/BEAM-6183?focusedWorklogId=172456=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172456 ] ASF GitHub Bot logged work on BEAM-6183: Author: ASF GitHub Bot Created on: 05/Dec/18 19:29 Start Date: 05/Dec/18 19:29 Worklog Time Spent: 10m Work Description: ihji opened a new pull request #7212: [BEAM-6183] BigQuery insertAll API request rate is not properly controlled URL: https://github.com/apache/beam/pull/7212 add a rate limiter that dynamically adjusts insertAll submission rate below a given rate limit quota. Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). It will help us expedite review of your Pull Request if you tag someone (e.g. `@username`) to look at it. 
Post-Commit Tests Status (on master branch) Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark --- | --- | --- | --- | --- | --- | --- | --- Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_GradleBuild/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_GradleBuild/lastCompletedBuild/) | --- | --- | --- | --- | --- | --- Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Gradle/lastCompletedBuild/) [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump_Gradle/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza_Gradle/lastCompletedBuild/) | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark_Gradle/lastCompletedBuild/) Python | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/) | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/) [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/) | --- | --- | --- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172456) Time Spent: 10m Remaining Estimate: 0h > BigQuery insertAll API request rate is not properly controlled >
[jira] [Assigned] (BEAM-6183) BigQuery insertAll API request rate is not properly controlled
[ https://issues.apache.org/jira/browse/BEAM-6183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Heejong Lee reassigned BEAM-6183: - Assignee: Heejong Lee (was: Chamikara Jayalath) > BigQuery insertAll API request rate is not properly controlled > -- > > Key: BEAM-6183 > URL: https://issues.apache.org/jira/browse/BEAM-6183 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Affects Versions: 2.8.0 >Reporter: Heejong Lee >Assignee: Heejong Lee >Priority: Major > > BigQuery insertAll API request rate is not properly controlled so it produces > too many rate limit exceeded error messages in the worker log. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
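For readers following BEAM-6183: the pull request describes "a rate limiter that dynamically adjusts insertAll submission rate below a given rate limit quota". The sketch below is only an illustration of that idea, not the code from PR #7212. It assumes an additive-increase / multiplicative-decrease policy, and the class name `AdaptiveRateLimiter` and all of its methods are hypothetical.

```java
/**
 * Hypothetical sketch of an adaptive request-rate limiter.
 * Assumption: the rate is raised by 1 request/s after each success and
 * halved after each "rate limit exceeded" error, never above the quota.
 */
final class AdaptiveRateLimiter {
  private final double quotaPerSecond; // hard ceiling taken from the quota
  private final double minRate;        // floor so the rate never reaches zero
  private double currentRate;          // requests per second currently allowed

  AdaptiveRateLimiter(double quotaPerSecond, double initialRate) {
    if (quotaPerSecond <= 0 || initialRate <= 0) {
      throw new IllegalArgumentException("rates must be positive");
    }
    this.quotaPerSecond = quotaPerSecond;
    this.minRate = Math.min(1.0, quotaPerSecond);
    this.currentRate = Math.min(initialRate, quotaPerSecond);
  }

  /** Additive increase: probe for more throughput, capped at the quota. */
  synchronized void onSuccess() {
    currentRate = Math.min(quotaPerSecond, currentRate + 1.0);
  }

  /** Multiplicative decrease: back off quickly when the service pushes back. */
  synchronized void onRateLimitExceeded() {
    currentRate = Math.max(minRate, currentRate / 2.0);
  }

  synchronized double currentRate() {
    return currentRate;
  }

  /** Spacing the caller should leave between two consecutive requests. */
  synchronized long delayNanos() {
    return (long) (1_000_000_000L / currentRate);
  }
}
```

A caller would wait `delayNanos()` before each request and report the outcome through `onSuccess()` or `onRateLimitExceeded()`. Whether the actual change uses this policy is not stated in the thread.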
[jira] [Work logged] (BEAM-6174) Remove unnecesarry Kryo dependecny from euphoria
[ https://issues.apache.org/jira/browse/BEAM-6174?focusedWorklogId=172451=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172451 ] ASF GitHub Bot logged work on BEAM-6174: Author: ASF GitHub Bot Created on: 05/Dec/18 19:01 Start Date: 05/Dec/18 19:01 Worklog Time Spent: 10m Work Description: chamikaramj opened a new pull request #7211: [BEAM-6174] Kryo dependency removed. URL: https://github.com/apache/beam/pull/7211 Cherry-picking https://github.com/apache/beam/pull/7195 to 2.9.0 release branch. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172451) Time Spent: 1h (was: 50m) > Remove unnecesarry Kryo dependecny from euphoria > > > Key: BEAM-6174 > URL:
[jira] [Work logged] (BEAM-5959) Add Cloud KMS support to GCS copies
[ https://issues.apache.org/jira/browse/BEAM-5959?focusedWorklogId=172448=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172448 ] ASF GitHub Bot logged work on BEAM-5959: Author: ASF GitHub Bot Created on: 05/Dec/18 18:53 Start Date: 05/Dec/18 18:53 Worklog Time Spent: 10m Work Description: udim commented on issue #7050: [BEAM-5959] Reimplement GCS copies with rewrites. URL: https://github.com/apache/beam/pull/7050#issuecomment-444599653 R: @chamikaramj This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172448) Time Spent: 3h (was: 2h 50m) > Add Cloud KMS support to GCS copies > --- > > Key: BEAM-5959 > URL: https://issues.apache.org/jira/browse/BEAM-5959 > Project: Beam > Issue Type: Bug > Components: io-java-gcp, sdk-py-core >Reporter: Udi Meiri >Assignee: Udi Meiri >Priority: Major > Time Spent: 3h > Remaining Estimate: 0h > > Beam SDK currently uses the CopyTo GCS API call, which doesn't support > copying objects that use Customer Managed Encryption Keys (CMEK). > CMEKs are managed in Cloud KMS. > Items (for Java and Python SDKs): > - Update clients to versions that support KMS keys. > - Change copyTo API calls to use rewriteTo (Python - directly, Java - > possibly convert copyTo API call to use client library) > - Add unit tests. > - Add basic tests (DirectRunner and GCS buckets with CMEK). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
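As background for the copyTo to rewriteTo item in BEAM-5959: unlike a single copy call, a GCS rewrite can take several calls, each returning a `done` flag and a rewrite token that is passed to the next call. The sketch below illustrates only that loop. The `RewriteClient` interface, `RewriteResult` class and the in-memory fake are hypothetical stand-ins, not the GCS client library and not the code in PR #7050.

```java
/** Hypothetical sketch of a token-driven rewrite loop; no real GCS calls are made. */
final class RewriteLoopSketch {

  /** Stand-in for the fields of a rewrite response that matter to the loop. */
  static final class RewriteResult {
    final boolean done;
    final String rewriteToken; // null once the rewrite has finished
    final long totalBytesRewritten;

    RewriteResult(boolean done, String rewriteToken, long totalBytesRewritten) {
      this.done = done;
      this.rewriteToken = rewriteToken;
      this.totalBytesRewritten = totalBytesRewritten;
    }
  }

  /** Stand-in for a storage client; a real one would issue the HTTP request. */
  interface RewriteClient {
    RewriteResult rewrite(String source, String destination, String rewriteToken);
  }

  /** Repeats the rewrite call until the service reports completion; returns the call count. */
  static int copyWithRewrite(RewriteClient client, String source, String destination) {
    String token = null; // the first call carries no token
    int calls = 0;
    RewriteResult result;
    do {
      result = client.rewrite(source, destination, token);
      token = result.rewriteToken;
      calls++;
    } while (!result.done);
    return calls;
  }

  /** In-memory fake that "rewrites" chunkSize bytes per call, for demonstration only. */
  static RewriteClient fakeClient(final long objectSize, final long chunkSize) {
    return new RewriteClient() {
      private long written = 0;

      @Override
      public RewriteResult rewrite(String source, String destination, String rewriteToken) {
        written = Math.min(objectSize, written + chunkSize);
        boolean done = written >= objectSize;
        return new RewriteResult(done, done ? null : "token-" + written, written);
      }
    };
  }
}
```

With the fake client, a 10-byte object rewritten 4 bytes at a time completes in three calls; a production caller would also need error handling and retries, which are omitted here.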
[jira] [Work logged] (BEAM-6144) Add support for the autoscalingAlgorithm flag
[ https://issues.apache.org/jira/browse/BEAM-6144?focusedWorklogId=172450=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172450 ] ASF GitHub Bot logged work on BEAM-6144: Author: ASF GitHub Bot Created on: 05/Dec/18 18:58 Start Date: 05/Dec/18 18:58 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #7149: [BEAM-6144] Add support for the autoscaling flags to the GO SDK. URL: https://github.com/apache/beam/pull/7149#discussion_r239194436 ## File path: sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go ## @@ -140,6 +144,18 @@ func Translate(p *pb.Pipeline, opts *JobOptions, workerURL, jarURL, modelURL str if opts.NumWorkers > 0 { job.Environment.WorkerPools[0].NumWorkers = opts.NumWorkers } + if opts.Algorithm != "" { + settings := { + Algorithm: map[string]string{ + "NONE": "AUTOSCALING_ALGORITHM_NONE", + "THROUGHPUT_BASED": "AUTOSCALING_ALGORITHM_BASIC", + }[opts.Algorithm], + } + if opts.MaxNumWorkers > 0 { Review comment: `MaxNumWorkers` could be set even if `opts.Algorithm` is not set. If Algorithm is not set, Dataflow will choose the appropriate autoscaling algorithm, and max num workers would still be used for that. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172450) Time Spent: 20m (was: 10m) > Add support for the autoscalingAlgorithm flag > - > > Key: BEAM-6144 > URL: https://issues.apache.org/jira/browse/BEAM-6144 > Project: Beam > Issue Type: Improvement > Components: sdk-go >Reporter: Andrew Brampton >Assignee: Robert Burke >Priority: Minor > Time Spent: 20m > Remaining Estimate: 0h > > The Go SDK does not support the --autoscalingAlgorithm and --maxNumWorkers > flags, like the Java and Python SDKs. Add support for this. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6182) Use of conscrypt SSL results in stuck workflows in Dataflow
[ https://issues.apache.org/jira/browse/BEAM-6182?focusedWorklogId=172442=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172442 ] ASF GitHub Bot logged work on BEAM-6182: Author: ASF GitHub Bot Created on: 05/Dec/18 18:49 Start Date: 05/Dec/18 18:49 Worklog Time Spent: 10m Work Description: chamikaramj commented on issue #7210: [BEAM-6182] Cherry-pick: Disable conscrypt by default (#7203) URL: https://github.com/apache/beam/pull/7210#issuecomment-444598217 LGTM. Waiting for tests to pass. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172442) Time Spent: 1h 10m (was: 1h) > Use of conscrypt SSL results in stuck workflows in Dataflow > --- > > Key: BEAM-6182 > URL: https://issues.apache.org/jira/browse/BEAM-6182 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Reporter: Ahmet Altay >Assignee: Tyler Akidau >Priority: Blocker > Fix For: 2.9.0 > > Time Spent: 1h 10m > Remaining Estimate: 0h > > An experimental flag is being added to disable it for now with an option to > enable it per-workflow. > Also related: > https://issues.apache.org/jira/browse/BEAM-5747 - Upgrade conscrypt to its > latest version. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
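The BEAM-6182 description says conscrypt is being disabled by default "with an option to enable it per-workflow". A minimal sketch of such an opt-in check follows, assuming experiments arrive as a list of strings. The class `ExperimentFlagSketch`, the constant value and the provider labels are hypothetical and are not taken from the Dataflow worker code.

```java
import java.util.List;

/** Hypothetical sketch of an opt-in experiment flag; not the Dataflow worker implementation. */
final class ExperimentFlagSketch {
  // Assumed experiment name, used here for illustration only.
  static final String ENABLE_CONSCRYPT = "enable_conscrypt_security_provider";

  /** True only when the workflow explicitly lists the experiment. */
  static boolean isEnabled(List<String> experiments, String name) {
    return experiments != null && experiments.contains(name);
  }

  /** Opt-in semantics: the default provider is used unless the flag is present. */
  static String chooseSslProvider(List<String> experiments) {
    return isEnabled(experiments, ENABLE_CONSCRYPT) ? "conscrypt" : "jdk-default";
  }
}
```

The design point is the inversion of the default: a workflow that says nothing gets the stock provider, and only a workflow that names the experiment gets conscrypt.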
[jira] [Work logged] (BEAM-6182) Use of conscrypt SSL results in stuck workflows in Dataflow
[ https://issues.apache.org/jira/browse/BEAM-6182?focusedWorklogId=172443=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172443 ] ASF GitHub Bot logged work on BEAM-6182: Author: ASF GitHub Bot Created on: 05/Dec/18 18:49 Start Date: 05/Dec/18 18:49 Worklog Time Spent: 10m Work Description: chamikaramj commented on issue #7210: [BEAM-6182] Cherry-pick: Disable conscrypt by default (#7203) URL: https://github.com/apache/beam/pull/7210#issuecomment-444598262 Run Portable_Python PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172443) Time Spent: 1h 20m (was: 1h 10m) > Use of conscrypt SSL results in stuck workflows in Dataflow > --- > > Key: BEAM-6182 > URL: https://issues.apache.org/jira/browse/BEAM-6182 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Reporter: Ahmet Altay >Assignee: Tyler Akidau >Priority: Blocker > Fix For: 2.9.0 > > Time Spent: 1h 20m > Remaining Estimate: 0h > > An experimental flag is being added to disable it for now with an option to > enable it per-workflow. > Also related: > https://issues.apache.org/jira/browse/BEAM-5747 - Upgrade conscrypt to its > latest version. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6182) Use of conscrypt SSL results in stuck workflows in Dataflow
[ https://issues.apache.org/jira/browse/BEAM-6182?focusedWorklogId=172435=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172435 ] ASF GitHub Bot logged work on BEAM-6182: Author: ASF GitHub Bot Created on: 05/Dec/18 18:36 Start Date: 05/Dec/18 18:36 Worklog Time Spent: 10m Work Description: aaltay opened a new pull request #7210: [BEAM-6182] Cherry-pick: Disable conscrypt by default (#7203) URL: https://github.com/apache/beam/pull/7210 * Disable conscrypt by default This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172435) Time Spent: 1h (was: 50m) > Use of conscrypt SSL results in stuck workflows in Dataflow > --- > > Key: BEAM-6182 > URL:
[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.
[ https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172438=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172438 ] ASF GitHub Bot logged work on BEAM-6181: Author: ASF GitHub Bot Created on: 05/Dec/18 18:42 Start Date: 05/Dec/18 18:42 Worklog Time Spent: 10m Work Description: Ardagan edited a comment on issue #7202: [BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow Runner. URL: https://github.com/apache/beam/pull/7202#issuecomment-444592947 @swegner (committer) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172438) Time Spent: 1h (was: 50m) > Utilize MetricInfo for reporting user metrics in Portable Dataflow Java > Runner. > --- > > Key: BEAM-6181 > URL: https://issues.apache.org/jira/browse/BEAM-6181 > Project: Beam > Issue Type: Bug > Components: java-fn-execution >Reporter: Mikhail Gryzykhin >Assignee: Mikhail Gryzykhin >Priority: Major > Time Spent: 1h > Remaining Estimate: 0h > > The new approach to reporting metrics in FnApi is to use MetricInfo structures. > This approach is implemented in the Python SDK, and work is ongoing in the Java SDK. > This task includes plumbing user metrics reported via MetricInfos through > the Dataflow Java Runner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.
[ https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172433=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172433 ] ASF GitHub Bot logged work on BEAM-6181: Author: ASF GitHub Bot Created on: 05/Dec/18 18:33 Start Date: 05/Dec/18 18:33 Worklog Time Spent: 10m Work Description: Ardagan commented on issue #7202: [BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow Runner. URL: https://github.com/apache/beam/pull/7202#issuecomment-444592947 @swegner This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172433) Time Spent: 50m (was: 40m) > Utilize MetricInfo for reporting user metrics in Portable Dataflow Java > Runner. > --- > > Key: BEAM-6181 > URL: https://issues.apache.org/jira/browse/BEAM-6181 > Project: Beam > Issue Type: Bug > Components: java-fn-execution >Reporter: Mikhail Gryzykhin >Assignee: Mikhail Gryzykhin >Priority: Major > Time Spent: 50m > Remaining Estimate: 0h > > The new approach to reporting metrics in FnApi is to use MetricInfo structures. > This approach is implemented in the Python SDK, and work is ongoing in the Java SDK. > This task includes plumbing user metrics reported via MetricInfos through > the Dataflow Java Runner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.
[ https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172428=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172428 ] ASF GitHub Bot logged work on BEAM-6181: Author: ASF GitHub Bot Created on: 05/Dec/18 18:26 Start Date: 05/Dec/18 18:26 Worklog Time Spent: 10m Work Description: Ardagan commented on issue #7202: [BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow Runner. URL: https://github.com/apache/beam/pull/7202#issuecomment-444590796 @ajamato Please review. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172428) Time Spent: 40m (was: 0.5h) > Utilize MetricInfo for reporting user metrics in Portable Dataflow Java > Runner. > --- > > Key: BEAM-6181 > URL: https://issues.apache.org/jira/browse/BEAM-6181 > Project: Beam > Issue Type: Bug > Components: java-fn-execution >Reporter: Mikhail Gryzykhin >Assignee: Mikhail Gryzykhin >Priority: Major > Time Spent: 40m > Remaining Estimate: 0h > > The new approach to reporting metrics in FnApi is to utilize MetricInfo structures. > This approach is implemented in the Python SDK and work is ongoing in the Java SDK. > This task includes plumbing User metrics reported via MetricInfos through > the Dataflow Java Runner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6182) Use of conscrypt SSL results in stuck workflows in Dataflow
[ https://issues.apache.org/jira/browse/BEAM-6182?focusedWorklogId=172427=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172427 ] ASF GitHub Bot logged work on BEAM-6182: Author: ASF GitHub Bot Created on: 05/Dec/18 18:25 Start Date: 05/Dec/18 18:25 Worklog Time Spent: 10m Work Description: aaltay closed pull request #7203: [BEAM-6182] Disable conscrypt by default URL: https://github.com/apache/beam/pull/7203 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java index 75662f4a2c97..37ce990fb5f0 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java @@ -57,18 +57,19 @@ public static DataflowWorkerHarnessOptions initializeGlobalStateAndPipelineOptio ExperimentContext ec = ExperimentContext.parseFrom(pipelineOptions); -String experimentName = Experiment.DisableConscryptSecurityProvider.getName(); -if (!ec.isEnabled(Experiment.DisableConscryptSecurityProvider)) { +String experimentName = Experiment.EnableConscryptSecurityProvider.getName(); +if (ec.isEnabled(Experiment.EnableConscryptSecurityProvider)) { /* Enable fast SSL provider. */ LOG.info( - "Dataflow runner uses conscrypt by default for SSL. To disable this feature, " - + "pass pipeline option --experiments={}", + "Dataflow runner is using conscrypt SSL. 
To disable this feature, " + + "remove the pipeline option --experiments={}", experimentName); Security.insertProviderAt(new OpenSSLProvider(), 1); } else { LOG.info( - "Experiment {} specified, disabling conscrypt SSL. Note this is the default " - + "Java behavior, but may have reduced performance.", + "Not using conscrypt SSL. Note this is the default Java behavior, but may " + + "have reduced performance. To use conscrypt SSL pass pipeline option " + + "--experiments={}", experimentName); } return pipelineOptions; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ExperimentContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ExperimentContext.java index 3ce1e0874c90..3bf1ca8360fb 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ExperimentContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ExperimentContext.java @@ -36,7 +36,11 @@ /** Enumeration of all known experiments. */ public enum Experiment { -DisableConscryptSecurityProvider("disable_conscrypt_security_provider"), +/** + * Use the Conscrypt OpenSSL Java Security Provider. This may improve the performance of SSL + * operations for some IO connectors. + */ +EnableConscryptSecurityProvider("enable_conscrypt_security_provider"), IntertransformIO("intertransform_io"), // Intertransform metrics for Shuffle IO (insights) SideInputIOMetrics("sideinput_io_metrics"); // Intertransform metrics for Side Input IO This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172427) Time Spent: 50m (was: 40m) > Use of conscrypt SSL results in stuck workflows in Dataflow > --- > > Key: BEAM-6182 > URL: https://issues.apache.org/jira/browse/BEAM-6182 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Reporter: Ahmet Altay >Assignee: Tyler Akidau >Priority: Blocker > Fix For: 2.9.0 > > Time Spent: 50m > Remaining Estimate: 0h > > An experimental flag is being added to disable it for now with an option to > enable it
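Editor's sketch: the diff in PR #7203 turns conscrypt from opt-out into opt-in, so the provider is installed only when the experiment named enable_conscrypt_security_provider is passed via --experiments. The decision can be illustrated roughly as below, in Python for brevity. The helpers `parse_experiments` and `use_conscrypt` are hypothetical, and the comma-separated parsing is an assumption rather than the documented behavior of ExperimentContext.

```python
ENABLE_CONSCRYPT = "enable_conscrypt_security_provider"


def parse_experiments(args):
    """Collect experiment names from --experiments=a,b style arguments.

    Assumption: names are comma-separated; the real worker may parse differently.
    """
    prefix = "--experiments="
    names = set()
    for arg in args:
        if arg.startswith(prefix):
            # Skip empty fragments such as a trailing comma.
            names.update(n for n in arg[len(prefix):].split(",") if n)
    return names


def use_conscrypt(args):
    """Opt-in: conscrypt is used only if the experiment is explicitly present."""
    return ENABLE_CONSCRYPT in parse_experiments(args)
```

With no experiments passed, `use_conscrypt([])` is false, which matches the new default described in the log messages of the diff; the old disable_conscrypt_security_provider name has no effect in this sketch.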
[jira] [Updated] (BEAM-6182) Use of conscrypt SSL results in stuck workflows in Dataflow
[ https://issues.apache.org/jira/browse/BEAM-6182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ahmet Altay updated BEAM-6182: -- Priority: Blocker (was: Major) > Use of conscrypt SSL results in stuck workflows in Dataflow > --- > > Key: BEAM-6182 > URL: https://issues.apache.org/jira/browse/BEAM-6182 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Reporter: Ahmet Altay >Assignee: Tyler Akidau >Priority: Blocker > Fix For: 2.9.0 > > Time Spent: 40m > Remaining Estimate: 0h > > An experimental flag is being added to disable it for now with an option to > enable it per-workflow. > Also related: > https://issues.apache.org/jira/browse/BEAM-5747 - Upgrade conscrypt to its > latest version. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (BEAM-6182) Use of conscrypt SSL results in stuck workflows in Dataflow
[ https://issues.apache.org/jira/browse/BEAM-6182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ahmet Altay updated BEAM-6182: -- Fix Version/s: 2.9.0 > Use of conscrypt SSL results in stuck workflows in Dataflow > --- > > Key: BEAM-6182 > URL: https://issues.apache.org/jira/browse/BEAM-6182 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Reporter: Ahmet Altay >Assignee: Tyler Akidau >Priority: Major > Fix For: 2.9.0 > > Time Spent: 40m > Remaining Estimate: 0h > > An experimental flag is being added to disable it for now with an option to > enable it per-workflow. > Also related: > https://issues.apache.org/jira/browse/BEAM-5747 - Upgrade conscrypt to its > latest version. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6182) Use of conscrypt SSL results in stuck workflows in Dataflow
[ https://issues.apache.org/jira/browse/BEAM-6182?focusedWorklogId=172415=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172415 ] ASF GitHub Bot logged work on BEAM-6182: Author: ASF GitHub Bot Created on: 05/Dec/18 17:45 Start Date: 05/Dec/18 17:45 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #7203: [BEAM-6182] Disable conscrypt by default URL: https://github.com/apache/beam/pull/7203#discussion_r239168968 ## File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java ## @@ -57,18 +57,19 @@ public static DataflowWorkerHarnessOptions initializeGlobalStateAndPipelineOptio ExperimentContext ec = ExperimentContext.parseFrom(pipelineOptions); -String experimentName = Experiment.DisableConscryptSecurityProvider.getName(); -if (!ec.isEnabled(Experiment.DisableConscryptSecurityProvider)) { +String experimentName = Experiment.EnableConscryptSecurityProvider.getName(); +if (ec.isEnabled(Experiment.EnableConscryptSecurityProvider)) { /* Enable fast SSL provider. */ LOG.info( - "Dataflow runner uses conscrypt by default for SSL. To disable this feature, " - + "pass pipeline option --experiments={}", + "Dataflow runner is using conscrypt SSL. To disable this feature, " + + "remove the pipeline option --experiments={}", experimentName); Security.insertProviderAt(new OpenSSLProvider(), 1); } else { LOG.info( Review comment: I would like to keep it as an info since this is a change in behavior for Dataflow runner. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172415) Time Spent: 20m (was: 10m) > Use of conscrypt SSL results in stuck workflows in Dataflow > --- > > Key: BEAM-6182 > URL: https://issues.apache.org/jira/browse/BEAM-6182 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Reporter: Ahmet Altay >Assignee: Tyler Akidau >Priority: Major > Time Spent: 20m > Remaining Estimate: 0h > > An experimental flag is being added to disable it for now with an option to > enable it per-workflow. > Also related: > https://issues.apache.org/jira/browse/BEAM-5747 - Upgrade conscrypt to its > latest version. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6182) Use of conscrypt SSL results in stuck workflows in Dataflow
[ https://issues.apache.org/jira/browse/BEAM-6182?focusedWorklogId=172417=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172417 ] ASF GitHub Bot logged work on BEAM-6182: Author: ASF GitHub Bot Created on: 05/Dec/18 17:48 Start Date: 05/Dec/18 17:48 Worklog Time Spent: 10m Work Description: aaltay commented on issue #7203: [BEAM-6182] Disable conscrypt by default URL: https://github.com/apache/beam/pull/7203#issuecomment-444578137 Thank you @swegner. Addressed your questions. Will merge this after tests pass. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172417) Time Spent: 40m (was: 0.5h) > Use of conscrypt SSL results in stuck workflows in Dataflow > --- > > Key: BEAM-6182 > URL: https://issues.apache.org/jira/browse/BEAM-6182 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Reporter: Ahmet Altay >Assignee: Tyler Akidau >Priority: Major > Time Spent: 40m > Remaining Estimate: 0h > > An experimental flag is being added to disable it for now with an option to > enable it per-workflow. > Also related: > https://issues.apache.org/jira/browse/BEAM-5747 - Upgrade conscrypt to its > latest version. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6182) Use of conscrypt SSL results in stuck workflows in Dataflow
[ https://issues.apache.org/jira/browse/BEAM-6182?focusedWorklogId=172416=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172416 ] ASF GitHub Bot logged work on BEAM-6182: Author: ASF GitHub Bot Created on: 05/Dec/18 17:48 Start Date: 05/Dec/18 17:48 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #7203: [BEAM-6182] Disable conscrypt by default URL: https://github.com/apache/beam/pull/7203#discussion_r239169766 ## File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java ## @@ -57,18 +57,19 @@ public static DataflowWorkerHarnessOptions initializeGlobalStateAndPipelineOptio ExperimentContext ec = ExperimentContext.parseFrom(pipelineOptions); -String experimentName = Experiment.DisableConscryptSecurityProvider.getName(); -if (!ec.isEnabled(Experiment.DisableConscryptSecurityProvider)) { +String experimentName = Experiment.EnableConscryptSecurityProvider.getName(); +if (ec.isEnabled(Experiment.EnableConscryptSecurityProvider)) { Review comment: 1. We have a sense that IO-bound pipelines could be affected by up to 15% (the number mentioned in the previous PR). The typical regression will likely be much smaller. 2. This does not affect Beam in general; it affects Dataflow only. I think highlighting this in the release notes is a good idea. (Dataflow, if they choose to do so, could take additional proactive measures to communicate the change.) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172416) Time Spent: 0.5h (was: 20m) > Use of conscrypt SSL results in stuck workflows in Dataflow > --- > > Key: BEAM-6182 > URL: https://issues.apache.org/jira/browse/BEAM-6182 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Reporter: Ahmet Altay >Assignee: Tyler Akidau >Priority: Major > Time Spent: 0.5h > Remaining Estimate: 0h > > An experimental flag is being added to disable it for now with an option to > enable it per-workflow. > Also related: > https://issues.apache.org/jira/browse/BEAM-5747 - Upgrade conscrypt to its > latest version. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6182) Use of conscrypt SSL results in stuck workflows in Dataflow
[ https://issues.apache.org/jira/browse/BEAM-6182?focusedWorklogId=172414=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172414 ] ASF GitHub Bot logged work on BEAM-6182: Author: ASF GitHub Bot Created on: 05/Dec/18 17:45 Start Date: 05/Dec/18 17:45 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #7203: [BEAM-6182] Disable conscrypt by default URL: https://github.com/apache/beam/pull/7203#discussion_r239168760 ## File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ExperimentContext.java ## @@ -36,7 +36,7 @@ /** Enumeration of all known experiments. */ public enum Experiment { -DisableConscryptSecurityProvider("disable_conscrypt_security_provider"), +EnableConscryptSecurityProvider("enable_conscrypt_security_provider"), Review comment: Done This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172414) Time Spent: 10m Remaining Estimate: 0h > Use of conscrypt SSL results in stuck workflows in Dataflow > --- > > Key: BEAM-6182 > URL: https://issues.apache.org/jira/browse/BEAM-6182 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Reporter: Ahmet Altay >Assignee: Tyler Akidau >Priority: Major > Time Spent: 10m > Remaining Estimate: 0h > > An experimental flag is being added to disable it for now with an option to > enable it per-workflow. > Also related: > https://issues.apache.org/jira/browse/BEAM-5747 - Upgrade conscrypt to its > latest version. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (BEAM-6182) Use of conscrypt SSL results in stuck workflows in Dataflow
Ahmet Altay created BEAM-6182: - Summary: Use of conscrypt SSL results in stuck workflows in Dataflow Key: BEAM-6182 URL: https://issues.apache.org/jira/browse/BEAM-6182 Project: Beam Issue Type: Bug Components: runner-dataflow Reporter: Ahmet Altay Assignee: Tyler Akidau An experimental flag is being added to disable it for now with an option to enable it per-workflow. Also related: https://issues.apache.org/jira/browse/BEAM-5747 - Upgrade conscrypt to its latest version. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (BEAM-6151) MongoDbIO add support mongodb server with self signed ssl
[ https://issues.apache.org/jira/browse/BEAM-6151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jean-Baptiste Onofré updated BEAM-6151: --- Fix Version/s: (was: 2.9.0) 2.10.0 > MongoDbIO add support mongodb server with self signed ssl > - > > Key: BEAM-6151 > URL: https://issues.apache.org/jira/browse/BEAM-6151 > Project: Beam > Issue Type: Improvement > Components: io-java-mongodb >Affects Versions: 2.8.0 >Reporter: Chaim >Assignee: Jean-Baptiste Onofré >Priority: Major > Fix For: 2.10.0 > > Time Spent: 50m > Remaining Estimate: 0h > > If the mongodb server does not have a certified ssl certificate you cannot > connect to the server -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6151) MongoDbIO add support mongodb server with self signed ssl
[ https://issues.apache.org/jira/browse/BEAM-6151?focusedWorklogId=172400=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172400 ] ASF GitHub Bot logged work on BEAM-6151: Author: ASF GitHub Bot Created on: 05/Dec/18 17:27 Start Date: 05/Dec/18 17:27 Worklog Time Spent: 10m Work Description: jbonofre closed pull request #7162: BEAM-6151: MongoDbIO add support mongodb server with self signed ssl URL: https://github.com/apache/beam/pull/7162 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java index 9d6e05bfb209..309a30d582b7 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java @@ -104,6 +104,9 @@ public static Read read() { .setKeepAlive(true) .setMaxConnectionIdleTime(6) .setNumSplits(0) +.setSslEnabled(false) +.setIgnoreSSLCertificate(false) +.setSslInvalidHostNameAllowed(false) .build(); } @@ -113,6 +116,9 @@ public static Write write() { .setKeepAlive(true) .setMaxConnectionIdleTime(6) .setBatchSize(1024L) +.setSslEnabled(false) +.setIgnoreSSLCertificate(false) +.setSslInvalidHostNameAllowed(false) .build(); } @@ -128,6 +134,12 @@ private MongoDbIO() {} abstract int maxConnectionIdleTime(); +abstract boolean sslEnabled(); + +abstract boolean sslInvalidHostNameAllowed(); + +abstract boolean ignoreSSLCertificate(); + @Nullable abstract String database(); @@ -152,6 +164,12 @@ private MongoDbIO() {} abstract Builder setMaxConnectionIdleTime(int maxConnectionIdleTime); + abstract Builder setSslEnabled(boolean value); + + abstract 
Builder setSslInvalidHostNameAllowed(boolean value); + + abstract Builder setIgnoreSSLCertificate(boolean value); + abstract Builder setDatabase(String database); abstract Builder setCollection(String collection); @@ -215,6 +233,21 @@ public Read withMaxConnectionIdleTime(int maxConnectionIdleTime) { return builder().setMaxConnectionIdleTime(maxConnectionIdleTime).build(); } +/** Enable ssl for connection. */ +public Read withSSLEnabled(boolean sslEnabled) { + return builder().setSslEnabled(sslEnabled).build(); +} + +/** Enable invalidHostNameAllowed for ssl for connection. */ +public Read withSSLInvalidHostNameAllowed(boolean invalidHostNameAllowed) { + return builder().setSslInvalidHostNameAllowed(invalidHostNameAllowed).build(); +} + +/** Enable ignoreSSLCertificate for ssl for connection (allow for self signed ceritificates). */ +public Read withIgnoreSSLCertificate(boolean ignoreSSLCertificate) { + return builder().setIgnoreSSLCertificate(ignoreSSLCertificate).build(); +} + /** Sets the database to use. 
*/ public Read withDatabase(String database) { checkArgument(database != null, "database can not be null"); @@ -259,6 +292,10 @@ public void populateDisplayData(DisplayData.Builder builder) { builder.add(DisplayData.item("uri", uri())); builder.add(DisplayData.item("keepAlive", keepAlive())); builder.add(DisplayData.item("maxConnectionIdleTime", maxConnectionIdleTime())); + builder.add(DisplayData.item("sslEnabled", sslEnabled())); + builder.add(DisplayData.item("sslInvalidHostNameAllowed", sslInvalidHostNameAllowed())); + builder.add(DisplayData.item("ignoreSSLCertificate", ignoreSSLCertificate())); + builder.add(DisplayData.item("database", database())); builder.add(DisplayData.item("collection", collection())); builder.addIfNotNull(DisplayData.item("filter", filter())); @@ -270,6 +307,22 @@ public void populateDisplayData(DisplayData.Builder builder) { } } + private static MongoClientOptions.Builder getOptions( + boolean keepAlive, + int maxConnectionIdleTime, + boolean sslEnabled, + boolean sslInvalidHostNameAllowed) { +MongoClientOptions.Builder optionsBuilder = new MongoClientOptions.Builder(); + optionsBuilder.socketKeepAlive(keepAlive).maxConnectionIdleTime(maxConnectionIdleTime); +if (sslEnabled) { + optionsBuilder + .sslEnabled(sslEnabled) + .sslInvalidHostNameAllowed(sslInvalidHostNameAllowed) + .sslContext(SSLUtils.ignoreSSLCertificate()); +} +return optionsBuilder; + } + /**
[jira] [Resolved] (BEAM-6151) MongoDbIO add support mongodb server with self signed ssl
[ https://issues.apache.org/jira/browse/BEAM-6151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jean-Baptiste Onofré resolved BEAM-6151. Resolution: Fixed > MongoDbIO add support mongodb server with self signed ssl > - > > Key: BEAM-6151 > URL: https://issues.apache.org/jira/browse/BEAM-6151 > Project: Beam > Issue Type: Improvement > Components: io-java-mongodb >Affects Versions: 2.8.0 >Reporter: Chaim >Assignee: Jean-Baptiste Onofré >Priority: Major > Fix For: 2.10.0 > > Time Spent: 50m > Remaining Estimate: 0h > > If the mongodb server does not have a certified ssl certificate you cannot > connect to the server -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6167) Create a Class to read content of a file keeping track of the file path (python)
[ https://issues.apache.org/jira/browse/BEAM-6167?focusedWorklogId=172398=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172398 ] ASF GitHub Bot logged work on BEAM-6167: Author: ASF GitHub Bot Created on: 05/Dec/18 17:16 Start Date: 05/Dec/18 17:16 Worklog Time Spent: 10m Work Description: lcaggio commented on a change in pull request #7193: [BEAM-6167] Add class ReadFromTextWithFilename (Python) URL: https://github.com/apache/beam/pull/7193#discussion_r239158713 ## File path: sdks/python/apache_beam/io/textio.py ## @@ -527,6 +533,61 @@ def expand(self, pvalue): return pvalue.pipeline | Read(self._source) +class ReadFromTextWithFilename(PTransform): Review comment: I like the _ReadFromTextBase approach; I'm updating the code accordingly. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172398) Time Spent: 50m (was: 40m) > Create a Class to read content of a file keeping track of the file path > (python) > > > Key: BEAM-6167 > URL: https://issues.apache.org/jira/browse/BEAM-6167 > Project: Beam > Issue Type: Improvement > Components: io-ideas >Affects Versions: 2.8.0 >Reporter: Lorenzo Caggioni >Assignee: Eugene Kirpichov >Priority: Minor > Fix For: Not applicable > > Time Spent: 50m > Remaining Estimate: 0h > > Add a class to read the content of a file, keeping track of the file path each > element comes from. > This is an improvement of the current python/apache_beam/io/textio.py -- This message was sent by Atlassian JIRA (v7.6.3#76005)
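Editor's sketch: the idea behind the proposed ReadFromTextWithFilename transform can be illustrated without Beam, since each emitted element simply carries the path of the file it came from. The plain-Python version below works under that assumption. `read_lines_with_filename` and `demo` are hypothetical helpers, not part of apache_beam, and they do not reflect how PR #7193 is actually implemented.

```python
import os
import tempfile


def read_lines_with_filename(paths):
    """Yield (path, line) pairs so each line keeps track of its source file.

    Hypothetical helper for illustration only; not an apache_beam API.
    """
    for path in paths:
        with open(path, "r") as handle:
            for line in handle:
                # Strip the trailing newline, keep the rest of the line intact.
                yield path, line.rstrip("\n")


def demo():
    """Write two small files and read them back as (path, line) pairs."""
    tmp_dir = tempfile.mkdtemp(prefix="textio-sketch")
    first = os.path.join(tmp_dir, "a.txt")
    second = os.path.join(tmp_dir, "b.txt")
    with open(first, "w") as handle:
        handle.write("alpha\nbeta\n")
    with open(second, "w") as handle:
        handle.write("gamma\n")
    return first, second, list(read_lines_with_filename([first, second]))
```

Pairing the path with every line, rather than emitting it once per file, keeps each element self-describing, which is what lets downstream steps group or filter by source file.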
[jira] [Work logged] (BEAM-5817) Nexmark test of joining stream to files
[ https://issues.apache.org/jira/browse/BEAM-5817?focusedWorklogId=172388=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172388 ] ASF GitHub Bot logged work on BEAM-5817: Author: ASF GitHub Bot Created on: 05/Dec/18 16:59 Start Date: 05/Dec/18 16:59 Worklog Time Spent: 10m Work Description: kennknowles closed pull request #7205: [BEAM-5817] Add SQL bounded side input join to queries that are actually run URL: https://github.com/apache/beam/pull/7205 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java index c797064b4b5a..8bafecc03ea0 100644 --- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java +++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java @@ -78,6 +78,7 @@ import org.apache.beam.sdk.nexmark.queries.Query8Model; import org.apache.beam.sdk.nexmark.queries.Query9; import org.apache.beam.sdk.nexmark.queries.Query9Model; +import org.apache.beam.sdk.nexmark.queries.sql.SqlBoundedSideInputJoin; import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery0; import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery1; import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery2; @@ -1240,6 +1241,9 @@ private NexmarkQueryModel getNexmarkQueryModel() { .put( NexmarkQueryName.HIGHEST_BID, new NexmarkQuery(configuration, new SqlQuery7(configuration))) +.put( +NexmarkQueryName.BOUNDED_SIDE_INPUT_JOIN, +new NexmarkQuery(configuration, new SqlBoundedSideInputJoin(configuration))) .build(); } This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172388) Time Spent: 8h 10m (was: 8h) > Nexmark test of joining stream to files > --- > > Key: BEAM-5817 > URL: https://issues.apache.org/jira/browse/BEAM-5817 > Project: Beam > Issue Type: New Feature > Components: examples-nexmark >Reporter: Kenneth Knowles >Assignee: Kenneth Knowles >Priority: Major > Fix For: 2.10.0 > > Time Spent: 8h 10m > Remaining Estimate: 0h > > Nexmark is a convenient framework for testing the use case of large scale > stream enrichment. One way is joining a stream to files, and it can be tested > via any source that Nexmark supports. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (BEAM-6174) Remove unnecesarry Kryo dependecny from euphoria
[ https://issues.apache.org/jira/browse/BEAM-6174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Moravek resolved BEAM-6174. - Resolution: Fixed > Remove unnecesarry Kryo dependecny from euphoria > > > Key: BEAM-6174 > URL: https://issues.apache.org/jira/browse/BEAM-6174 > Project: Beam > Issue Type: Sub-task > Components: dsl-euphoria >Reporter: Vaclav Plajt >Assignee: Vaclav Plajt >Priority: Major > Fix For: 2.10.0 > > Time Spent: 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (BEAM-6174) Remove unnecesarry Kryo dependecny from euphoria
[ https://issues.apache.org/jira/browse/BEAM-6174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Moravek updated BEAM-6174: Fix Version/s: 2.10.0 > Remove unnecesarry Kryo dependecny from euphoria > > > Key: BEAM-6174 > URL: https://issues.apache.org/jira/browse/BEAM-6174 > Project: Beam > Issue Type: Sub-task > Components: dsl-euphoria >Reporter: Vaclav Plajt >Assignee: Vaclav Plajt >Priority: Major > Fix For: 2.10.0 > > Time Spent: 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6174) Remove unnecesarry Kryo dependecny from euphoria
[ https://issues.apache.org/jira/browse/BEAM-6174?focusedWorklogId=172369=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172369 ] ASF GitHub Bot logged work on BEAM-6174: Author: ASF GitHub Bot Created on: 05/Dec/18 16:37 Start Date: 05/Dec/18 16:37 Worklog Time Spent: 10m Work Description: dmvk commented on issue #7195: [BEAM-6174] Kryo dependency removed. URL: https://github.com/apache/beam/pull/7195#issuecomment-444552530 Target release changed to 2.10 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172369) Time Spent: 50m (was: 40m) > Remove unnecesarry Kryo dependecny from euphoria > > > Key: BEAM-6174 > URL: https://issues.apache.org/jira/browse/BEAM-6174 > Project: Beam > Issue Type: Sub-task > Components: dsl-euphoria >Reporter: Vaclav Plajt >Assignee: Vaclav Plajt >Priority: Major > Time Spent: 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6174) Remove unnecesarry Kryo dependecny from euphoria
[ https://issues.apache.org/jira/browse/BEAM-6174?focusedWorklogId=172368=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172368 ] ASF GitHub Bot logged work on BEAM-6174: Author: ASF GitHub Bot Created on: 05/Dec/18 16:37 Start Date: 05/Dec/18 16:37 Worklog Time Spent: 10m Work Description: dmvk closed pull request #7195: [BEAM-6174] Kryo dependency removed. URL: https://github.com/apache/beam/pull/7195 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/java/extensions/euphoria/build.gradle b/sdks/java/extensions/euphoria/build.gradle index 75d4b357be9c..e6ad57382ee6 100644 --- a/sdks/java/extensions/euphoria/build.gradle +++ b/sdks/java/extensions/euphoria/build.gradle @@ -21,13 +21,8 @@ applyJavaNature() description = "Apache Beam :: SDKs :: Java :: Extensions :: Euphoria Java 8 DSL" -ext { -kryoVersion = '4.0.2' -} - dependencies { shadow project(path: ":beam-sdks-java-core", configuration: "shadow") -shadow "com.esotericsoftware:kryo:${kryoVersion}" shadow library.java.guava testCompile library.java.mockito_core testCompile project(path: ":beam-sdks-java-extensions-kryo") This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172368) Time Spent: 40m (was: 0.5h) > Remove unnecesarry Kryo dependecny from euphoria > > > Key: BEAM-6174 > URL: https://issues.apache.org/jira/browse/BEAM-6174 > Project: Beam > Issue Type: Sub-task > Components: dsl-euphoria >Reporter: Vaclav Plajt >Assignee: Vaclav Plajt >Priority: Major > Time Spent: 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6165) Send metrics to Flink in portable Flink runner
[ https://issues.apache.org/jira/browse/BEAM-6165?focusedWorklogId=172359=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172359 ] ASF GitHub Bot logged work on BEAM-6165: Author: ASF GitHub Bot Created on: 05/Dec/18 16:20 Start Date: 05/Dec/18 16:20 Worklog Time Spent: 10m Work Description: ryan-williams commented on a change in pull request #7183: [BEAM-6165] send metrics to Flink in portable Flink runner URL: https://github.com/apache/beam/pull/7183#discussion_r239135253 ## File path: sdks/python/apache_beam/runners/portability/flink_runner_test.py ## @@ -59,25 +65,63 @@ environment_type = known_args.environment_type.lower() environment_config = ( known_args.environment_config if known_args.environment_config else None) + test = known_args.test # This is defined here to only be run when we invoke this file explicitly. class FlinkRunnerTest(portable_runner_test.PortableRunnerTest): _use_grpc = True _use_subprocesses = True +conf_dir = None + +@classmethod +def tearDownClass(cls): + if cls.conf_dir and exists(cls.conf_dir): +logging.info("removing conf dir: %s" % cls.conf_dir) +rmtree(cls.conf_dir) + super(FlinkRunnerTest, cls).tearDownClass() + +@classmethod +def _create_conf_dir(cls): + """Create (and save a static reference to) a "conf dir", used to provide metrics configs and + verify metrics output + + It gets cleaned up when the suite is done executing""" + + if hasattr(cls, 'conf_dir'): +cls.conf_dir = mkdtemp(prefix='flinktest-conf') + +# path for a FileReporter to write metrics to +cls.test_metrics_path = path.join(cls.conf_dir, 'test-metrics.txt') + +# path to write Flink configuration to +conf_path = path.join(cls.conf_dir, 'flink-conf.yaml') +with open(conf_path, 'w') as f: + f.write(linesep.join([ +'metrics.reporters: test', +'metrics.reporter.test.class: org.apache.beam.runners.flink.metrics.FileReporter', +'metrics.reporter.test.file: %s' % cls.test_metrics_path + ])) + @classmethod def _subprocess_command(cls, port): - 
tmp_dir = tempfile.mkdtemp(prefix='flinktest') + # will be cleaned up at the end of this method, and recreated and used by the job server + tmp_dir = mkdtemp(prefix='flinktest') + + cls._create_conf_dir() + try: return [ 'java', '-jar', flink_job_server_jar, +'--flink-master-url', '[local]', Review comment: thanks, I am using the `--flink-conf-dir` added in #7031; I don't see a way to avoid specifying `[local]` here though. it also sounds like we are saying that this test is intended to only run in local mode, so we may as well make that explicit? The previous (`[auto]`) code path goes through Flink code that doesn't seem to support injecting an existing `Configuration` (cf. [streaming](https://github.com/apache/flink/blob/release-1.5.5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1595-L1612), [batch](https://github.com/apache/flink/blob/release-1.5.5/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java#L1058-L1061); presumably this makes sense because it would be strange to do that if actually in remote/cluster mode?) lmk if that doesn't make sense or there is something I should change here, thanks This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172359) Time Spent: 5h 50m (was: 5h 40m) > Send metrics to Flink in portable Flink runner > -- > > Key: BEAM-6165 > URL: https://issues.apache.org/jira/browse/BEAM-6165 > Project: Beam > Issue Type: Improvement > Components: runner-flink >Affects Versions: 2.8.0 >Reporter: Ryan Williams >Assignee: Ryan Williams >Priority: Major > Labels: metrics, portability, portability-flink > Time Spent: 5h 50m > Remaining Estimate: 0h > > Metrics are sent from the fn harness to runner in the Python SDK (and likely > Java soon), but the portable Flink runner doesn't pass them on to Flink, > which it should, so that users can see them in e.g. the Flink UI or via any > Flink metrics reporters. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
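The conf-dir setup quoted in the diff above can be sketched in isolation as follows. This is a minimal illustration, not the code from the pull request: the helper name `write_flink_metrics_conf` is hypothetical, while the three config keys, the `FileReporter` class name, and the file names are taken from the quoted diff.

```python
import os
import shutil
import tempfile


def write_flink_metrics_conf(conf_dir):
    """Write a flink-conf.yaml that registers a file-based metrics reporter.

    Hypothetical helper: only the config keys and the reporter class name
    come from the quoted diff. Returns (conf_path, metrics_path).
    """
    # Path the FileReporter is told to write metrics to.
    metrics_path = os.path.join(conf_dir, 'test-metrics.txt')
    # Path of the Flink configuration file itself.
    conf_path = os.path.join(conf_dir, 'flink-conf.yaml')
    lines = [
        'metrics.reporters: test',
        'metrics.reporter.test.class: '
        'org.apache.beam.runners.flink.metrics.FileReporter',
        'metrics.reporter.test.file: %s' % metrics_path,
    ]
    with open(conf_path, 'w') as f:
        f.write('\n'.join(lines) + '\n')
    return conf_path, metrics_path


if __name__ == '__main__':
    conf_dir = tempfile.mkdtemp(prefix='flinktest-conf')
    try:
        conf_path, metrics_path = write_flink_metrics_conf(conf_dir)
        with open(conf_path) as f:
            print(f.read())
    finally:
        # Mirrors the tearDownClass cleanup in the diff.
        shutil.rmtree(conf_dir)
```

The directory would then be handed to the job server via the `--flink-conf-dir` flag mentioned in the review comment; how the job server consumes it is outside this sketch.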
[jira] [Work logged] (BEAM-5859) Improve Traceability of Pipeline translation
[ https://issues.apache.org/jira/browse/BEAM-5859?focusedWorklogId=172349=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172349 ] ASF GitHub Bot logged work on BEAM-5859: Author: ASF GitHub Bot Created on: 05/Dec/18 16:10 Start Date: 05/Dec/18 16:10 Worklog Time Spent: 10m Work Description: mxm commented on a change in pull request #7208: [BEAM-5859] Better handle fused composite stage names. URL: https://github.com/apache/beam/pull/7208#discussion_r239129246 ## File path: runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ExecutableStageTranslationTest.java ## @@ -78,6 +81,40 @@ public void processElement( String executableStageName = ExecutableStageTranslation.generateNameFromStagePayload(basePayload); -assertThat(executableStageName, is("[3]{ParDo(Anonymous), MyName, count}")); +assertThat(executableStageName, is("[3]{ParDo(Anonymous), MyName, Composite}")); + } + + @Test + public void testOperatorNameGenerationFromNames() { +assertGeneratedNames("A", "A", Arrays.asList("A")); +assertGeneratedNames("A/a1", "A/a1", Arrays.asList("A/a1")); +assertGeneratedNames("A/{a1, a2}", "A/{a1, a2}", Arrays.asList("A/a1", "A/a2")); +assertGeneratedNames( +"A/{a1, a2}", "A/{a1, a2/{a2.1, a2.2}}", Arrays.asList("A/a1", "A/a2/a2.1", "A/a2/a2.2")); +assertGeneratedNames("{A, B}", "{A/{a1, a2}, B}", Arrays.asList("A/a1", "A/a2", "B")); +assertGeneratedNames( +"{A, B, C}", "{A/{a1, a2}, B, C/c/cc}", Arrays.asList("A/a1", "A/a2", "B", "C/c/cc")); Review comment: The logic is that we only recurse into the composites if we have only one of it? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172349) Time Spent: 2.5h (was: 2h 20m) > Improve Traceability of Pipeline translation > > > Key: BEAM-5859 > URL: https://issues.apache.org/jira/browse/BEAM-5859 > Project: Beam > Issue Type: Improvement > Components: runner-flink >Reporter: Maximilian Michels >Priority: Major > Labels: portability, portability-flink > Attachments: tfx.png, wordcount.png > > Time Spent: 2.5h > Remaining Estimate: 0h > > Users often ask how they can reason about the pipeline translation. The Flink > UI display a confusingly large graph without any trace of the original Beam > pipeline: > WordCount: > !wordcount.png! > TFX: > !tfx.png! > Some aspects which make understanding these graphs hard: > * Users don't know how the Runner maps Beam to Flink concepts > * The UI is awfully slow / hangs when the pipeline is reasonable complex > * The operator names seem to use {{transform.getUniqueName()}} which doesn't > generate readable name > * So called Chaining combines operators into a single operator which makes > understanding which Beam concept belongs to which Flink concept even harder > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5859) Improve Traceability of Pipeline translation
[ https://issues.apache.org/jira/browse/BEAM-5859?focusedWorklogId=172354=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172354 ] ASF GitHub Bot logged work on BEAM-5859: Author: ASF GitHub Bot Created on: 05/Dec/18 16:10 Start Date: 05/Dec/18 16:10 Worklog Time Spent: 10m Work Description: mxm commented on a change in pull request #7208: [BEAM-5859] Better handle fused composite stage names. URL: https://github.com/apache/beam/pull/7208#discussion_r239122914 ## File path: runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ExecutableStageTranslationTest.java ## @@ -78,6 +81,40 @@ public void processElement( String executableStageName = ExecutableStageTranslation.generateNameFromStagePayload(basePayload); -assertThat(executableStageName, is("[3]{ParDo(Anonymous), MyName, count}")); +assertThat(executableStageName, is("[3]{ParDo(Anonymous), MyName, Composite}")); + } + + @Test + public void testOperatorNameGenerationFromNames() { +assertGeneratedNames("A", "A", Arrays.asList("A")); +assertGeneratedNames("A/a1", "A/a1", Arrays.asList("A/a1")); +assertGeneratedNames("A/{a1, a2}", "A/{a1, a2}", Arrays.asList("A/a1", "A/a2")); +assertGeneratedNames( +"A/{a1, a2}", "A/{a1, a2/{a2.1, a2.2}}", Arrays.asList("A/a1", "A/a2/a2.1", "A/a2/a2.2")); +assertGeneratedNames("{A, B}", "{A/{a1, a2}, B}", Arrays.asList("A/a1", "A/a2", "B")); +assertGeneratedNames( +"{A, B, C}", "{A/{a1, a2}, B, C/c/cc}", Arrays.asList("A/a1", "A/a2", "B", "C/c/cc")); Review comment: It seems inconsistent that we don't get `A{a1/a2}, B, C` here. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172354) > Improve Traceability of Pipeline translation > > > Key: BEAM-5859 > URL: https://issues.apache.org/jira/browse/BEAM-5859 > Project: Beam > Issue Type: Improvement > Components: runner-flink >Reporter: Maximilian Michels >Priority: Major > Labels: portability, portability-flink > Attachments: tfx.png, wordcount.png > > Time Spent: 2h 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5859) Improve Traceability of Pipeline translation
[ https://issues.apache.org/jira/browse/BEAM-5859?focusedWorklogId=172353=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172353 ] ASF GitHub Bot logged work on BEAM-5859: Author: ASF GitHub Bot Created on: 05/Dec/18 16:10 Start Date: 05/Dec/18 16:10 Worklog Time Spent: 10m Work Description: mxm commented on a change in pull request #7208: [BEAM-5859] Better handle fused composite stage names. URL: https://github.com/apache/beam/pull/7208#discussion_r239130429 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java ## @@ -45,19 +53,60 @@ public static String generateNameFromStagePayload(ExecutableStagePayload stagePa RunnerApi.Components components = stagePayload.getComponents(); final int transformsCount = stagePayload.getTransformsCount(); sb.append("[").append(transformsCount).append("]"); -sb.append("{"); +Collection names = new ArrayList<>(); for (int i = 0; i < transformsCount; i++) { String name = components.getTransformsOrThrow(stagePayload.getTransforms(i)).getUniqueName(); - // Python: Remove the 'ref_AppliedPTransform_' prefix which just makes the name longer - name = name.replaceFirst("^ref_AppliedPTransform_", ""); // Java: Remove the 'ParMultiDo(Anonymous)' suffix which just makes the name longer name = name.replaceFirst("/ParMultiDo\\(Anonymous\\)$", ""); - sb.append(name); - if (i + 1 < transformsCount) { -sb.append(", "); - } + names.add(name); } -sb.append("}"); +sb.append(generateNameFromTransformNames(names, true)); return sb.toString(); } + + public static String generateNameFromTransformNames(Collection names, boolean truncate) { +Multimap groupByOuter = LinkedHashMultimap.create(); +for (String name : names) { + int index = name.indexOf('/'); + if (index == -1) { +groupByOuter.put(name, ""); + } else { +groupByOuter.put(name.substring(0, index), name.substring(index + 1)); + } +} +if (groupByOuter.keySet().size() == 1) { + Map.Entry> outer = 
+ Iterables.getOnlyElement(groupByOuter.asMap().entrySet()); + if (outer.getValue().size() == 1 && outer.getValue().contains("")) { +// Names consisted of a single name without any slashes. +return outer.getKey(); + } else { +// Everything is in the same outer stage, enumerate at one level down. +return String.format( +"%s/%s", outer.getKey(), generateNameFromTransformNames(outer.getValue(), truncate)); + } +} else { + Collection parts; + if (truncate) { +// Enumerate the outer stages without their composite structure, if any. +parts = groupByOuter.keySet(); Review comment: Note to self: Recursion anchor. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172353) > Improve Traceability of Pipeline translation > > > Key: BEAM-5859 > URL: https://issues.apache.org/jira/browse/BEAM-5859 > Project: Beam > Issue Type: Improvement > Components: runner-flink >Reporter: Maximilian Michels >Priority: Major > Labels: portability, portability-flink > Attachments: tfx.png, wordcount.png > > Time Spent: 2h 50m > Remaining Estimate: 0h > > Users often ask how they can reason about the pipeline translation. The Flink > UI display a confusingly large graph without any trace of the original Beam > pipeline: > WordCount: > !wordcount.png! > TFX: > !tfx.png! 
> Some aspects which make understanding these graphs hard: > * Users don't know how the Runner maps Beam to Flink concepts > * The UI is awfully slow / hangs when the pipeline is reasonable complex > * The operator names seem to use {{transform.getUniqueName()}} which doesn't > generate readable name > * So called Chaining combines operators into a single operator which makes > understanding which Beam concept belongs to which Flink concept even harder > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5859) Improve Traceability of Pipeline translation
[ https://issues.apache.org/jira/browse/BEAM-5859?focusedWorklogId=172350=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172350 ] ASF GitHub Bot logged work on BEAM-5859: Author: ASF GitHub Bot Created on: 05/Dec/18 16:10 Start Date: 05/Dec/18 16:10 Worklog Time Spent: 10m Work Description: mxm commented on a change in pull request #7208: [BEAM-5859] Better handle fused composite stage names. URL: https://github.com/apache/beam/pull/7208#discussion_r239105166 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java ## @@ -45,19 +53,60 @@ public static String generateNameFromStagePayload(ExecutableStagePayload stagePa RunnerApi.Components components = stagePayload.getComponents(); final int transformsCount = stagePayload.getTransformsCount(); sb.append("[").append(transformsCount).append("]"); -sb.append("{"); +Collection names = new ArrayList<>(); for (int i = 0; i < transformsCount; i++) { String name = components.getTransformsOrThrow(stagePayload.getTransforms(i)).getUniqueName(); - // Python: Remove the 'ref_AppliedPTransform_' prefix which just makes the name longer - name = name.replaceFirst("^ref_AppliedPTransform_", ""); // Java: Remove the 'ParMultiDo(Anonymous)' suffix which just makes the name longer name = name.replaceFirst("/ParMultiDo\\(Anonymous\\)$", ""); - sb.append(name); - if (i + 1 < transformsCount) { -sb.append(", "); - } + names.add(name); } -sb.append("}"); +sb.append(generateNameFromTransformNames(names, true)); return sb.toString(); } + + public static String generateNameFromTransformNames(Collection names, boolean truncate) { Review comment: Would it make sense to add a short JavaDoc comment, explaining what kind of modes this method offers for generating a name from the transforms? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172350) Time Spent: 2h 40m (was: 2.5h) > Improve Traceability of Pipeline translation > > > Key: BEAM-5859 > URL: https://issues.apache.org/jira/browse/BEAM-5859 > Project: Beam > Issue Type: Improvement > Components: runner-flink >Reporter: Maximilian Michels >Priority: Major > Labels: portability, portability-flink > Attachments: tfx.png, wordcount.png > > Time Spent: 2h 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5859) Improve Traceability of Pipeline translation
[ https://issues.apache.org/jira/browse/BEAM-5859?focusedWorklogId=172351=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172351 ] ASF GitHub Bot logged work on BEAM-5859: Author: ASF GitHub Bot Created on: 05/Dec/18 16:10 Start Date: 05/Dec/18 16:10 Worklog Time Spent: 10m Work Description: mxm commented on a change in pull request #7208: [BEAM-5859] Better handle fused composite stage names. URL: https://github.com/apache/beam/pull/7208#discussion_r239130260 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java ## @@ -45,19 +53,60 @@ public static String generateNameFromStagePayload(ExecutableStagePayload stagePa RunnerApi.Components components = stagePayload.getComponents(); final int transformsCount = stagePayload.getTransformsCount(); sb.append("[").append(transformsCount).append("]"); -sb.append("{"); +Collection names = new ArrayList<>(); for (int i = 0; i < transformsCount; i++) { String name = components.getTransformsOrThrow(stagePayload.getTransforms(i)).getUniqueName(); - // Python: Remove the 'ref_AppliedPTransform_' prefix which just makes the name longer - name = name.replaceFirst("^ref_AppliedPTransform_", ""); // Java: Remove the 'ParMultiDo(Anonymous)' suffix which just makes the name longer name = name.replaceFirst("/ParMultiDo\\(Anonymous\\)$", ""); - sb.append(name); - if (i + 1 < transformsCount) { -sb.append(", "); - } + names.add(name); } -sb.append("}"); +sb.append(generateNameFromTransformNames(names, true)); return sb.toString(); } + + public static String generateNameFromTransformNames(Collection names, boolean truncate) { +Multimap groupByOuter = LinkedHashMultimap.create(); +for (String name : names) { + int index = name.indexOf('/'); + if (index == -1) { +groupByOuter.put(name, ""); + } else { +groupByOuter.put(name.substring(0, index), name.substring(index + 1)); + } +} +if (groupByOuter.keySet().size() == 1) { + Map.Entry> outer = 
+ Iterables.getOnlyElement(groupByOuter.asMap().entrySet()); + if (outer.getValue().size() == 1 && outer.getValue().contains("")) { +// Names consisted of a single name without any slashes. +return outer.getKey(); Review comment: Note to self: Recursion anchor. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172351) Time Spent: 2h 40m (was: 2.5h) > Improve Traceability of Pipeline translation > > > Key: BEAM-5859 > URL: https://issues.apache.org/jira/browse/BEAM-5859 > Project: Beam > Issue Type: Improvement > Components: runner-flink >Reporter: Maximilian Michels >Priority: Major > Labels: portability, portability-flink > Attachments: tfx.png, wordcount.png > > Time Spent: 2h 40m > Remaining Estimate: 0h > > Users often ask how they can reason about the pipeline translation. The Flink > UI display a confusingly large graph without any trace of the original Beam > pipeline: > WordCount: > !wordcount.png! > TFX: > !tfx.png! > Some aspects which make understanding these graphs hard: > * Users don't know how the Runner maps Beam to Flink concepts > * The UI is awfully slow / hangs when the pipeline is reasonable complex > * The operator names seem to use {{transform.getUniqueName()}} which doesn't > generate readable name > * So called Chaining combines operators into a single operator which makes > understanding which Beam concept belongs to which Flink concept even harder > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5859) Improve Traceability of Pipeline translation
[ https://issues.apache.org/jira/browse/BEAM-5859?focusedWorklogId=172348=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172348 ] ASF GitHub Bot logged work on BEAM-5859: Author: ASF GitHub Bot Created on: 05/Dec/18 16:10 Start Date: 05/Dec/18 16:10 Worklog Time Spent: 10m Work Description: mxm commented on a change in pull request #7208: [BEAM-5859] Better handle fused composite stage names. URL: https://github.com/apache/beam/pull/7208#discussion_r239114817 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java ## @@ -45,19 +53,60 @@ public static String generateNameFromStagePayload(ExecutableStagePayload stagePa RunnerApi.Components components = stagePayload.getComponents(); final int transformsCount = stagePayload.getTransformsCount(); sb.append("[").append(transformsCount).append("]"); -sb.append("{"); +Collection names = new ArrayList<>(); for (int i = 0; i < transformsCount; i++) { String name = components.getTransformsOrThrow(stagePayload.getTransforms(i)).getUniqueName(); - // Python: Remove the 'ref_AppliedPTransform_' prefix which just makes the name longer - name = name.replaceFirst("^ref_AppliedPTransform_", ""); // Java: Remove the 'ParMultiDo(Anonymous)' suffix which just makes the name longer name = name.replaceFirst("/ParMultiDo\\(Anonymous\\)$", ""); - sb.append(name); - if (i + 1 < transformsCount) { -sb.append(", "); - } + names.add(name); } -sb.append("}"); +sb.append(generateNameFromTransformNames(names, true)); return sb.toString(); } + + public static String generateNameFromTransformNames(Collection names, boolean truncate) { Review comment: Would `collapseComposites` be a better name than `truncate`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172348) Time Spent: 2.5h (was: 2h 20m) > Improve Traceability of Pipeline translation > > > Key: BEAM-5859 > URL: https://issues.apache.org/jira/browse/BEAM-5859 > Project: Beam > Issue Type: Improvement > Components: runner-flink >Reporter: Maximilian Michels >Priority: Major > Labels: portability, portability-flink > Attachments: tfx.png, wordcount.png > > Time Spent: 2.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5859) Improve Traceability of Pipeline translation
[ https://issues.apache.org/jira/browse/BEAM-5859?focusedWorklogId=172352=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172352 ] ASF GitHub Bot logged work on BEAM-5859: Author: ASF GitHub Bot Created on: 05/Dec/18 16:10 Start Date: 05/Dec/18 16:10 Worklog Time Spent: 10m Work Description: mxm commented on a change in pull request #7208: [BEAM-5859] Better handle fused composite stage names. URL: https://github.com/apache/beam/pull/7208#discussion_r239104348 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java ## @@ -45,19 +53,60 @@ public static String generateNameFromStagePayload(ExecutableStagePayload stagePa RunnerApi.Components components = stagePayload.getComponents(); final int transformsCount = stagePayload.getTransformsCount(); sb.append("[").append(transformsCount).append("]"); -sb.append("{"); +Collection names = new ArrayList<>(); for (int i = 0; i < transformsCount; i++) { String name = components.getTransformsOrThrow(stagePayload.getTransforms(i)).getUniqueName(); - // Python: Remove the 'ref_AppliedPTransform_' prefix which just makes the name longer - name = name.replaceFirst("^ref_AppliedPTransform_", ""); // Java: Remove the 'ParMultiDo(Anonymous)' suffix which just makes the name longer name = name.replaceFirst("/ParMultiDo\\(Anonymous\\)$", ""); - sb.append(name); - if (i + 1 < transformsCount) { -sb.append(", "); - } + names.add(name); } -sb.append("}"); +sb.append(generateNameFromTransformNames(names, true)); return sb.toString(); } + + public static String generateNameFromTransformNames(Collection names, boolean truncate) { +Multimap groupByOuter = LinkedHashMultimap.create(); +for (String name : names) { + int index = name.indexOf('/'); Review comment: I suppose names can never contain slashes or they are escaped? This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172352) Time Spent: 2h 50m (was: 2h 40m) > Improve Traceability of Pipeline translation > > > Key: BEAM-5859 > URL: https://issues.apache.org/jira/browse/BEAM-5859 > Project: Beam > Issue Type: Improvement > Components: runner-flink >Reporter: Maximilian Michels >Priority: Major > Labels: portability, portability-flink > Attachments: tfx.png, wordcount.png > > Time Spent: 2h 50m > Remaining Estimate: 0h > > Users often ask how they can reason about the pipeline translation. The Flink > UI display a confusingly large graph without any trace of the original Beam > pipeline: > WordCount: > !wordcount.png! > TFX: > !tfx.png! > Some aspects which make understanding these graphs hard: > * Users don't know how the Runner maps Beam to Flink concepts > * The UI is awfully slow / hangs when the pipeline is reasonable complex > * The operator names seem to use {{transform.getUniqueName()}} which doesn't > generate readable name > * So called Chaining combines operators into a single operator which makes > understanding which Beam concept belongs to which Flink concept even harder > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5859) Improve Traceability of Pipeline translation
[ https://issues.apache.org/jira/browse/BEAM-5859?focusedWorklogId=172355=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172355 ] ASF GitHub Bot logged work on BEAM-5859: Author: ASF GitHub Bot Created on: 05/Dec/18 16:10 Start Date: 05/Dec/18 16:10 Worklog Time Spent: 10m Work Description: mxm commented on a change in pull request #7208: [BEAM-5859] Better handle fused composite stage names. URL: https://github.com/apache/beam/pull/7208#discussion_r239128952 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java ## @@ -45,19 +53,60 @@ public static String generateNameFromStagePayload(ExecutableStagePayload stagePa RunnerApi.Components components = stagePayload.getComponents(); final int transformsCount = stagePayload.getTransformsCount(); sb.append("[").append(transformsCount).append("]"); -sb.append("{"); +Collection names = new ArrayList<>(); for (int i = 0; i < transformsCount; i++) { String name = components.getTransformsOrThrow(stagePayload.getTransforms(i)).getUniqueName(); - // Python: Remove the 'ref_AppliedPTransform_' prefix which just makes the name longer - name = name.replaceFirst("^ref_AppliedPTransform_", ""); // Java: Remove the 'ParMultiDo(Anonymous)' suffix which just makes the name longer name = name.replaceFirst("/ParMultiDo\\(Anonymous\\)$", ""); - sb.append(name); - if (i + 1 < transformsCount) { -sb.append(", "); - } + names.add(name); } -sb.append("}"); +sb.append(generateNameFromTransformNames(names, true)); return sb.toString(); } + + public static String generateNameFromTransformNames(Collection names, boolean truncate) { +Multimap groupByOuter = LinkedHashMultimap.create(); +for (String name : names) { + int index = name.indexOf('/'); + if (index == -1) { +groupByOuter.put(name, ""); + } else { +groupByOuter.put(name.substring(0, index), name.substring(index + 1)); + } +} +if (groupByOuter.keySet().size() == 1) { + Map.Entry> outer = 
+ Iterables.getOnlyElement(groupByOuter.asMap().entrySet()); + if (outer.getValue().size() == 1 && outer.getValue().contains("")) { +// Names consisted of a single name without any slashes. +return outer.getKey(); + } else { +// Everything is in the same outer stage, enumerate at one level down. +return String.format( +"%s/%s", outer.getKey(), generateNameFromTransformNames(outer.getValue(), truncate)); Review comment: We go down a level here but only if we had multiple elements inside the same composite transform and no other composite transforms. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172355) Time Spent: 3h (was: 2h 50m) > Improve Traceability of Pipeline translation > > > Key: BEAM-5859 > URL: https://issues.apache.org/jira/browse/BEAM-5859 > Project: Beam > Issue Type: Improvement > Components: runner-flink >Reporter: Maximilian Michels >Priority: Major > Labels: portability, portability-flink > Attachments: tfx.png, wordcount.png > > Time Spent: 3h > Remaining Estimate: 0h > > Users often ask how they can reason about the pipeline translation. The Flink > UI display a confusingly large graph without any trace of the original Beam > pipeline: > WordCount: > !wordcount.png! > TFX: > !tfx.png! > Some aspects which make understanding these graphs hard: > * Users don't know how the Runner maps Beam to Flink concepts > * The UI is awfully slow / hangs when the pipeline is reasonable complex > * The operator names seem to use {{transform.getUniqueName()}} which doesn't > generate readable name > * So called Chaining combines operators into a single operator which makes > understanding which Beam concept belongs to which Flink concept even harder > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
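The grouping-and-recursion logic under review can be sketched as a small Python transliteration. This is a hedged illustration, not the Java from the pull request: the function name `generate_name_from_transform_names` is a hypothetical snake_case rendering, an insertion-ordered dict of de-duplicated lists stands in for `LinkedHashMultimap`, and the non-truncating branch is an assumption inferred from the expected strings in the quoted `ExecutableStageTranslationTest`, since that part of the diff is not shown in these messages.

```python
from collections import OrderedDict


def generate_name_from_transform_names(names, truncate):
    """Build a compact stage name from slash-separated transform names."""
    # Group each name by its outermost component, keeping insertion order
    # and dropping duplicate remainders (stand-in for LinkedHashMultimap).
    group_by_outer = OrderedDict()
    for name in names:
        outer, _, rest = name.partition('/')  # rest == '' if no slash
        inner = group_by_outer.setdefault(outer, [])
        if rest not in inner:
            inner.append(rest)

    if len(group_by_outer) == 1:
        outer, inner = next(iter(group_by_outer.items()))
        if inner == ['']:
            # Recursion anchor: a single name without any slashes.
            return outer
        # Everything shares one outer stage: enumerate one level down.
        return '%s/%s' % (
            outer, generate_name_from_transform_names(inner, truncate))

    if truncate:
        # Recursion anchor: list the outer stages without their structure.
        parts = list(group_by_outer)
    else:
        # Assumed branch: keep the composite structure of every outer stage.
        parts = []
        for outer, inner in group_by_outer.items():
            part = '%s/%s' % (
                outer, generate_name_from_transform_names(inner, truncate))
            # A leaf yields "X/"; drop the trailing slash.
            parts.append(part[:-1] if part.endswith('/') else part)
    return '{%s}' % ', '.join(parts)


if __name__ == '__main__':
    names = ['A/a1', 'A/a2', 'B', 'C/c/cc']
    print(generate_name_from_transform_names(names, True))
    print(generate_name_from_transform_names(names, False))
```

With `truncate` set, recursion only continues while all names share a single outer component, which matches the reviewer's reading that the code descends a level only when there is exactly one composite at that level.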
[jira] [Work logged] (BEAM-6165) Send metrics to Flink in portable Flink runner
[ https://issues.apache.org/jira/browse/BEAM-6165?focusedWorklogId=172343=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172343 ] ASF GitHub Bot logged work on BEAM-6165: Author: ASF GitHub Bot Created on: 05/Dec/18 15:59 Start Date: 05/Dec/18 15:59 Worklog Time Spent: 10m Work Description: ryan-williams commented on a change in pull request #7183: [BEAM-6165] send metrics to Flink in portable Flink runner URL: https://github.com/apache/beam/pull/7183#discussion_r239123843 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FileReporter.java ## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.metrics; + +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.PrintStream; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.reporter.AbstractReporter; + +/** + * Flink {@link org.apache.flink.metrics.reporter.MetricReporter metrics reporter} for writing + * metrics to a file (specified via the "metrics.reporter.test.file" config key). 
+ */ +public class FileReporter extends AbstractReporter { Review comment: (woops, I'd not seen Max's comment above when I posted this, but seems like we're saying the same thing) fwiw: "beam", "flink", "metrics", "file", and "reporter" are already in this class' name: `org.apache.beam.runners.flink.metrics.FileReporter` (and users would use this FQN in `flink-conf.yaml`, as in the example above). I generally dislike the Java convention of repeating package-name segments in class' basenames, but recognize that it's common and people seem to like it, so happy to add here, just wanted to float the idea that `org.apache.beam.runners.flink.metrics.BeamFlinkFileMetricReporter` is redundant. lmk if you still prefer it and I'll change it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172343) Time Spent: 5h 40m (was: 5.5h) > Send metrics to Flink in portable Flink runner > -- > > Key: BEAM-6165 > URL: https://issues.apache.org/jira/browse/BEAM-6165 > Project: Beam > Issue Type: Improvement > Components: runner-flink >Affects Versions: 2.8.0 >Reporter: Ryan Williams >Assignee: Ryan Williams >Priority: Major > Labels: metrics, portability, portability-flink > Time Spent: 5h 40m > Remaining Estimate: 0h > > Metrics are sent from the fn harness to runner in the Python SDK (and likely > Java soon), but the portable Flink runner doesn't pass them on to Flink, > which it should, so that users can see them in e.g. the Flink UI or via any > Flink metrics reporters. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6165) Send metrics to Flink in portable Flink runner
[ https://issues.apache.org/jira/browse/BEAM-6165?focusedWorklogId=172340=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172340 ] ASF GitHub Bot logged work on BEAM-6165: Author: ASF GitHub Bot Created on: 05/Dec/18 15:54 Start Date: 05/Dec/18 15:54 Worklog Time Spent: 10m Work Description: ryan-williams commented on a change in pull request #7183: [BEAM-6165] send metrics to Flink in portable Flink runner URL: https://github.com/apache/beam/pull/7183#discussion_r239123843 ## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FileReporter.java ## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.metrics; + +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.PrintStream; +import org.apache.flink.metrics.Metric; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.reporter.AbstractReporter; + +/** + * Flink {@link org.apache.flink.metrics.reporter.MetricReporter metrics reporter} for writing + * metrics to a file (specified via the "metrics.reporter.test.file" config key). 
+ */ +public class FileReporter extends AbstractReporter { Review comment: fwiw: "beam", "flink", "metrics", "file", and "reporter" are already in this class' name: `org.apache.beam.runners.flink.metrics.FileReporter` (and users would use this FQN in `flink-conf.yaml`, as in the example above). I generally dislike the Java convention of repeating package-name segments in class' basenames, but recognize that it's common and people seem to like it, so happy to add here, just wanted to float the idea that `org.apache.beam.runners.flink.metrics.BeamFlinkFileMetricReporter` is redundant. lmk if you still prefer it and I'll change it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172340) Time Spent: 5.5h (was: 5h 20m) > Send metrics to Flink in portable Flink runner > -- > > Key: BEAM-6165 > URL: https://issues.apache.org/jira/browse/BEAM-6165 > Project: Beam > Issue Type: Improvement > Components: runner-flink >Affects Versions: 2.8.0 >Reporter: Ryan Williams >Assignee: Ryan Williams >Priority: Major > Labels: metrics, portability, portability-flink > Time Spent: 5.5h > Remaining Estimate: 0h > > Metrics are sent from the fn harness to runner in the Python SDK (and likely > Java soon), but the portable Flink runner doesn't pass them on to Flink, > which it should, so that users can see them in e.g. the Flink UI or via any > Flink metrics reporters. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-5419) Build multiple versions of the Flink Runner against different Flink versions
[ https://issues.apache.org/jira/browse/BEAM-5419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16710251#comment-16710251 ] Jeroen Steggink commented on BEAM-5419: --- [~mxm], that's great! I'm interested in 1.6.x and 1.7.x. Different projects, with different needs. > Build multiple versions of the Flink Runner against different Flink versions > > > Key: BEAM-5419 > URL: https://issues.apache.org/jira/browse/BEAM-5419 > Project: Beam > Issue Type: New Feature > Components: build-system, runner-flink >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Fix For: 2.10.0 > > > Following up on a discussion on the mailing list. > We want to keep the Flink version stable across different versions to avoid > upgrade pain for long-term users. At the same time, there are users out there > with newer Flink clusters and developers also want to utilize new Flink > features. > It would be great to build multiple versions of the Flink Runner against > different Flink versions. > When the upgrade is as simple as changing the version property in the build > script, this should be pretty straight-forward. If not, having a "base > version" and applying a patch during the build could be an option. We should > avoid duplicating any Runner code. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6165) Send metrics to Flink in portable Flink runner
[ https://issues.apache.org/jira/browse/BEAM-6165?focusedWorklogId=172329=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172329 ]

ASF GitHub Bot logged work on BEAM-6165:

Author: ASF GitHub Bot
Created on: 05/Dec/18 15:44
Start Date: 05/Dec/18 15:44
Worklog Time Spent: 10m
Work Description: ryan-williams commented on a change in pull request #7183: [BEAM-6165] send metrics to Flink in portable Flink runner
URL: https://github.com/apache/beam/pull/7183#discussion_r238770025

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FileReporter.java ##

@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.metrics;
+
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.AbstractReporter;
+
+/**
+ * Flink {@link org.apache.flink.metrics.reporter.MetricReporter metrics reporter} for writing
+ * metrics to a file (specified via the "metrics.reporter.test.file" config key).
+ */
+public class FileReporter extends AbstractReporter {

Review comment:
> it's configured via a test property

ah, [I left the reporter alias as `test` in `flink_runner_test`](https://github.com/apache/beam/pull/7183/files#diff-195e4a1918aec54f6245e49081cadb5aR99) when I renamed this class to remove the `Test` prefix. I should change that alias to `file`, either way. *(update: I did this, and also left [a placeholder in the `FileReporter` javadoc](https://github.com/apache/beam/pull/7183/files#diff-4b3942dfc66918e7fd06a1ed781f0442R30) for whatever alias the user wants to use as the name of their `FileReporter`)*

This is a reporter that writes the final state of all metrics to a file of the user's choosing. It can do this in test or production environments. It doesn't perform any assertions, or otherwise do things that only make sense in a testing context. It is fairly similar to [Slf4jReporter](https://github.com/apache/flink/blob/release-1.5.5/flink-metrics/flink-metrics-slf4j/src/main/java/org/apache/flink/metrics/slf4j/Slf4jReporter.java#L41), but its output is easier to parse (for machines or humans), and would normally be configured like:

```
metrics.reporters: file,
metrics.reporter.file.class: org.apache.beam.runners.flink.metrics.FileReporter
metrics.reporter.file.file: /some/path/on/job-mgr/node
```

To me, it feels weirder to name it `Test*` and leave it under `main/`, when it is not really *that* test-specific an abstraction. However I understand if the possible non-test uses feel contrived, and perhaps having `Test*` names under `main/` isn't too smelly. I just want to make sure the vestigial `metrics.reporter.test.*` namespacing isn't driving the decision too heavily.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 172329)
Time Spent: 5h 20m (was: 5h 10m)

> Send metrics to Flink in portable Flink runner
> --
>
> Key: BEAM-6165
> URL: https://issues.apache.org/jira/browse/BEAM-6165
> Project: Beam
> Issue Type: Improvement
> Components: runner-flink
> Affects Versions: 2.8.0
> Reporter: Ryan Williams
> Assignee: Ryan Williams
> Priority: Major
> Labels: metrics, portability, portability-flink
> Time Spent: 5h 20m
> Remaining Estimate: 0h
>
> Metrics are sent from the fn harness to runner in the Python SDK (and likely
> Java soon), but the
[jira] [Commented] (BEAM-5419) Build multiple versions of the Flink Runner against different Flink versions
[ https://issues.apache.org/jira/browse/BEAM-5419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16710157#comment-16710157 ] Maximilian Michels commented on BEAM-5419: -- Since this gets asked for more and more, I will prioritize this for 2.10.0. With Flink 1.7 out, we should provide at least builds for 1.7 and 1.6, and most likely also 1.5 because supporting it is about the same effort as supporting 1.6. [~jeroens] Are you interested in 1.6 or 1.7? > Build multiple versions of the Flink Runner against different Flink versions > > > Key: BEAM-5419 > URL: https://issues.apache.org/jira/browse/BEAM-5419 > Project: Beam > Issue Type: New Feature > Components: build-system, runner-flink >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Major > Fix For: 2.10.0 > > > Following up on a discussion on the mailing list. > We want to keep the Flink version stable across different versions to avoid > upgrade pain for long-term users. At the same time, there are users out there > with newer Flink clusters and developers also want to utilize new Flink > features. > It would be great to build multiple versions of the Flink Runner against > different Flink versions. > When the upgrade is as simple as changing the version property in the build > script, this should be pretty straight-forward. If not, having a "base > version" and applying a patch during the build could be an option. We should > avoid duplicating any Runner code. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6172) Flink metrics are not generated in standard format
[ https://issues.apache.org/jira/browse/BEAM-6172?focusedWorklogId=172310=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172310 ]

ASF GitHub Bot logged work on BEAM-6172:

Author: ASF GitHub Bot
Created on: 05/Dec/18 15:02
Start Date: 05/Dec/18 15:02
Worklog Time Spent: 10m
Work Description: mxm commented on issue #7207: [BEAM-6172] Adjust Flink metric names / Add metric reporting tests
URL: https://github.com/apache/beam/pull/7207#issuecomment-444515725

CC @ryan-williams

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 172310)
Time Spent: 20m (was: 10m)

> Flink metrics are not generated in standard format
> --
>
> Key: BEAM-6172
> URL: https://issues.apache.org/jira/browse/BEAM-6172
> Project: Beam
> Issue Type: Improvement
> Components: runner-flink
> Affects Versions: 2.8.0
> Reporter: Micah Wylde
> Assignee: Maximilian Michels
> Priority: Minor
> Time Spent: 20m
> Remaining Estimate: 0h
>
> The metrics that the Flink runner exports do not follow the standard format
> used by Flink, and do not respect Flink metric configuration options.
> For example (with the default metrics configuration) Beam produces a metric:
> {code}
> 10-100-209-71.taskmanager.0f29b420b63fea58f6f321bc0cbf45f3.BeamApp-mwylde-1203224439-a7d8fdf6.group.0.__counter__group__org-apache-beam-runners-core-ReduceFnRunner__droppedDueToClosedWindow
> {code}
> whereas a native Flink metric looks like:
> {code}
> 10-100-209-71.taskmanager.0f29b420b63fea58f6f321bc0cbf45f3.BeamApp-mwylde-1203224439-a7d8fdf6.Source-Custom-Source-7Kinesis-None-beam-env-docker-v1-0-ToKeyedWorkItem.0.numRecordsOut
> {code}
> In particular, Beam should respect the
> [metrics.scope.delimiter|https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#metrics-scope-delimiter]
> configuration for separating components of a metric (currently it uses
> "__"), and should not include the type of metric (counter, gauge, etc.).

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
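Editor's note: the two changes the issue asks for (use the configured delimiter instead of "__", and drop the metric-type segment) can be illustrated on the metric name quoted above. The sketch below is purely illustrative and is not what the runner or pull request #7207 does internally; `normalize_metric_name` and its parameters are hypothetical names, and the tuple of metric types is an assumption based on the "counter, gauge, etc." wording in the issue.

```python
# The Beam-specific tail of the metric name quoted in the issue.
LEGACY_NAME = ('__counter__group__org-apache-beam-runners-core-ReduceFnRunner'
               '__droppedDueToClosedWindow')


def normalize_metric_name(legacy_name, delimiter='.',
                          metric_types=('counter', 'gauge')):
    """Re-join a '__'-separated name with `delimiter`, minus the type prefix."""
    # Split on the hard-coded "__" separator and drop empty leading pieces.
    parts = [part for part in legacy_name.split('__') if part]
    # ASSUMPTION: the first segment is the metric type when it matches one of
    # `metric_types`; the issue asks for that segment to be omitted.
    if parts and parts[0] in metric_types:
        parts = parts[1:]
    return delimiter.join(parts)
```

Passing a different `delimiter` stands in for honouring whatever value the Flink configuration supplies for the scope delimiter.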
[jira] [Work logged] (BEAM-6167) Create a Class to read content of a file keeping track of the file path (python)
[ https://issues.apache.org/jira/browse/BEAM-6167?focusedWorklogId=172301=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172301 ]

ASF GitHub Bot logged work on BEAM-6167:

Author: ASF GitHub Bot
Created on: 05/Dec/18 14:50
Start Date: 05/Dec/18 14:50
Worklog Time Spent: 10m
Work Description: robertwb commented on a change in pull request #7193: [BEAM-6167] Add class ReadFromTextWithFilename (Python)
URL: https://github.com/apache/beam/pull/7193#discussion_r239095304

## File path: sdks/python/apache_beam/io/textio.py ##

@@ -527,6 +533,61 @@ def expand(self, pvalue):
     return pvalue.pipeline | Read(self._source)


+class ReadFromTextWithFilename(PTransform):

Review comment:
Yes, I agree with that sentiment. The disadvantage I see here is that there will have to be an active effort to keep the (non-trivial) set of arguments in sync between the two from now on. Another option would be to have a _ReadFromTextBase with two subclasses that differ only in a class attribute set to either _TextSource or _FilenameTextSource.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 172301)
Time Spent: 40m (was: 0.5h)

> Create a Class to read content of a file keeping track of the file path
> (python)
>
> Key: BEAM-6167
> URL: https://issues.apache.org/jira/browse/BEAM-6167
> Project: Beam
> Issue Type: Improvement
> Components: io-ideas
> Affects Versions: 2.8.0
> Reporter: Lorenzo Caggioni
> Assignee: Eugene Kirpichov
> Priority: Minor
> Fix For: Not applicable
>
> Time Spent: 40m
> Remaining Estimate: 0h
>
> Add a class to read the content of a file while keeping track of the file
> path each element comes from.
> This is an improvement of the current python/apache_beam/io/textio.py

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
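Editor's note: the "base class plus a class attribute" option suggested in the comment above could look roughly like the following. This is a standalone sketch that deliberately does not import Beam: the class names come from the comment, but the bodies are stand-ins written for illustration (an in-memory dict replaces the file pattern, and `expand()` returns a plain list rather than a PCollection). The point is only that the argument list lives in one place.

```python
class _TextSource(object):
    """Stand-in source: yields every line of every file."""

    def __init__(self, files, strip_trailing_newlines=True):
        self._files = files  # hypothetical: dict mapping path -> file contents
        self._strip = strip_trailing_newlines

    def _records(self):
        for path in sorted(self._files):
            for line in self._files[path].splitlines(True):
                yield path, (line.rstrip('\n') if self._strip else line)

    def read(self):
        return [line for _, line in self._records()]


class _FilenameTextSource(_TextSource):
    """Stand-in source: yields (path, line) pairs instead of bare lines."""

    def read(self):
        return list(self._records())


class _ReadFromTextBase(object):
    """Owns the shared constructor arguments; subclasses pick the source."""

    _source_class = None  # set by each subclass

    def __init__(self, files, strip_trailing_newlines=True):
        self._source = self._source_class(files, strip_trailing_newlines)

    def expand(self):
        return self._source.read()


class ReadFromText(_ReadFromTextBase):
    _source_class = _TextSource


class ReadFromTextWithFilename(_ReadFromTextBase):
    _source_class = _FilenameTextSource
```

Because both public classes inherit the same `__init__`, adding or renaming an argument happens once, which addresses the "keep the arguments in sync" concern while still giving each output type its own documented class.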
[jira] [Work logged] (BEAM-6167) Create a Class to read content of a file keeping track of the file path (python)
[ https://issues.apache.org/jira/browse/BEAM-6167?focusedWorklogId=172296=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172296 ]

ASF GitHub Bot logged work on BEAM-6167:

Author: ASF GitHub Bot
Created on: 05/Dec/18 14:35
Start Date: 05/Dec/18 14:35
Worklog Time Spent: 10m
Work Description: lcaggio commented on a change in pull request #7193: [BEAM-6167] Add class ReadFromTextWithFilename (Python)
URL: https://github.com/apache/beam/pull/7193#discussion_r239089178

## File path: sdks/python/apache_beam/io/textio.py ##

@@ -527,6 +533,61 @@ def expand(self, pvalue):
     return pvalue.pipeline | Read(self._source)


+class ReadFromTextWithFilename(PTransform):

Review comment:
This is definitely an option. I implemented it this way because I found it strange to have a class that changes the format of the returned data based on an optional parameter. This way, it is clearer (and documented) that if I want to get the source filename, I need to change the class **and** the way I handle elements in the following steps.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 172296)
Time Spent: 0.5h (was: 20m)

> Create a Class to read content of a file keeping track of the file path
> (python)
>
> Key: BEAM-6167
> URL: https://issues.apache.org/jira/browse/BEAM-6167
> Project: Beam
> Issue Type: Improvement
> Components: io-ideas
> Affects Versions: 2.8.0
> Reporter: Lorenzo Caggioni
> Assignee: Eugene Kirpichov
> Priority: Minor
> Fix For: Not applicable
>
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> Add a class to read the content of a file while keeping track of the file
> path each element comes from.
> This is an improvement of the current python/apache_beam/io/textio.py

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6167) Create a Class to read content of a file keeping track of the file path (python)
[ https://issues.apache.org/jira/browse/BEAM-6167?focusedWorklogId=172287=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172287 ]

ASF GitHub Bot logged work on BEAM-6167:

Author: ASF GitHub Bot
Created on: 05/Dec/18 13:59
Start Date: 05/Dec/18 13:59
Worklog Time Spent: 10m
Work Description: robertwb commented on a change in pull request #7193: [BEAM-6167] Add class ReadFromTextWithFilename (Python)
URL: https://github.com/apache/beam/pull/7193#discussion_r239072795

## File path: sdks/python/apache_beam/io/textio.py ##

@@ -383,7 +389,7 @@ def open(self, temp_path):
     if self._header is not None:
       file_handle.write(self._header)
       if self._append_trailing_newlines:
-        file_handle.write(b'\n')
+        file_handle.write('\n')

Review comment:
I think these are required for Python 3.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 172287)
Time Spent: 20m (was: 10m)

> Create a Class to read content of a file keeping track of the file path
> (python)
>
> Key: BEAM-6167
> URL: https://issues.apache.org/jira/browse/BEAM-6167
> Project: Beam
> Issue Type: Improvement
> Components: io-ideas
> Affects Versions: 2.8.0
> Reporter: Lorenzo Caggioni
> Assignee: Eugene Kirpichov
> Priority: Minor
> Fix For: Not applicable
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Add a class to read the content of a file while keeping track of the file
> path each element comes from.
> This is an improvement of the current python/apache_beam/io/textio.py

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
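Editor's note: the reason the `b` prefix matters on Python 3 can be shown with a small standalone snippet. It assumes the sink's file handle behaves like a binary stream (an assumption, not something the quoted hunk states); `io.BytesIO` is used here as a stand-in for that handle.

```python
import io

# Stand-in for a file handle opened in binary mode (assumption, see note above).
file_handle = io.BytesIO()

file_handle.write(b'header')
file_handle.write(b'\n')  # bytes literal: accepted on both Python 2 and 3

try:
    # Without the b prefix this is text (str) on Python 3, and a binary
    # stream refuses it.
    file_handle.write('\n')
    text_write_accepted = True
except TypeError:
    text_write_accepted = False
```

On Python 2 the unprefixed literal is already a byte string, which is why dropping the prefix goes unnoticed there.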
[jira] [Work logged] (BEAM-6167) Create a Class to read content of a file keeping track of the file path (python)
[ https://issues.apache.org/jira/browse/BEAM-6167?focusedWorklogId=172288=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172288 ]

ASF GitHub Bot logged work on BEAM-6167:

Author: ASF GitHub Bot
Created on: 05/Dec/18 13:59
Start Date: 05/Dec/18 13:59
Worklog Time Spent: 10m
Work Description: robertwb commented on a change in pull request #7193: [BEAM-6167] Add class ReadFromTextWithFilename (Python)
URL: https://github.com/apache/beam/pull/7193#discussion_r239074727

## File path: sdks/python/apache_beam/io/textio.py ##

@@ -527,6 +533,61 @@ def expand(self, pvalue):
     return pvalue.pipeline | Read(self._source)


+class ReadFromTextWithFilename(PTransform):

Review comment:
Perhaps it would make more sense to make returning filenames an option for all file-based sources, rather than adding a new transform? At the very least, much of the redundancy here could be removed.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 172288)
Time Spent: 20m (was: 10m)

> Create a Class to read content of a file keeping track of the file path
> (python)
>
> Key: BEAM-6167
> URL: https://issues.apache.org/jira/browse/BEAM-6167
> Project: Beam
> Issue Type: Improvement
> Components: io-ideas
> Affects Versions: 2.8.0
> Reporter: Lorenzo Caggioni
> Assignee: Eugene Kirpichov
> Priority: Minor
> Fix For: Not applicable
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Add a class to read the content of a file while keeping track of the file
> path each element comes from.
> This is an improvement of the current python/apache_beam/io/textio.py

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-5859) Improve Traceability of Pipeline translation
[ https://issues.apache.org/jira/browse/BEAM-5859?focusedWorklogId=172279=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172279 ]

ASF GitHub Bot logged work on BEAM-5859:

Author: ASF GitHub Bot
Created on: 05/Dec/18 13:53
Start Date: 05/Dec/18 13:53
Worklog Time Spent: 10m
Work Description: robertwb opened a new pull request #7208: [BEAM-5859] Better handle fused composite stage names.
URL: https://github.com/apache/beam/pull/7208

I decided to default to truncated names because they both look nicer and they have more information if the UI is going to (implicitly or explicitly) truncate them on length anyways.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 172279)
Time Spent: 2h 20m (was: 2h 10m)

> Improve Traceability of
[jira] [Work logged] (BEAM-6172) Flink metrics are not generated in standard format
[ https://issues.apache.org/jira/browse/BEAM-6172?focusedWorklogId=172269=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172269 ] ASF GitHub Bot logged work on BEAM-6172: Author: ASF GitHub Bot Created on: 05/Dec/18 13:20 Start Date: 05/Dec/18 13:20 Worklog Time Spent: 10m Work Description: mxm opened a new pull request #7207: [BEAM-6172] Adjust Flink metric names / Add metric reporting tests URL: https://github.com/apache/beam/pull/7207 This picks up the metrics delimiter from the Flink config and removes the metric type prefix. This improves consistency with Flink's defaults and makes metric names more readable. R @mwylde CC @tweise This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172269) Time Spent: 10m Remaining Estimate: 0h > Flink metrics are not generated in standard format > -- > > Key: BEAM-6172 > URL: https://issues.apache.org/jira/browse/BEAM-6172 > Project: Beam > Issue Type: Improvement > Components: runner-flink >Affects Versions: 2.8.0 >Reporter: Micah Wylde >Assignee: Maximilian Michels >Priority: Minor > Time Spent: 10m > Remaining Estimate: 0h > > The metrics that the flink runner exports do not follow the standard format > used by
[jira] [Work logged] (BEAM-6094) Implement External environment for Portable Beam
[ https://issues.apache.org/jira/browse/BEAM-6094?focusedWorklogId=172247=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172247 ] ASF GitHub Bot logged work on BEAM-6094: Author: ASF GitHub Bot Created on: 05/Dec/18 11:48 Start Date: 05/Dec/18 11:48 Worklog Time Spent: 10m Work Description: robertwb commented on a change in pull request #7078: [BEAM-6094] Implement external environment for portable BeamPython. URL: https://github.com/apache/beam/pull/7078#discussion_r239030465 ## File path: model/pipeline/src/main/proto/beam_runner_api.proto ## @@ -1045,6 +1046,11 @@ message ProcessPayload { map<string, string> env = 4; // Environment variables } +message ExternalPayload { + ApiServiceDescriptor endpoint = 1; Review comment: ApiServiceDescriptor has the same meaning as everything else. Correct. I started going down the road of spec-ing out a full REST protocol here, but figured that was better deferred to future work, and this PR focuses on the more basic mechanisms while leaving the transport layer more extensible (either within the ApiServiceDescriptor, if we decide to go that route, or alongside it as a different endpoint type, in which case we could make this a oneof). This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172247) Time Spent: 1h 40m (was: 1.5h) > Implement External environment for Portable Beam > > > Key: BEAM-6094 > URL: https://issues.apache.org/jira/browse/BEAM-6094 > Project: Beam > Issue Type: Improvement > Components: beam-model >Reporter: Robert Bradshaw >Assignee: Robert Bradshaw >Priority: Major > Time Spent: 1h 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6094) Implement External environment for Portable Beam
[ https://issues.apache.org/jira/browse/BEAM-6094?focusedWorklogId=172252=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172252 ] ASF GitHub Bot logged work on BEAM-6094: Author: ASF GitHub Bot Created on: 05/Dec/18 11:48 Start Date: 05/Dec/18 11:48 Worklog Time Spent: 10m Work Description: robertwb commented on a change in pull request #7078: [BEAM-6094] Implement external environment for portable BeamPython. URL: https://github.com/apache/beam/pull/7078#discussion_r239031203 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ExternalEnvironmentFactory.java ## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.fnexecution.environment; + +import com.google.common.base.Preconditions; +import java.time.Duration; +import java.util.concurrent.TimeoutException; +import org.apache.beam.model.fnexecution.v1.BeamFnApi; +import org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerGrpc; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.runners.core.construction.BeamUrns; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService; +import org.apache.beam.runners.fnexecution.control.ControlClientPool; +import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService; +import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler; +import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService; +import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService; +import org.apache.beam.sdk.fn.IdGenerator; +import org.apache.beam.sdk.fn.channel.ManagedChannelFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An {@link EnvironmentFactory} which forks processes based on the given URL in the Environment. + * The returned {@link ProcessEnvironment} has to make sure to stop the processes. 
+ */ +public class ExternalEnvironmentFactory implements EnvironmentFactory { + + private static final Logger LOG = LoggerFactory.getLogger(ExternalEnvironmentFactory.class); + + public static ExternalEnvironmentFactory create( + GrpcFnServer controlServiceServer, + GrpcFnServer loggingServiceServer, + GrpcFnServer retrievalServiceServer, + GrpcFnServer provisioningServiceServer, + ControlClientPool.Source clientSource, + IdGenerator idGenerator) { +return new ExternalEnvironmentFactory( +controlServiceServer, +loggingServiceServer, +retrievalServiceServer, +provisioningServiceServer, +idGenerator, +clientSource); + } + + private final GrpcFnServer controlServiceServer; + private final GrpcFnServer loggingServiceServer; + private final GrpcFnServer retrievalServiceServer; + private final GrpcFnServer provisioningServiceServer; + private final IdGenerator idGenerator; + private final ControlClientPool.Source clientSource; + + private ExternalEnvironmentFactory( + GrpcFnServer controlServiceServer, + GrpcFnServer loggingServiceServer, + GrpcFnServer retrievalServiceServer, + GrpcFnServer provisioningServiceServer, + IdGenerator idGenerator, + ControlClientPool.Source clientSource) { +this.controlServiceServer = controlServiceServer; +this.loggingServiceServer = loggingServiceServer; +this.retrievalServiceServer = retrievalServiceServer; +this.provisioningServiceServer = provisioningServiceServer; +this.idGenerator = idGenerator; +this.clientSource = clientSource; + } + + /** Creates a new, active {@link RemoteEnvironment} backed by a forked process. */ + @Override + public RemoteEnvironment createEnvironment(Environment environment) throws Exception { +Preconditions.checkState( +environment +.getUrn() + .equals(BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.EXTERNAL)), +
[jira] [Work logged] (BEAM-6094) Implement External environment for Portable Beam
[ https://issues.apache.org/jira/browse/BEAM-6094?focusedWorklogId=172250=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172250 ] ASF GitHub Bot logged work on BEAM-6094: Author: ASF GitHub Bot Created on: 05/Dec/18 11:48 Start Date: 05/Dec/18 11:48 Worklog Time Spent: 10m Work Description: robertwb commented on a change in pull request #7078: [BEAM-6094] Implement external environment for portable BeamPython. URL: https://github.com/apache/beam/pull/7078#discussion_r239032814 ## File path: sdks/python/apache_beam/runners/portability/local_job_service_main.py ## @@ -32,10 +32,8 @@ def run(argv): parser.add_argument('-p', '--port', type=int, help='port on which to serve the job api') - parser.add_argument('--worker_command_line', - help='command line for starting up a worker process') options = parser.parse_args(argv) - job_servicer = local_job_service.LocalJobServicer(options.worker_command_line) + job_servicer = local_job_service.LocalJobServicer() Review comment: No, this is now passed via an environment rather than out-of-band. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172250) Time Spent: 1h 50m (was: 1h 40m) > Implement External environment for Portable Beam > > > Key: BEAM-6094 > URL: https://issues.apache.org/jira/browse/BEAM-6094 > Project: Beam > Issue Type: Improvement > Components: beam-model >Reporter: Robert Bradshaw >Assignee: Robert Bradshaw >Priority: Major > Time Spent: 1h 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6094) Implement External environment for Portable Beam
[ https://issues.apache.org/jira/browse/BEAM-6094?focusedWorklogId=172251=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172251 ] ASF GitHub Bot logged work on BEAM-6094: Author: ASF GitHub Bot Created on: 05/Dec/18 11:48 Start Date: 05/Dec/18 11:48 Worklog Time Spent: 10m Work Description: robertwb commented on a change in pull request #7078: [BEAM-6094] Implement external environment for portable BeamPython. URL: https://github.com/apache/beam/pull/7078#discussion_r239031615 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ExternalEnvironmentFactory.java ## @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.fnexecution.environment; + +import com.google.common.base.Preconditions; +import java.time.Duration; +import java.util.concurrent.TimeoutException; +import org.apache.beam.model.fnexecution.v1.BeamFnApi; +import org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerGrpc; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; +import org.apache.beam.runners.core.construction.BeamUrns; +import org.apache.beam.runners.fnexecution.GrpcFnServer; +import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService; +import org.apache.beam.runners.fnexecution.control.ControlClientPool; +import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService; +import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler; +import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService; +import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService; +import org.apache.beam.sdk.fn.IdGenerator; +import org.apache.beam.sdk.fn.channel.ManagedChannelFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An {@link EnvironmentFactory} which forks processes based on the given URL in the Environment. + * The returned {@link ProcessEnvironment} has to make sure to stop the processes. 
+ */ +public class ExternalEnvironmentFactory implements EnvironmentFactory { + + private static final Logger LOG = LoggerFactory.getLogger(ExternalEnvironmentFactory.class); + + public static ExternalEnvironmentFactory create( + GrpcFnServer controlServiceServer, + GrpcFnServer loggingServiceServer, + GrpcFnServer retrievalServiceServer, + GrpcFnServer provisioningServiceServer, + ControlClientPool.Source clientSource, + IdGenerator idGenerator) { +return new ExternalEnvironmentFactory( +controlServiceServer, +loggingServiceServer, +retrievalServiceServer, +provisioningServiceServer, +idGenerator, +clientSource); + } + + private final GrpcFnServer controlServiceServer; + private final GrpcFnServer loggingServiceServer; + private final GrpcFnServer retrievalServiceServer; + private final GrpcFnServer provisioningServiceServer; + private final IdGenerator idGenerator; + private final ControlClientPool.Source clientSource; + + private ExternalEnvironmentFactory( + GrpcFnServer controlServiceServer, + GrpcFnServer loggingServiceServer, + GrpcFnServer retrievalServiceServer, + GrpcFnServer provisioningServiceServer, + IdGenerator idGenerator, + ControlClientPool.Source clientSource) { +this.controlServiceServer = controlServiceServer; +this.loggingServiceServer = loggingServiceServer; +this.retrievalServiceServer = retrievalServiceServer; +this.provisioningServiceServer = provisioningServiceServer; +this.idGenerator = idGenerator; +this.clientSource = clientSource; + } + + /** Creates a new, active {@link RemoteEnvironment} backed by a forked process. */ + @Override + public RemoteEnvironment createEnvironment(Environment environment) throws Exception { +Preconditions.checkState( +environment +.getUrn() + .equals(BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.EXTERNAL)), +
[jira] [Work logged] (BEAM-6094) Implement External environment for Portable Beam
[ https://issues.apache.org/jira/browse/BEAM-6094?focusedWorklogId=172249=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172249 ] ASF GitHub Bot logged work on BEAM-6094: Author: ASF GitHub Bot Created on: 05/Dec/18 11:48 Start Date: 05/Dec/18 11:48 Worklog Time Spent: 10m Work Description: robertwb commented on a change in pull request #7078: [BEAM-6094] Implement external environment for portable BeamPython. URL: https://github.com/apache/beam/pull/7078#discussion_r239033306 ## File path: sdks/python/apache_beam/runners/portability/portable_runner.py ## @@ -118,10 +151,12 @@ def run_pipeline(self, pipeline): docker = DockerizedJobServer() job_endpoint = docker.start() -proto_context = pipeline_context.PipelineContext( +# This is needed as we start a worker server if one is requested +# but none is provided. +cleanup_callbacks = [] Review comment: I cleaned this up providing a new LOOPBACK environment rather than have external implicitly create workers. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172249) > Implement External environment for Portable Beam > > > Key: BEAM-6094 > URL: https://issues.apache.org/jira/browse/BEAM-6094 > Project: Beam > Issue Type: Improvement > Components: beam-model >Reporter: Robert Bradshaw >Assignee: Robert Bradshaw >Priority: Major > Time Spent: 1h 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6094) Implement External environment for Portable Beam
[ https://issues.apache.org/jira/browse/BEAM-6094?focusedWorklogId=172248=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172248 ] ASF GitHub Bot logged work on BEAM-6094: Author: ASF GitHub Bot Created on: 05/Dec/18 11:48 Start Date: 05/Dec/18 11:48 Worklog Time Spent: 10m Work Description: robertwb commented on a change in pull request #7078: [BEAM-6094] Implement external environment for portable BeamPython. URL: https://github.com/apache/beam/pull/7078#discussion_r239032505 ## File path: sdks/python/apache_beam/pipeline.py ## @@ -606,13 +606,15 @@ def visit_value(self, value, _): return Visitor.ok def to_runner_api( - self, return_context=False, context=None, use_fake_coders=False): + self, return_context=False, context=None, use_fake_coders=False, + default_environment=None): """For internal use only; no backwards-compatibility guarantees.""" from apache_beam.runners import pipeline_context from apache_beam.portability.api import beam_runner_api_pb2 if context is None: context = pipeline_context.PipelineContext( - use_fake_coders=use_fake_coders) + use_fake_coders=use_fake_coders, + default_environment=default_environment) Review comment: It's up to the context to come up with a default environment if none is set. The argument here just overrides that default. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172248) Time Spent: 1h 40m (was: 1.5h) > Implement External environment for Portable Beam > > > Key: BEAM-6094 > URL: https://issues.apache.org/jira/browse/BEAM-6094 > Project: Beam > Issue Type: Improvement > Components: beam-model >Reporter: Robert Bradshaw >Assignee: Robert Bradshaw >Priority: Major > Time Spent: 1h 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
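The review comment above describes a simple override pattern: the context chooses a default environment on its own, and the new `default_environment` argument only replaces that choice when the caller supplies one. A minimal Python sketch of that pattern follows. It is not the Beam implementation: `PipelineContextSketch`, `to_runner_api_sketch`, and the dictionary used to stand in for an environment are hypothetical names chosen for illustration.

```python
class PipelineContextSketch(object):
    """Hypothetical stand-in for a pipeline context (not Beam's class)."""

    def __init__(self, default_environment=None):
        # The context owns the fallback; a caller-supplied value only
        # overrides it.
        if default_environment is None:
            default_environment = self._fallback_environment()
        self.default_environment = default_environment

    @staticmethod
    def _fallback_environment():
        # Placeholder value, assumed for illustration only.
        return {"urn": "placeholder:default:env"}


def to_runner_api_sketch(context=None, default_environment=None):
    """Mirrors the shape of the diff: the argument is only consulted
    when no context was passed in."""
    if context is None:
        context = PipelineContextSketch(
            default_environment=default_environment)
    return context


implicit = to_runner_api_sketch()
explicit = to_runner_api_sketch(
    default_environment={"urn": "placeholder:external:env"})
```

As in the quoted diff, an explicitly passed `context` wins outright, so `default_environment` has no effect in that case.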
[jira] [Work logged] (BEAM-6094) Implement External environment for Portable Beam
[ https://issues.apache.org/jira/browse/BEAM-6094?focusedWorklogId=172241=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172241 ] ASF GitHub Bot logged work on BEAM-6094: Author: ASF GitHub Bot Created on: 05/Dec/18 11:32 Start Date: 05/Dec/18 11:32 Worklog Time Spent: 10m Work Description: robertwb commented on a change in pull request #7078: [BEAM-6094] Implement external environment for portable BeamPython. URL: https://github.com/apache/beam/pull/7078#discussion_r239029555 ## File path: model/fn-execution/src/main/proto/beam_fn_api.proto ## @@ -967,3 +967,23 @@ service BeamFnLogging { stream LogControl ) {} } + + +message StartWorkerRequest { + string worker_id = 1; + org.apache.beam.model.pipeline.v1.ApiServiceDescriptor control_endpoint = 2; + org.apache.beam.model.pipeline.v1.ApiServiceDescriptor logging_endpoint = 3; + org.apache.beam.model.pipeline.v1.ApiServiceDescriptor artifact_endpoint = 4; + org.apache.beam.model.pipeline.v1.ApiServiceDescriptor provision_endpoint = 5; + map<string, string> params = 10; +} + +message StartWorkerResponse { + string error = 1; +} + +service BeamFnExternalWorker { Review comment: Correct, the lifecycle is unmanaged. Can you think of a better name for this? AllocateWorker? InformWorker?? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172241) Time Spent: 1.5h (was: 1h 20m) > Implement External environment for Portable Beam > > > Key: BEAM-6094 > URL: https://issues.apache.org/jira/browse/BEAM-6094 > Project: Beam > Issue Type: Improvement > Components: beam-model >Reporter: Robert Bradshaw >Assignee: Robert Bradshaw >Priority: Major > Time Spent: 1.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6067) Dataflow runner should include portable pipeline coder id in CloudObject coder representation
[ https://issues.apache.org/jira/browse/BEAM-6067?focusedWorklogId=172234=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172234 ] ASF GitHub Bot logged work on BEAM-6067: Author: ASF GitHub Bot Created on: 05/Dec/18 11:27 Start Date: 05/Dec/18 11:27 Worklog Time Spent: 10m Work Description: robertwb commented on issue #7081: [BEAM-6067] In Python SDK, specify pipeline_proto_coder_id property in non-Beam-standard CloudObject coders URL: https://github.com/apache/beam/pull/7081#issuecomment-53189 This looks OK to me. (As an aside, I wonder why the transform nodes themselves don't have a reference to the pipeline...) I resolved the merge conflict, and will merge assuming all tests pass. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172234) Time Spent: 7h (was: 6h 50m) Remaining Estimate: 161h (was: 161h 10m) > Dataflow runner should include portable pipeline coder id in CloudObject > coder representation > - > > Key: BEAM-6067 > URL: https://issues.apache.org/jira/browse/BEAM-6067 > Project: Beam > Issue Type: Improvement > Components: beam-model >Reporter: Craig Chambers >Assignee: Craig Chambers >Priority: Major > Original Estimate: 168h > Time Spent: 7h > Remaining Estimate: 161h > > When translating a BeamJava Coder into the DataflowRunner's CloudObject > property map, include a property that specifies the id in the Beam model > Pipeline coders map corresponding to that Coder. This will allow the > DataflowRunner to reference the corresponding Beam coder in the FnAPI > processing bundle. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6153) PR 7130 causes transforms/util_test to fail
[ https://issues.apache.org/jira/browse/BEAM-6153?focusedWorklogId=172227=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172227 ] ASF GitHub Bot logged work on BEAM-6153: Author: ASF GitHub Bot Created on: 05/Dec/18 11:04 Start Date: 05/Dec/18 11:04 Worklog Time Spent: 10m Work Description: robertwb closed pull request #7170: [BEAM-6153] Re-enable coder optimization. URL: https://github.com/apache/beam/pull/7170 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd index 03d0a56c053e..db724e652f83 100644 --- a/sdks/python/apache_beam/coders/coder_impl.pxd +++ b/sdks/python/apache_beam/coders/coder_impl.pxd @@ -25,11 +25,14 @@ cimport libc.stdint cimport libc.stdlib cimport libc.string +cdef extern from "math.h": + libc.stdint.int64_t abs "llabs"(libc.stdint.int64_t) + from .stream cimport InputStream, OutputStream from apache_beam.utils cimport windowed_value -cdef object loads, dumps, create_InputStream, create_OutputStream, ByteCountingOutputStream, get_varint_size +cdef object loads, dumps, create_InputStream, create_OutputStream, ByteCountingOutputStream, get_varint_size, past_unicode # Temporarily untyped to allow monkeypatching on failed import. 
#cdef type WindowedValue @@ -75,8 +78,11 @@ cdef unsigned char SET_TYPE cdef class FastPrimitivesCoderImpl(StreamCoderImpl): cdef CoderImpl fallback_coder_impl - @cython.locals(dict_value=dict, int_value=libc.stdint.int64_t) + @cython.locals(dict_value=dict, int_value=libc.stdint.int64_t, + unicode_value=unicode) cpdef encode_to_stream(self, value, OutputStream stream, bint nested) + @cython.locals(t=int) + cpdef decode_from_stream(self, InputStream stream, bint nested) cdef class BytesCoderImpl(CoderImpl): @@ -123,6 +129,9 @@ cdef class TupleCoderImpl(AbstractComponentCoderImpl): cdef class SequenceCoderImpl(StreamCoderImpl): cdef CoderImpl _elem_coder cpdef _construct_from_sequence(self, values) + @cython.locals(buffer=OutputStream, target_buffer_size=libc.stdint.int64_t, + index=libc.stdint.int64_t) + cpdef encode_to_stream(self, value, OutputStream stream, bint nested) cdef class TupleSequenceCoderImpl(SequenceCoderImpl): @@ -133,8 +142,41 @@ cdef class IterableCoderImpl(SequenceCoderImpl): pass +cdef object IntervalWindow + +cdef class IntervalWindowCoderImpl(StreamCoderImpl): + cdef libc.stdint.uint64_t _to_normal_time(self, libc.stdint.int64_t value) + cdef libc.stdint.int64_t _from_normal_time(self, libc.stdint.uint64_t value) + + @cython.locals(typed_value=windowed_value._IntervalWindowBase, + span_millis=libc.stdint.int64_t) + cpdef encode_to_stream(self, value, OutputStream stream, bint nested) + + @cython.locals(typed_value=windowed_value._IntervalWindowBase) + cpdef decode_from_stream(self, InputStream stream, bint nested) + + @cython.locals(typed_value=windowed_value._IntervalWindowBase, + span_millis=libc.stdint.int64_t) + cpdef estimate_size(self, value, bint nested=?) 
+ + +cdef int PaneInfoTiming_UNKNOWN +cdef int PaneInfoEncoding_FIRST + + cdef class PaneInfoCoderImpl(StreamCoderImpl): - cdef int _choose_encoding(self, value) + cdef int _choose_encoding(self, windowed_value.PaneInfo value) + + @cython.locals(pane_info=windowed_value.PaneInfo, encoding_type=int) + cpdef encode_to_stream(self, value, OutputStream stream, bint nested) + + @cython.locals(encoded_first_byte=int, encoding_type=int) + cpdef decode_from_stream(self, InputStream stream, bint nested) + + +cdef libc.stdint.uint64_t _TIME_SHIFT +cdef libc.stdint.int64_t MIN_TIMESTAMP_micros +cdef libc.stdint.int64_t MAX_TIMESTAMP_micros cdef class WindowedValueCoderImpl(StreamCoderImpl): @@ -144,8 +186,18 @@ cdef class WindowedValueCoderImpl(StreamCoderImpl): cdef CoderImpl _windows_coder cdef CoderImpl _pane_info_coder + cdef libc.stdint.uint64_t _to_normal_time(self, libc.stdint.int64_t value) + cdef libc.stdint.int64_t _from_normal_time(self, libc.stdint.uint64_t value) + @cython.locals(c=CoderImpl) cpdef get_estimated_size_and_observables(self, value, bint nested=?) - @cython.locals(wv=windowed_value.WindowedValue) + @cython.locals(timestamp=libc.stdint.int64_t) + cpdef decode_from_stream(self, InputStream stream, bint nested) + + @cython.locals(wv=windowed_value.WindowedValue, restore_sign=int) cpdef encode_to_stream(self, value, OutputStream stream, bint nested) + + +cdef class LengthPrefixCoderImpl(StreamCoderImpl): + cdef CoderImpl _value_coder diff --git
[jira] [Work logged] (BEAM-6120) Support retrieval of large gbk iterables over the state API.
[ https://issues.apache.org/jira/browse/BEAM-6120?focusedWorklogId=172229=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172229 ] ASF GitHub Bot logged work on BEAM-6120: Author: ASF GitHub Bot Created on: 05/Dec/18 11:12 Start Date: 05/Dec/18 11:12 Worklog Time Spent: 10m Work Description: robertwb commented on a change in pull request #7127: [BEAM-6120] Support retrieval of large gbk iterables over the state API. URL: https://github.com/apache/beam/pull/7127#discussion_r239023763 ## File path: model/pipeline/src/main/proto/beam_runner_api.proto ## @@ -588,6 +588,10 @@ message StandardCoders { // of the element // Components: The element coder and the window coder, in that order WINDOWED_VALUE = 8 [(beam_urn) = "beam:coder:windowed_value:v1"]; + +// Encodes an iterable of elements, some of which may be stored in state. +// Components: Coder for a single element. +STATE_BACKED_ITERABLE = 9 [(beam_urn) = "beam:coder:state_backed_iterable:v1"]; Review comment: As per the discussion on the list, more general support for large values is desirable but not nearly as straightforward. I did go ahead and mark this coder as experimental. PTAL. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 172229) Time Spent: 2h 40m (was: 2.5h) > Support retrieval of large gbk iterables over the state API. > > > Key: BEAM-6120 > URL: https://issues.apache.org/jira/browse/BEAM-6120 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: Robert Bradshaw >Assignee: Robert Bradshaw >Priority: Major > Time Spent: 2h 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-6165) Send metrics to Flink in portable Flink runner
[ https://issues.apache.org/jira/browse/BEAM-6165?focusedWorklogId=172228&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172228 ]

ASF GitHub Bot logged work on BEAM-6165:

Author: ASF GitHub Bot
Created on: 05/Dec/18 11:11
Start Date: 05/Dec/18 11:11
Worklog Time Spent: 10m
Work Description: mxm commented on a change in pull request #7183: [BEAM-6165] send metrics to Flink in portable Flink runner
URL: https://github.com/apache/beam/pull/7183#discussion_r239023535

## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FileReporter.java
##
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.metrics;
+
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.AbstractReporter;
+
+/**
+ * Flink {@link org.apache.flink.metrics.reporter.MetricReporter metrics reporter} for writing
+ * metrics to a file (specified via the "metrics.reporter.test.file" config key).
+ */
+public class FileReporter extends AbstractReporter {

Review comment:
I suppose it could come in handy when you just want to dump metrics locally. Since package is `org.apache.beam.runners.flink.metrics`, `FileReporter` seems fine.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 172228)
Time Spent: 5h 10m (was: 5h)

> Send metrics to Flink in portable Flink runner
> --
>
> Key: BEAM-6165
> URL: https://issues.apache.org/jira/browse/BEAM-6165
> Project: Beam
> Issue Type: Improvement
> Components: runner-flink
> Affects Versions: 2.8.0
> Reporter: Ryan Williams
> Assignee: Ryan Williams
> Priority: Major
> Labels: metrics, portability, portability-flink
> Time Spent: 5h 10m
> Remaining Estimate: 0h
>
> Metrics are sent from the fn harness to runner in the Python SDK (and likely
> Java soon), but the portable Flink runner doesn't pass them on to Flink,
> which it should, so that users can see them in e.g. the Flink UI or via any
> Flink metrics reporters.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
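For readers who want a feel for what "dumping metrics locally" to a file could look like, here is a minimal, stdlib-only sketch. It is not the FileReporter from pull request #7183 and does not use any Flink classes; the class name MetricsFileDumpSketch, the method dump, and the "name: value" line format are all assumptions made for illustration.

```java
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper for illustration only; not part of Beam or Flink. */
final class MetricsFileDumpSketch {

  private MetricsFileDumpSketch() {}

  /**
   * Writes one "name: value" line per counter to the given file,
   * sorted by metric name so the output is stable between runs.
   * The line format is an assumption, not the format used in the PR.
   */
  static void dump(Map<String, Long> counters, String path) throws FileNotFoundException {
    // try-with-resources flushes and closes the stream even on failure.
    try (PrintStream out = new PrintStream(new FileOutputStream(path))) {
      for (Map.Entry<String, Long> entry : new TreeMap<>(counters).entrySet()) {
        out.println(entry.getKey() + ": " + entry.getValue());
      }
    }
  }
}
```

A real reporter would presumably take the target path from the reporter configuration (the "metrics.reporter.test.file" key named in the Javadoc above) rather than from a method argument.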
[jira] [Assigned] (BEAM-6172) Flink metrics are not generated in standard format
[ https://issues.apache.org/jira/browse/BEAM-6172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Maximilian Michels reassigned BEAM-6172:

Assignee: Maximilian Michels

> Flink metrics are not generated in standard format
> --
>
> Key: BEAM-6172
> URL: https://issues.apache.org/jira/browse/BEAM-6172
> Project: Beam
> Issue Type: Improvement
> Components: runner-flink
> Affects Versions: 2.8.0
> Reporter: Micah Wylde
> Assignee: Maximilian Michels
> Priority: Minor
>
> The metrics that the Flink runner exports do not follow the standard format
> used by Flink, and do not respect Flink metric configuration options.
> For example (with the default metrics configuration), Beam produces a metric:
> {code}
> 10-100-209-71.taskmanager.0f29b420b63fea58f6f321bc0cbf45f3.BeamApp-mwylde-1203224439-a7d8fdf6.group.0.__counter__group__org-apache-beam-runners-core-ReduceFnRunner__droppedDueToClosedWindow
> {code}
> whereas a native Flink metric looks like:
> {code}
> 10-100-209-71.taskmanager.0f29b420b63fea58f6f321bc0cbf45f3.BeamApp-mwylde-1203224439-a7d8fdf6.Source-Custom-Source-7Kinesis-None-beam-env-docker-v1-0-ToKeyedWorkItem.0.numRecordsOut
> {code}
> In particular, Beam should respect the
> [metrics.scope.delimiter|https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#metrics-scope-delimiter]
> configuration for separating components of a metric (currently it uses
> "__"), and should not include the type of metric (counter, gauge, etc.).

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
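The naming request in BEAM-6172 can be illustrated with a small stdlib-only sketch: join the scope components and the metric name with a configurable delimiter, and leave the metric type out of the result. This is not the Flink runner's code; the class MetricIdentifierSketch, the method identifier, and the shortened component values in the usage note are hypothetical. In a real reporter the delimiter would presumably come from Flink's metrics.scope.delimiter setting rather than a method argument.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration only; not part of Beam or Flink. */
final class MetricIdentifierSketch {

  private MetricIdentifierSketch() {}

  /**
   * Builds a metric identifier by joining the scope components and the
   * metric name with the given delimiter. No metric type (counter,
   * gauge, ...) is added, and no hard-coded "__" separator is used.
   */
  static String identifier(List<String> scopeComponents, String metricName, String delimiter) {
    // Copy so the caller's list is not modified.
    List<String> parts = new ArrayList<>(scopeComponents);
    parts.add(metricName);
    return String.join(delimiter, parts);
  }
}
```

For example, calling identifier with the components "host", "taskmanager", "job", "operator", "0", the name "numRecordsOut", and the delimiter "." yields "host.taskmanager.job.operator.0.numRecordsOut", which has the same shape as the native Flink metric quoted in the issue.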