[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172553&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172553
 ]

ASF GitHub Bot logged work on BEAM-6181:


Author: ASF GitHub Bot
Created on: 06/Dec/18 01:30
Start Date: 06/Dec/18 01:30
Worklog Time Spent: 10m 
  Work Description: ajamato commented on a change in pull request #7202: 
[BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow 
Runner.
URL: https://github.com/apache/beam/pull/7202#discussion_r239301167
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
 ##
 @@ -423,6 +434,187 @@ public void close() {}
 contains(new 
CounterHamcrestMatchers.CounterUpdateIntegerValueMatcher(finalCounterValue)));
   }
 
+  @Test(timeout = ReadOperation.DEFAULT_PROGRESS_UPDATE_PERIOD_MS * 10)
+  public void 
testExtractCounterUpdatesReturnsValidProgressTrackerCounterUpdatesIfPresent()
+  throws Exception {
+final String stepName = "fakeStepNameWithUserMetrics";
+final String namespace = "sdk/whatever";
+final String name = "someCounter";
+final int counterValue = 42;
+final int finalCounterValue = 77;
+final CountDownLatch progressSentLatch = new CountDownLatch(1);
+final CountDownLatch processBundleLatch = new CountDownLatch(1);
+
+final BeamFnApi.Metrics.User.MetricName metricName =
+BeamFnApi.Metrics.User.MetricName.newBuilder()
+.setNamespace(namespace)
+.setName(name)
+.build();
+
+final BeamFnApi.Metrics deprecatedMetrics =
+BeamFnApi.Metrics.newBuilder()
+.putPtransforms(GRPC_READ_ID, FAKE_ELEMENT_COUNT_METRICS)
+.putPtransforms(
+stepName,
+BeamFnApi.Metrics.PTransform.newBuilder()
+.addUser(
+BeamFnApi.Metrics.User.newBuilder()
+.setMetricName(metricName)
+.setCounterData(
+BeamFnApi.Metrics.User.CounterData.newBuilder()
+.setValue(finalCounterValue)))
+.build())
+.build();
+
+final int expectedCounterValue = 5;
+final BeamFnApi.MonitoringInfo expectedMonitoringInfo =
+BeamFnApi.MonitoringInfo.newBuilder()
+.setUrn("beam:metric:user:ExpectedCounter")
+.setType("beam:metrics:sum_int_64")
+.putLabels("PTRANSFORM", "ExpectedPTransform")
+.setMetric(
+BeamFnApi.Metric.newBuilder()
+.setCounterData(
+BeamFnApi.CounterData.newBuilder()
+.setInt64Value(expectedCounterValue)
+.build())
+.build())
+.build();
+
+InstructionRequestHandler instructionRequestHandler =
+new InstructionRequestHandler() {
+  @Override
+  public CompletionStage 
handle(InstructionRequest request) {
+switch (request.getRequestCase()) {
+  case REGISTER:
+return 
CompletableFuture.completedFuture(responseFor(request).build());
+  case PROCESS_BUNDLE:
+return MoreFutures.supplyAsync(
+() -> {
+  processBundleLatch.await();
+  return responseFor(request)
+  .setProcessBundle(
+  BeamFnApi.ProcessBundleResponse.newBuilder()
+  .setMetrics(deprecatedMetrics)
+  .addMonitoringInfos(expectedMonitoringInfo))
+  .build();
+});
+  case PROCESS_BUNDLE_PROGRESS:
+progressSentLatch.countDown();
+return CompletableFuture.completedFuture(
+responseFor(request)
+.setProcessBundleProgress(
+
BeamFnApi.ProcessBundleProgressResponse.newBuilder()
+.setMetrics(deprecatedMetrics)
+.addMonitoringInfos(expectedMonitoringInfo))
+.build());
+  default:
+throw new RuntimeException("Reached unexpected code path");
+}
+  }
+
+  @Override
+  public void close() {}
+};
+
+Map stepContextMap = new HashMap<>();
+NameContext nc =
+new NameContext() {
+  @Nullable
+  @Override
+  public String stageName() {
+return "ExpectedStage";
+  }
+
+  @Nullable
+  @Override
+  public String originalName() {
+return 

[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172552&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172552
 ]

ASF GitHub Bot logged work on BEAM-6181:


Author: ASF GitHub Bot
Created on: 06/Dec/18 01:30
Start Date: 06/Dec/18 01:30
Worklog Time Spent: 10m 
  Work Description: ajamato commented on a change in pull request #7202: 
[BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow 
Runner.
URL: https://github.com/apache/beam/pull/7202#discussion_r239301128
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
 ##
 @@ -423,6 +434,187 @@ public void close() {}
 contains(new 
CounterHamcrestMatchers.CounterUpdateIntegerValueMatcher(finalCounterValue)));
   }
 
+  @Test(timeout = ReadOperation.DEFAULT_PROGRESS_UPDATE_PERIOD_MS * 10)
+  public void 
testExtractCounterUpdatesReturnsValidProgressTrackerCounterUpdatesIfPresent()
+  throws Exception {
+final String stepName = "fakeStepNameWithUserMetrics";
+final String namespace = "sdk/whatever";
+final String name = "someCounter";
+final int counterValue = 42;
+final int finalCounterValue = 77;
+final CountDownLatch progressSentLatch = new CountDownLatch(1);
+final CountDownLatch processBundleLatch = new CountDownLatch(1);
+
+final BeamFnApi.Metrics.User.MetricName metricName =
+BeamFnApi.Metrics.User.MetricName.newBuilder()
+.setNamespace(namespace)
+.setName(name)
+.build();
+
+final BeamFnApi.Metrics deprecatedMetrics =
+BeamFnApi.Metrics.newBuilder()
+.putPtransforms(GRPC_READ_ID, FAKE_ELEMENT_COUNT_METRICS)
+.putPtransforms(
+stepName,
+BeamFnApi.Metrics.PTransform.newBuilder()
+.addUser(
+BeamFnApi.Metrics.User.newBuilder()
+.setMetricName(metricName)
+.setCounterData(
+BeamFnApi.Metrics.User.CounterData.newBuilder()
+.setValue(finalCounterValue)))
+.build())
+.build();
+
+final int expectedCounterValue = 5;
+final BeamFnApi.MonitoringInfo expectedMonitoringInfo =
+BeamFnApi.MonitoringInfo.newBuilder()
+.setUrn("beam:metric:user:ExpectedCounter")
+.setType("beam:metrics:sum_int_64")
+.putLabels("PTRANSFORM", "ExpectedPTransform")
+.setMetric(
+BeamFnApi.Metric.newBuilder()
+.setCounterData(
+BeamFnApi.CounterData.newBuilder()
+.setInt64Value(expectedCounterValue)
+.build())
+.build())
+.build();
+
+InstructionRequestHandler instructionRequestHandler =
+new InstructionRequestHandler() {
+  @Override
+  public CompletionStage 
handle(InstructionRequest request) {
+switch (request.getRequestCase()) {
+  case REGISTER:
+return 
CompletableFuture.completedFuture(responseFor(request).build());
+  case PROCESS_BUNDLE:
+return MoreFutures.supplyAsync(
+() -> {
+  processBundleLatch.await();
+  return responseFor(request)
+  .setProcessBundle(
+  BeamFnApi.ProcessBundleResponse.newBuilder()
+  .setMetrics(deprecatedMetrics)
+  .addMonitoringInfos(expectedMonitoringInfo))
+  .build();
+});
+  case PROCESS_BUNDLE_PROGRESS:
+progressSentLatch.countDown();
+return CompletableFuture.completedFuture(
+responseFor(request)
+.setProcessBundleProgress(
+
BeamFnApi.ProcessBundleProgressResponse.newBuilder()
+.setMetrics(deprecatedMetrics)
+.addMonitoringInfos(expectedMonitoringInfo))
+.build());
+  default:
+throw new RuntimeException("Reached unexpected code path");
+}
+  }
+
+  @Override
+  public void close() {}
+};
+
+Map stepContextMap = new HashMap<>();
+NameContext nc =
+new NameContext() {
 
 Review comment:
   Do you need to override this and make a new class? Or is it possible to just 
instantiate an existing class and pass in these strings?



[jira] [Work logged] (BEAM-3836) Java SDK harness should understand a BundleSplitRequest and respond with a BundleSplit before bundle finishes

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3836?focusedWorklogId=172543&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172543
 ]

ASF GitHub Bot logged work on BEAM-3836:


Author: ASF GitHub Bot
Created on: 06/Dec/18 00:45
Start Date: 06/Dec/18 00:45
Worklog Time Spent: 10m 
  Work Description: lukecwik commented on issue #7200: [BEAM-3836] Add 
support for checkpointing to the Java SDK harness. This is towards a general 
splitting solution over the Fn API.
URL: https://github.com/apache/beam/pull/7200#issuecomment-444706319
 
 
   R: @robertwb @iemejia @swegner 
   CC: @boyuanzz 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 172543)
Time Spent: 10m
Remaining Estimate: 0h

> Java SDK harness should understand a BundleSplitRequest and respond with a 
> BundleSplit before bundle finishes
> -
>
> Key: BEAM-3836
> URL: https://issues.apache.org/jira/browse/BEAM-3836
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-harness
>Reporter: Eugene Kirpichov
>Assignee: Luke Cwik
>Priority: Major
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> This, like BEAM-3835, is more necessary for LS than for checkpointing, but is 
> technically also part of the SDF splitting protocol.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (BEAM-3836) Java SDK harness should understand a BundleSplitRequest and respond with a BundleSplit before bundle finishes

2018-12-05 Thread Luke Cwik (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-3836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Cwik reassigned BEAM-3836:
---

Assignee: Luke Cwik  (was: Eugene Kirpichov)

> Java SDK harness should understand a BundleSplitRequest and respond with a 
> BundleSplit before bundle finishes
> -
>
> Key: BEAM-3836
> URL: https://issues.apache.org/jira/browse/BEAM-3836
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-harness
>Reporter: Eugene Kirpichov
>Assignee: Luke Cwik
>Priority: Major
>
> This, like BEAM-3835, is more necessary for LS than for checkpointing, but is 
> technically also part of the SDF splitting protocol.





[jira] [Work logged] (BEAM-5321) Finish Python 3 porting for transforms module

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5321?focusedWorklogId=172538&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172538
 ]

ASF GitHub Bot logged work on BEAM-5321:


Author: ASF GitHub Bot
Created on: 06/Dec/18 00:12
Start Date: 06/Dec/18 00:12
Worklog Time Spent: 10m 
  Work Description: RobbeSneyders commented on a change in pull request 
#7104: [BEAM-5321] Port transforms package to Python 3
URL: https://github.com/apache/beam/pull/7104#discussion_r239287015
 
 

 ##
 File path: sdks/python/setup.py
 ##
 @@ -102,16 +102,17 @@ def get_version():
 cythonize = lambda *args, **kwargs: []
 
 REQUIRED_PACKAGES_PY2_ONLY = [
-'avro>=1.8.1,<2.0.0'
+'avro>=1.8.1,<2.0.0',
+'dill>=0.2.6,<=0.2.8.2',
 ]
 
 REQUIRED_PACKAGES_PY3_ONLY = [
-'avro-python3>=1.8.1,<2.0.0'
+'avro-python3>=1.8.1,<2.0.0',
 
 Review comment:
   This doesn't seem to work for the dill requirement. We can clean this up 
when we clean up the dill dependency.




Issue Time Tracking
---

Worklog Id: (was: 172538)
Time Spent: 2.5h  (was: 2h 20m)

> Finish Python 3 porting for transforms module
> -
>
> Key: BEAM-5321
> URL: https://issues.apache.org/jira/browse/BEAM-5321
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Robbe
>Assignee: Robbe
>Priority: Major
> Fix For: Not applicable
>
>  Time Spent: 2.5h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172530&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172530
 ]

ASF GitHub Bot logged work on BEAM-6181:


Author: ASF GitHub Bot
Created on: 06/Dec/18 00:03
Start Date: 06/Dec/18 00:03
Worklog Time Spent: 10m 
  Work Description: Ardagan commented on a change in pull request #7202: 
[BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow 
Runner.
URL: https://github.com/apache/beam/pull/7202#discussion_r239285290
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
 ##
 @@ -307,10 +344,17 @@ void updateProgress() {
   grpcWriteOperation.abortWait();
 }
 
-BeamFnApi.Metrics metrics = 
MoreFutures.get(bundleProcessOperation.getMetrics());
+//TODO: Replace getProcessBundleProgress with getMonitoringInfos when 
Metrics is deprecated.
 
 Review comment:
   I thought about the proper approach here and concluded that a TODO that 
explains when the code gets deprecated was the best option.
   1. Creating a JIRA that looks like "Cleanup Dataflow Runner Harness" when 
Metrics are deprecated doesn't cover the proper workload, so it is not the best 
idea in my opinion.
   2. Assigning a username is not beneficial either, since we do not know when 
we will completely deprecate Metrics, so it doesn't really differ from a 
regular TODO.




Issue Time Tracking
---

Worklog Id: (was: 172530)
Time Spent: 3h 20m  (was: 3h 10m)

> Utilize MetricInfo for reporting user metrics in Portable Dataflow Java 
> Runner.
> ---
>
> Key: BEAM-6181
> URL: https://issues.apache.org/jira/browse/BEAM-6181
> Project: Beam
>  Issue Type: Bug
>  Components: java-fn-execution
>Reporter: Mikhail Gryzykhin
>Assignee: Mikhail Gryzykhin
>Priority: Major
>  Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> The new approach to reporting metrics in FnApi is to utilize MetricInfo structures.
> This approach is implemented in the Python SDK, and work is ongoing in the Java SDK.
> This task includes plumbing User metrics reported via MetricInfos through the 
> Dataflow Java Runner. 





[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172529&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172529
 ]

ASF GitHub Bot logged work on BEAM-6181:


Author: ASF GitHub Bot
Created on: 06/Dec/18 00:02
Start Date: 06/Dec/18 00:02
Worklog Time Spent: 10m 
  Work Description: Ardagan commented on a change in pull request #7202: 
[BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow 
Runner.
URL: https://github.com/apache/beam/pull/7202#discussion_r239285290
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
 ##
 @@ -307,10 +344,17 @@ void updateProgress() {
   grpcWriteOperation.abortWait();
 }
 
-BeamFnApi.Metrics metrics = 
MoreFutures.get(bundleProcessOperation.getMetrics());
+//TODO: Replace getProcessBundleProgress with getMonitoringInfos when 
Metrics is deprecated.
 
 Review comment:
   I thought about the proper approach here and concluded that a TODO that 
clarifies when the code gets deprecated was the best option.
   1. Creating a JIRA that looks like "Cleanup Dataflow Runner Harness" when 
Metrics are deprecated doesn't cover the proper workload, so it is not the best 
idea in my opinion.
   2. Assigning a username is not beneficial either, since we do not know when 
we will completely deprecate Metrics, so it doesn't really differ from a 
regular TODO.




Issue Time Tracking
---

Worklog Id: (was: 172529)
Time Spent: 3h 10m  (was: 3h)

> Utilize MetricInfo for reporting user metrics in Portable Dataflow Java 
> Runner.
> ---
>
> Key: BEAM-6181
> URL: https://issues.apache.org/jira/browse/BEAM-6181
> Project: Beam
>  Issue Type: Bug
>  Components: java-fn-execution
>Reporter: Mikhail Gryzykhin
>Assignee: Mikhail Gryzykhin
>Priority: Major
>  Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> The new approach to reporting metrics in FnApi is to utilize MetricInfo structures.
> This approach is implemented in the Python SDK, and work is ongoing in the Java SDK.
> This task includes plumbing User metrics reported via MetricInfos through the 
> Dataflow Java Runner. 





[jira] [Work logged] (BEAM-5321) Finish Python 3 porting for transforms module

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5321?focusedWorklogId=172511&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172511
 ]

ASF GitHub Bot logged work on BEAM-5321:


Author: ASF GitHub Bot
Created on: 05/Dec/18 22:54
Start Date: 05/Dec/18 22:54
Worklog Time Spent: 10m 
  Work Description: RobbeSneyders commented on issue #7104: [BEAM-5321] 
Port transforms package to Python 3
URL: https://github.com/apache/beam/pull/7104#issuecomment-444682962
 
 
   It seems that adding `--process-dependency-links` to the tox `install_command` 
worked, although I don't know why this is only necessary for Jenkins.
   
https://github.com/apache/beam/blob/01b75994709b1964f2c555d542b19ecf20f223ec/sdks/python/tox.ini#L44




Issue Time Tracking
---

Worklog Id: (was: 172511)
Time Spent: 2h 20m  (was: 2h 10m)

> Finish Python 3 porting for transforms module
> -
>
> Key: BEAM-5321
> URL: https://issues.apache.org/jira/browse/BEAM-5321
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Robbe
>Assignee: Robbe
>Priority: Major
> Fix For: Not applicable
>
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172510&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172510
 ]

ASF GitHub Bot logged work on BEAM-6181:


Author: ASF GitHub Bot
Created on: 05/Dec/18 22:52
Start Date: 05/Dec/18 22:52
Worklog Time Spent: 10m 
  Work Description: Ardagan commented on a change in pull request #7202: 
[BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow 
Runner.
URL: https://github.com/apache/beam/pull/7202#discussion_r239268924
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
 ##
 @@ -351,7 +398,106 @@ void updateProgress() {
   }
 }
 
-private void updateMetrics(BeamFnApi.Metrics metrics) {
+// Keeping this as static class for this iteration. Will extract to 
separate file and generalize
+// when more counter types are added.
+// todomigryz: define counter transformer factory
+// that can provide respective counter transformer for different type of 
counters.
+// (ie RowCountCounterTranformer, MSecCounterTransformer, 
UserCounterTransformer, etc)
+private static class MonitoringInfoToCounterUpdateTransformer {
+
+  private final Map transformIdMapping;
+
+  public MonitoringInfoToCounterUpdateTransformer(
+  final Map transformIdMapping) {
+this.transformIdMapping = transformIdMapping;
+  }
+
+  // todo: search code for "beam:metrics"... and replace them with 
relevant enums from
+  // proto after rebasing above https://github.com/apache/beam/pull/6799 
that
+  // introduces relevant proto entries.
+  final String BEAM_METRICS_USER_PREFIX = "beam:metric:user";
+
+  private CounterUpdate monitoringInfoToCounterUpdate(MonitoringInfo 
monitoringInfo) {
+long value = 
monitoringInfo.getMetric().getCounterData().getInt64Value();
+String urn = monitoringInfo.getUrn();
+
+String type = monitoringInfo.getType();
+
+// todo: run MonitoringInfo through validation process.
+// refer to https://github.com/apache/beam/pull/6799
+
+if (urn.startsWith(BEAM_METRICS_USER_PREFIX)) {
+  if (!type.equals("beam:metrics:sum_int_64")) {
+return null;
+  }
+
+  final String ptransform = 
monitoringInfo.getLabelsMap().get("PTRANSFORM");
+  if (ptransform == null) {
+return null;
+  }
+
+  DataflowStepContext stepContext = transformIdMapping.get(ptransform);
+
+  CounterStructuredNameAndMetadata name = new 
CounterStructuredNameAndMetadata();
+
+  String nameWithNamespace =
+  monitoringInfo
+  .getUrn()
+  .substring(BEAM_METRICS_USER_PREFIX.length())
+  .replace("^:", "");
+
+  final int lastColonIndex = nameWithNamespace.lastIndexOf(':');
+  String counterName = nameWithNamespace.substring(lastColonIndex + 1);
+  String counterNamespace = nameWithNamespace.substring(0, 
lastColonIndex);
+
+  name.setName(
+  new CounterStructuredName()
+  .setOrigin(Origin.USER.toString())
+  // Workaround for bug in python sdk that missed colon 
after ...metric:user.
+  .setName(counterName)
+  .setOriginalStepName(
+  stepContext == null ? null : 
stepContext.getNameContext().originalName())
+  .setExecutionStepName(
+  stepContext == null ? null : 
stepContext.getNameContext().systemName())
+  .setOriginNamespace(counterNamespace))
+  .setMetadata(new CounterMetadata().setKind("SUM"));
+
+  return new CounterUpdate()
 
 Review comment:
   That is a valid point, but I'd prefer to implement this in the next PR, when 
I add other types of counters.
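
For reference, the quoted diff derives the counter namespace and name from the URN suffix that follows `beam:metric:user`. A minimal Java sketch of that parsing step follows; the `UserMetricUrn` class and its `parse` method are hypothetical names used only for illustration and are not part of the pull request. One detail worth noting: `String.replace` treats its first argument as literal text rather than a regular expression, so `.replace("^:", "")` as written in the diff would probably leave a leading colon in place. The sketch strips the colon explicitly instead.

```java
import java.util.Optional;

/** Hypothetical helper for illustration; it is not part of the pull request. */
final class UserMetricUrn {
  static final String USER_PREFIX = "beam:metric:user";

  final String namespace;
  final String name;

  private UserMetricUrn(String namespace, String name) {
    this.namespace = namespace;
    this.name = name;
  }

  /** Returns empty when the URN lacks the user prefix or carries no counter name. */
  static Optional<UserMetricUrn> parse(String urn) {
    if (!urn.startsWith(USER_PREFIX)) {
      return Optional.empty();
    }
    String rest = urn.substring(USER_PREFIX.length());
    // Strip one leading colon explicitly; String.replace would treat "^:" as literal text.
    if (rest.startsWith(":")) {
      rest = rest.substring(1);
    }
    int lastColon = rest.lastIndexOf(':');
    String name = rest.substring(lastColon + 1);
    if (name.isEmpty()) {
      return Optional.empty();
    }
    // With no colon left, the namespace is empty.
    String namespace = lastColon < 0 ? "" : rest.substring(0, lastColon);
    return Optional.of(new UserMetricUrn(namespace, name));
  }
}
```

Under these assumptions, `UserMetricUrn.parse("beam:metric:user:ns:name")` yields namespace `ns` and name `name`, and a URN without the separating colon (the Python SDK case mentioned in the diff comment) parses the same way.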




Issue Time Tracking
---

Worklog Id: (was: 172510)
Time Spent: 3h  (was: 2h 50m)

> Utilize MetricInfo for reporting user metrics in Portable Dataflow Java 
> Runner.
> ---
>
> Key: BEAM-6181
> URL: https://issues.apache.org/jira/browse/BEAM-6181
> Project: Beam
>  Issue Type: Bug
>  Components: java-fn-execution
>Reporter: Mikhail Gryzykhin
>Assignee: Mikhail Gryzykhin
>

[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172502&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172502
 ]

ASF GitHub Bot logged work on BEAM-6181:


Author: ASF GitHub Bot
Created on: 05/Dec/18 22:04
Start Date: 05/Dec/18 22:04
Worklog Time Spent: 10m 
  Work Description: ajamato commented on a change in pull request #7202: 
[BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow 
Runner.
URL: https://github.com/apache/beam/pull/7202#discussion_r238894508
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
 ##
 @@ -351,7 +395,104 @@ void updateProgress() {
   }
 }
 
-private void updateMetrics(BeamFnApi.Metrics metrics) {
+// todomigryz: define counter transformer factory (read wiki for proper 
name)
+// that can provide respective counter transformer for different type of 
counters.
+// (ie RowCountCounterTranformer, MSecCounterTransformer, 
UserCounterTransformer, etc)
+private static class MonitoringInfoToCounterUpdateTransformer {
+
+  private final Map transformIdMapping;
+
+  public MonitoringInfoToCounterUpdateTransformer(
+  final Map transformIdMapping) {
+this.transformIdMapping = transformIdMapping;
+  }
+
+  // todo: search code for "beam:metrics"... and replace them with 
relevant enums from
+  // proto after rebasing above https://github.com/apache/beam/pull/6799 
that
+  // introduces relevant proto entries.
+  final String BEAM_METRICS_USER_PREFIX = "beam:metric:user";
+
+  private CounterUpdate monitoringInfoToCounterUpdate(MonitoringInfo 
monitoringInfo) {
+long value = 
monitoringInfo.getMetric().getCounterData().getInt64Value();
+String urn = monitoringInfo.getUrn();
+
+String type = monitoringInfo.getType();
+
+// todo: run MonitoringInfo through validation process.
+// refer to https://github.com/apache/beam/pull/6799
+
+if (urn.startsWith(BEAM_METRICS_USER_PREFIX)) {
+  if (!type.equals("beam:metrics:sum_int_64")) {
+return null;
+  }
+
+  final String ptransform = 
monitoringInfo.getLabelsMap().get("PTRANSFORM");
+  if (ptransform == null) {
+return null;
+  }
+
+  DataflowStepContext stepContext = transformIdMapping.get(ptransform);
+
+  CounterStructuredNameAndMetadata name = new 
CounterStructuredNameAndMetadata();
+
+  String nameWithNamespace =
+  monitoringInfo
+  .getUrn()
+  .substring(BEAM_METRICS_USER_PREFIX.length())
+  .replace("^:", "");
+
+  final int lastColonIndex = nameWithNamespace.lastIndexOf(':');
+  String counterName = nameWithNamespace.substring(lastColonIndex + 1);
+  String counterNamespace = nameWithNamespace.substring(0, 
lastColonIndex);
+
+  name.setName(
+  new CounterStructuredName()
+  .setOrigin(Origin.USER.toString())
+  // Workaround for bug in python sdk that missed colon 
after ...metric:user.
+  .setName(counterName)
+  .setOriginalStepName(
+  stepContext == null ? null : 
stepContext.getNameContext().originalName())
 
 Review comment:
   I don't think we should ever set this to null. If we are unable to map to a 
stepContext, we should drop the metric and log an error/warning.
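
The suggestion above (drop the metric and log instead of writing null step names) could look roughly like the following Java sketch. It is only an illustration under stated assumptions: `StepNameResolver` is a hypothetical class, a plain `Map<String, String>` stands in for the map of `DataflowStepContext` objects used in the diff, and `java.util.logging` stands in for whatever logger the worker actually uses.

```java
import java.util.Map;
import java.util.Optional;
import java.util.logging.Logger;

/** Hypothetical stand-in for the step-context lookup in the quoted diff. */
final class StepNameResolver {
  private static final Logger LOG = Logger.getLogger(StepNameResolver.class.getName());

  // Assumption: maps a PTransform id to the original step name.
  private final Map<String, String> originalNameByTransformId;

  StepNameResolver(Map<String, String> originalNameByTransformId) {
    this.originalNameByTransformId = originalNameByTransformId;
  }

  /** Returns the step name, or empty (after logging a warning) when the id is unknown. */
  Optional<String> resolve(String ptransformId) {
    String originalName = originalNameByTransformId.get(ptransformId);
    if (originalName == null) {
      LOG.warning("Dropping metric: no step context for PTransform " + ptransformId);
      return Optional.empty();
    }
    return Optional.of(originalName);
  }
}
```

A caller would then skip building the counter update whenever `resolve` returns an empty `Optional`, so no update with a null step name is ever emitted.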




Issue Time Tracking
---

Worklog Id: (was: 172502)

> Utilize MetricInfo for reporting user metrics in Portable Dataflow Java 
> Runner.
> ---
>
> Key: BEAM-6181
> URL: https://issues.apache.org/jira/browse/BEAM-6181
> Project: Beam
>  Issue Type: Bug
>  Components: java-fn-execution
>Reporter: Mikhail Gryzykhin
>Assignee: Mikhail Gryzykhin
>Priority: Major
>  Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> The new approach to reporting metrics in FnApi is to utilize MetricInfo structures.
> This approach is implemented in the Python SDK, and work is ongoing in the Java SDK.
> This task includes plumbing User metrics reported via MetricInfos through the 
> Dataflow Java Runner. 





[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172491&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172491
 ]

ASF GitHub Bot logged work on BEAM-6181:


Author: ASF GitHub Bot
Created on: 05/Dec/18 22:04
Start Date: 05/Dec/18 22:04
Worklog Time Spent: 10m 
  Work Description: ajamato commented on a change in pull request #7202: 
[BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow 
Runner.
URL: https://github.com/apache/beam/pull/7202#discussion_r238894238
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
 ##
 @@ -351,7 +395,104 @@ void updateProgress() {
   }
 }
 
-private void updateMetrics(BeamFnApi.Metrics metrics) {
+// todomigryz: define counter transformer factory (read wiki for proper 
name)
+// that can provide respective counter transformer for different type of 
counters.
+// (ie RowCountCounterTranformer, MSecCounterTransformer, 
UserCounterTransformer, etc)
+private static class MonitoringInfoToCounterUpdateTransformer {
+
+  private final Map transformIdMapping;
+
+  public MonitoringInfoToCounterUpdateTransformer(
+  final Map transformIdMapping) {
+this.transformIdMapping = transformIdMapping;
+  }
+
+  // todo: search code for "beam:metrics"... and replace them with 
relevant enums from
+  // proto after rebasing above https://github.com/apache/beam/pull/6799 
that
+  // introduces relevant proto entries.
+  final String BEAM_METRICS_USER_PREFIX = "beam:metric:user";
+
+  private CounterUpdate monitoringInfoToCounterUpdate(MonitoringInfo 
monitoringInfo) {
+long value = 
monitoringInfo.getMetric().getCounterData().getInt64Value();
+String urn = monitoringInfo.getUrn();
+
+String type = monitoringInfo.getType();
+
+// todo: run MonitoringInfo through validation process.
+// refer to https://github.com/apache/beam/pull/6799
+
+if (urn.startsWith(BEAM_METRICS_USER_PREFIX)) {
 
 Review comment:
   An architectural suggestion would be to use polymorphism instead of a switch 
statement:
   
   Make a class/object responsible for each URN.
   
   That said, I think much of it can be common, like label extraction/conversion. 
Let's come up with something that supports some default behaviour and allows 
you to override it for specific URN needs.
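
One possible shape for this suggestion is sketched below in Java. All names here (`MetricSample`, `UrnTransformer`, `UserCounterTransformer`, `TransformerRegistry`) are hypothetical and chosen for illustration; `MetricSample` stands in for the fields of `MonitoringInfo` used in the diff, and a `String` stands in for `CounterUpdate`. The base class carries the shared default (label extraction), and each subclass handles one URN family.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for the MonitoringInfo fields that matter here. */
final class MetricSample {
  final String urn;
  final String type;
  final Map<String, String> labels;
  final long value;

  MetricSample(String urn, String type, Map<String, String> labels, long value) {
    this.urn = urn;
    this.type = type;
    this.labels = labels;
    this.value = value;
  }
}

/** One subclass per URN family; shared behaviour lives in the base class. */
abstract class UrnTransformer {
  abstract boolean handles(String urn);

  /** URN-specific conversion; a String stands in for CounterUpdate. */
  abstract Optional<String> transform(MetricSample sample);

  /** Default label extraction shared by all URNs; subclasses may override it. */
  Optional<String> ptransformLabel(MetricSample sample) {
    return Optional.ofNullable(sample.labels.get("PTRANSFORM"));
  }
}

final class UserCounterTransformer extends UrnTransformer {
  @Override
  boolean handles(String urn) {
    return urn.startsWith("beam:metric:user");
  }

  @Override
  Optional<String> transform(MetricSample sample) {
    if (!"beam:metrics:sum_int_64".equals(sample.type)) {
      return Optional.empty();
    }
    return ptransformLabel(sample).map(step -> step + "/" + sample.urn + "=" + sample.value);
  }
}

/** Dispatches to the first registered transformer that handles the URN. */
final class TransformerRegistry {
  private final List<UrnTransformer> transformers = new ArrayList<>();

  TransformerRegistry register(UrnTransformer transformer) {
    transformers.add(transformer);
    return this;
  }

  Optional<String> transform(MetricSample sample) {
    for (UrnTransformer transformer : transformers) {
      if (transformer.handles(sample.urn)) {
        return transformer.transform(sample);
      }
    }
    return Optional.empty();
  }
}
```

Adding support for another counter type would then mean registering one more `UrnTransformer` subclass rather than extending a conditional chain.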
   




Issue Time Tracking
---

Worklog Id: (was: 172491)
Time Spent: 1.5h  (was: 1h 20m)

> Utilize MetricInfo for reporting user metrics in Portable Dataflow Java 
> Runner.
> ---
>
> Key: BEAM-6181
> URL: https://issues.apache.org/jira/browse/BEAM-6181
> Project: Beam
>  Issue Type: Bug
>  Components: java-fn-execution
>Reporter: Mikhail Gryzykhin
>Assignee: Mikhail Gryzykhin
>Priority: Major
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> The new approach to reporting metrics in FnApi is to utilize MetricInfo structures.
> This approach is implemented in the Python SDK, and work is ongoing in the Java SDK.
> This task includes plumbing User metrics reported via MetricInfos through the 
> Dataflow Java Runner. 





[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172498&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172498
 ]

ASF GitHub Bot logged work on BEAM-6181:


Author: ASF GitHub Bot
Created on: 05/Dec/18 22:04
Start Date: 05/Dec/18 22:04
Worklog Time Spent: 10m 
  Work Description: ajamato commented on a change in pull request #7202: 
[BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow 
Runner.
URL: https://github.com/apache/beam/pull/7202#discussion_r239254784
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
 ##
 @@ -351,7 +398,106 @@ void updateProgress() {
   }
 }
 
-private void updateMetrics(BeamFnApi.Metrics metrics) {
+// Keeping this as static class for this iteration. Will extract to separate file and generalize
+// when more counter types are added.
+// todomigryz: define counter transformer factory
+// that can provide respective counter transformer for different type of counters.
+// (ie RowCountCounterTranformer, MSecCounterTransformer, UserCounterTransformer, etc)
+private static class MonitoringInfoToCounterUpdateTransformer {
+
+  private final Map<String, DataflowStepContext> transformIdMapping;
+
+  public MonitoringInfoToCounterUpdateTransformer(
+  final Map<String, DataflowStepContext> transformIdMapping) {
+this.transformIdMapping = transformIdMapping;
+  }
+
+  // todo: search code for "beam:metrics"... and replace them with relevant enums from
+  // proto after rebasing above https://github.com/apache/beam/pull/6799 that
+  // introduces relevant proto entries.
+  final String BEAM_METRICS_USER_PREFIX = "beam:metric:user";
+
+  private CounterUpdate monitoringInfoToCounterUpdate(MonitoringInfo monitoringInfo) {
+long value = monitoringInfo.getMetric().getCounterData().getInt64Value();
+String urn = monitoringInfo.getUrn();
+
+String type = monitoringInfo.getType();
+
+// todo: run MonitoringInfo through validation process.
+// refer to https://github.com/apache/beam/pull/6799
+
+if (urn.startsWith(BEAM_METRICS_USER_PREFIX)) {
+  if (!type.equals("beam:metrics:sum_int_64")) {
+return null;
+  }
+
+  final String ptransform = monitoringInfo.getLabelsMap().get("PTRANSFORM");
+  if (ptransform == null) {
+return null;
+  }
+
+  DataflowStepContext stepContext = transformIdMapping.get(ptransform);
+
+  CounterStructuredNameAndMetadata name = new CounterStructuredNameAndMetadata();
+
+  String nameWithNamespace =
+  monitoringInfo
+  .getUrn()
+  .substring(BEAM_METRICS_USER_PREFIX.length())
+  .replace("^:", "");
+
+  final int lastColonIndex = nameWithNamespace.lastIndexOf(':');
+  String counterName = nameWithNamespace.substring(lastColonIndex + 1);
+  String counterNamespace = nameWithNamespace.substring(0, lastColonIndex);
+
+  name.setName(
+  new CounterStructuredName()
+  .setOrigin(Origin.USER.toString())
+  // Workaround for bug in python sdk that missed colon after ...metric:user.
+  .setName(counterName)
+  .setOriginalStepName(
+  stepContext == null ? null : stepContext.getNameContext().originalName())
+  .setExecutionStepName(
+  stepContext == null ? null : stepContext.getNameContext().systemName())
+  .setOriginNamespace(counterNamespace))
+  .setMetadata(new CounterMetadata().setKind("SUM"));
+
+  return new CounterUpdate()
+  .setStructuredNameAndMetadata(name)
+  .setCumulative(false)
+  .setInteger(DataflowCounterUpdateExtractor.longToSplitInt(value));
+}
+return null;
+  }
+}
+
+private void updateMetrics(List<MonitoringInfo> monitoringInfos) {
 
 Review comment:
   please add docstrings to all of these. There are many methods with too 
similar names.




Issue Time Tracking
---

Worklog Id: (was: 172498)
Time Spent: 2h 20m  (was: 2h 10m)

> Utilize MetricInfo for reporting user metrics in Portable Dataflow Java 
> Runner.
> ---
>
> Key: BEAM-6181
>  

[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172496=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172496
 ]

ASF GitHub Bot logged work on BEAM-6181:


Author: ASF GitHub Bot
Created on: 05/Dec/18 22:04
Start Date: 05/Dec/18 22:04
Worklog Time Spent: 10m 
  Work Description: ajamato commented on a change in pull request #7202: 
[BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow 
Runner.
URL: https://github.com/apache/beam/pull/7202#discussion_r239247446
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
 ##
 @@ -307,10 +347,17 @@ void updateProgress() {
   grpcWriteOperation.abortWait();
 }
 
-BeamFnApi.Metrics metrics = MoreFutures.get(bundleProcessOperation.getMetrics());
+//TODO: Replace getProcessBundleProgress with getMonitoringInfos when Metrics is deprecated.
+ProcessBundleProgressResponse processBundleProgressResponse =
+MoreFutures.get(bundleProcessOperation.getProcessBundleProgress());
+updateMetrics(processBundleProgressResponse.getMonitoringInfosList());
 
-updateMetrics(metrics);
+// Supporting deprecated metrics until all supported runners are migrated to using
+// MonitoringInfos
+Metrics metrics = processBundleProgressResponse.getMetrics();
+updateMetricsDeprecated(metrics);
 
+// TODO: change this to utilize MonitroingInfos
 
 Review comment:
   please add username and/or jira number




Issue Time Tracking
---

Worklog Id: (was: 172496)

> Utilize MetricInfo for reporting user metrics in Portable Dataflow Java 
> Runner.
> ---
>
> Key: BEAM-6181
> URL: https://issues.apache.org/jira/browse/BEAM-6181
> Project: Beam
>  Issue Type: Bug
>  Components: java-fn-execution
>Reporter: Mikhail Gryzykhin
>Assignee: Mikhail Gryzykhin
>Priority: Major
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172493=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172493
 ]

ASF GitHub Bot logged work on BEAM-6181:


Author: ASF GitHub Bot
Created on: 05/Dec/18 22:04
Start Date: 05/Dec/18 22:04
Worklog Time Spent: 10m 
  Work Description: ajamato commented on a change in pull request #7202: 
[BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow 
Runner.
URL: https://github.com/apache/beam/pull/7202#discussion_r239240671
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkItemStatusClient.java
 ##
 @@ -263,20 +263,35 @@ private synchronized WorkItemStatus createStatusUpdate(boolean isFinal) {
 return status;
   }
 
+  // todo this method should return List instead of setting it to WorkitemStatus
 
 Review comment:
   Please add a username and/or jira




Issue Time Tracking
---

Worklog Id: (was: 172493)
Time Spent: 1h 50m  (was: 1h 40m)

> Utilize MetricInfo for reporting user metrics in Portable Dataflow Java 
> Runner.
> ---
>
> Key: BEAM-6181
> URL: https://issues.apache.org/jira/browse/BEAM-6181
> Project: Beam
>  Issue Type: Bug
>  Components: java-fn-execution
>Reporter: Mikhail Gryzykhin
>Assignee: Mikhail Gryzykhin
>Priority: Major
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172504=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172504
 ]

ASF GitHub Bot logged work on BEAM-6181:


Author: ASF GitHub Bot
Created on: 05/Dec/18 22:07
Start Date: 05/Dec/18 22:07
Worklog Time Spent: 10m 
  Work Description: Ardagan commented on a change in pull request #7202: 
[BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow 
Runner.
URL: https://github.com/apache/beam/pull/7202#discussion_r239256276
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
 ##
 @@ -351,7 +395,104 @@ void updateProgress() {
   }
 }
 
-private void updateMetrics(BeamFnApi.Metrics metrics) {
+// todomigryz: define counter transformer factory (read wiki for proper name)
+// that can provide respective counter transformer for different type of counters.
+// (ie RowCountCounterTranformer, MSecCounterTransformer, UserCounterTransformer, etc)
+private static class MonitoringInfoToCounterUpdateTransformer {
+
+  private final Map<String, DataflowStepContext> transformIdMapping;
+
+  public MonitoringInfoToCounterUpdateTransformer(
+  final Map<String, DataflowStepContext> transformIdMapping) {
+this.transformIdMapping = transformIdMapping;
+  }
+
+  // todo: search code for "beam:metrics"... and replace them with relevant enums from
+  // proto after rebasing above https://github.com/apache/beam/pull/6799 that
+  // introduces relevant proto entries.
+  final String BEAM_METRICS_USER_PREFIX = "beam:metric:user";
+
+  private CounterUpdate monitoringInfoToCounterUpdate(MonitoringInfo monitoringInfo) {
+long value = monitoringInfo.getMetric().getCounterData().getInt64Value();
+String urn = monitoringInfo.getUrn();
+
+String type = monitoringInfo.getType();
+
+// todo: run MonitoringInfo through validation process.
+// refer to https://github.com/apache/beam/pull/6799
+
+if (urn.startsWith(BEAM_METRICS_USER_PREFIX)) {
 
 Review comment:
   Agree with this one.
   I decided to keep it this way for now. Will implement polymorphism when 
adding support for another counter type.
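
For reference, the polymorphic shape deferred above might look roughly like the sketch below. It is only an illustration: `InfoStub`, `CounterTransformer`, `UserCounterTransformer` and `CounterTransformers` are hypothetical names that do not appear in the pull request, `InfoStub` stands in for the real `MonitoringInfo` proto, and a plain `String` stands in for the Dataflow `CounterUpdate`. Only the URN prefix and type string are copied from the diff.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Simplified stand-in for the MonitoringInfo proto; only the fields used below. */
final class InfoStub {
  final String urn;
  final String type;
  final long value;

  InfoStub(String urn, String type, long value) {
    this.urn = urn;
    this.type = type;
    this.value = value;
  }
}

/** Hypothetical per-counter-type transformer; not an interface from the pull request. */
interface CounterTransformer {
  boolean supports(InfoStub info);

  /** Returns a printable stand-in for a Dataflow CounterUpdate. */
  String transform(InfoStub info);
}

/** Handles user counters, using the prefix and type strings quoted in the diff. */
final class UserCounterTransformer implements CounterTransformer {
  @Override
  public boolean supports(InfoStub info) {
    return info.urn.startsWith("beam:metric:user")
        && info.type.equals("beam:metrics:sum_int_64");
  }

  @Override
  public String transform(InfoStub info) {
    return "USER SUM " + info.urn + " = " + info.value;
  }
}

/** Picks the first transformer that accepts the info; an empty result replaces a null return. */
final class CounterTransformers {
  private final List<CounterTransformer> transformers;

  CounterTransformers(CounterTransformer... transformers) {
    this.transformers = Arrays.asList(transformers);
  }

  Optional<String> transform(InfoStub info) {
    for (CounterTransformer t : transformers) {
      if (t.supports(info)) {
        return Optional.of(t.transform(info));
      }
    }
    return Optional.empty();
  }
}
```

Adding another counter type would then mean adding one more `CounterTransformer` implementation and registering it, rather than growing the `if (urn.startsWith(...))` chain.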




Issue Time Tracking
---

Worklog Id: (was: 172504)
Time Spent: 2h 50m  (was: 2h 40m)

> Utilize MetricInfo for reporting user metrics in Portable Dataflow Java 
> Runner.
> ---
>
> Key: BEAM-6181
> URL: https://issues.apache.org/jira/browse/BEAM-6181
> Project: Beam
>  Issue Type: Bug
>  Components: java-fn-execution
>Reporter: Mikhail Gryzykhin
>Assignee: Mikhail Gryzykhin
>Priority: Major
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172499=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172499
 ]

ASF GitHub Bot logged work on BEAM-6181:


Author: ASF GitHub Bot
Created on: 05/Dec/18 22:04
Start Date: 05/Dec/18 22:04
Worklog Time Spent: 10m 
  Work Description: ajamato commented on a change in pull request #7202: 
[BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow 
Runner.
URL: https://github.com/apache/beam/pull/7202#discussion_r239253828
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
 ##
 @@ -351,7 +398,106 @@ void updateProgress() {
   }
 }
 
-private void updateMetrics(BeamFnApi.Metrics metrics) {
+// Keeping this as static class for this iteration. Will extract to separate file and generalize
+// when more counter types are added.
+// todomigryz: define counter transformer factory
+// that can provide respective counter transformer for different type of counters.
+// (ie RowCountCounterTranformer, MSecCounterTransformer, UserCounterTransformer, etc)
+private static class MonitoringInfoToCounterUpdateTransformer {
+
+  private final Map<String, DataflowStepContext> transformIdMapping;
+
+  public MonitoringInfoToCounterUpdateTransformer(
+  final Map<String, DataflowStepContext> transformIdMapping) {
+this.transformIdMapping = transformIdMapping;
+  }
+
+  // todo: search code for "beam:metrics"... and replace them with relevant enums from
+  // proto after rebasing above https://github.com/apache/beam/pull/6799 that
+  // introduces relevant proto entries.
+  final String BEAM_METRICS_USER_PREFIX = "beam:metric:user";
+
+  private CounterUpdate monitoringInfoToCounterUpdate(MonitoringInfo monitoringInfo) {
+long value = monitoringInfo.getMetric().getCounterData().getInt64Value();
+String urn = monitoringInfo.getUrn();
+
+String type = monitoringInfo.getType();
+
+// todo: run MonitoringInfo through validation process.
+// refer to https://github.com/apache/beam/pull/6799
+
+if (urn.startsWith(BEAM_METRICS_USER_PREFIX)) {
+  if (!type.equals("beam:metrics:sum_int_64")) {
+return null;
+  }
+
+  final String ptransform = monitoringInfo.getLabelsMap().get("PTRANSFORM");
+  if (ptransform == null) {
+return null;
+  }
+
+  DataflowStepContext stepContext = transformIdMapping.get(ptransform);
+
+  CounterStructuredNameAndMetadata name = new CounterStructuredNameAndMetadata();
+
+  String nameWithNamespace =
 
 Review comment:
   Please put this in a helper method for readability, make a method to parse 
the string
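
A helper along the lines requested here could look like the sketch below. It is an illustration under assumptions, not the code that was merged: the class name `UserMetricUrn` and its fields are hypothetical, and returning an empty `Optional` for malformed input is a choice made for this sketch. One detail worth noting while extracting the logic: `String.replace("^:", "")` in the diff matches the literal text `^:` and not a leading colon, so the sketch strips the colon explicitly.

```java
import java.util.Optional;

/** Hypothetical helper; the name and shape are illustrative, not taken from the pull request. */
final class UserMetricUrn {
  // Prefix string as it appears in the diff under review.
  static final String USER_PREFIX = "beam:metric:user";

  final String namespace;
  final String name;

  private UserMetricUrn(String namespace, String name) {
    this.namespace = namespace;
    this.name = name;
  }

  /** Parses "beam:metric:user[:]namespace:name"; empty if the URN does not fit that shape. */
  static Optional<UserMetricUrn> parse(String urn) {
    if (urn == null || !urn.startsWith(USER_PREFIX)) {
      return Optional.empty();
    }
    String rest = urn.substring(USER_PREFIX.length());
    // Tolerate a missing colon after the prefix, as the workaround comment in the diff describes.
    if (rest.startsWith(":")) {
      rest = rest.substring(1);
    }
    int lastColon = rest.lastIndexOf(':');
    // Require a non-empty name after the last colon; without a colon there is no namespace.
    if (lastColon < 0 || lastColon == rest.length() - 1) {
      return Optional.empty();
    }
    return Optional.of(
        new UserMetricUrn(rest.substring(0, lastColon), rest.substring(lastColon + 1)));
  }
}
```

With this in place, the body of `monitoringInfoToCounterUpdate` would only need to call `UserMetricUrn.parse(urn)` and read `namespace` and `name` from the result.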




Issue Time Tracking
---

Worklog Id: (was: 172499)
Time Spent: 2h 20m  (was: 2h 10m)

> Utilize MetricInfo for reporting user metrics in Portable Dataflow Java 
> Runner.
> ---
>
> Key: BEAM-6181
> URL: https://issues.apache.org/jira/browse/BEAM-6181
> Project: Beam
>  Issue Type: Bug
>  Components: java-fn-execution
>Reporter: Mikhail Gryzykhin
>Assignee: Mikhail Gryzykhin
>Priority: Major
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172490=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172490
 ]

ASF GitHub Bot logged work on BEAM-6181:


Author: ASF GitHub Bot
Created on: 05/Dec/18 22:04
Start Date: 05/Dec/18 22:04
Worklog Time Spent: 10m 
  Work Description: ajamato commented on a change in pull request #7202: 
[BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow 
Runner.
URL: https://github.com/apache/beam/pull/7202#discussion_r238893122
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
 ##
 @@ -307,10 +344,17 @@ void updateProgress() {
   grpcWriteOperation.abortWait();
 }
 
-BeamFnApi.Metrics metrics = MoreFutures.get(bundleProcessOperation.getMetrics());
+//TODO: Replace getProcessBundleProgress with getMonitoringInfos when Metrics is deprecated.
 
 Review comment:
   please add username and/or jira number




Issue Time Tracking
---

Worklog Id: (was: 172490)
Time Spent: 1h 20m  (was: 1h 10m)

> Utilize MetricInfo for reporting user metrics in Portable Dataflow Java 
> Runner.
> ---
>
> Key: BEAM-6181
> URL: https://issues.apache.org/jira/browse/BEAM-6181
> Project: Beam
>  Issue Type: Bug
>  Components: java-fn-execution
>Reporter: Mikhail Gryzykhin
>Assignee: Mikhail Gryzykhin
>Priority: Major
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172494=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172494
 ]

ASF GitHub Bot logged work on BEAM-6181:


Author: ASF GitHub Bot
Created on: 05/Dec/18 22:04
Start Date: 05/Dec/18 22:04
Worklog Time Spent: 10m 
  Work Description: ajamato commented on a change in pull request #7202: 
[BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow 
Runner.
URL: https://github.com/apache/beam/pull/7202#discussion_r239241898
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkItemStatusClient.java
 ##
 @@ -263,20 +263,35 @@ private synchronized WorkItemStatus createStatusUpdate(boolean isFinal) {
 return status;
   }
 
+  // todo this method should return List instead of setting it to WorkitemStatus
   @VisibleForTesting
   synchronized void populateCounterUpdates(WorkItemStatus status) {
 if (worker == null) {
   return;
 }
 
 boolean isFinalUpdate = Boolean.TRUE.equals(status.getCompleted());
-ImmutableList.Builder counterUpdatesBuilder = ImmutableList.builder();
-counterUpdatesBuilder.addAll(extractCounters(worker.getOutputCounters()));
-counterUpdatesBuilder.addAll(extractMetrics(isFinalUpdate));
-counterUpdatesBuilder.addAll(extractMsecCounters(isFinalUpdate));
-counterUpdatesBuilder.addAll(worker.extractMetricUpdates());
 
-ImmutableList counterUpdates = counterUpdatesBuilder.build();
+ImmutableList.Builder counterUpdatesListBuilder = ImmutableList.builder();
+Iterable newCounterUpdates;
+
+// Output counters
+newCounterUpdates = extractCounters(worker.getOutputCounters());
+counterUpdatesListBuilder.addAll(newCounterUpdates);
+
+// User metrics reported in Worker
+newCounterUpdates = extractMetrics(isFinalUpdate);
 
 Review comment:
   Please revert to the previous style, there isn't much value to expanding 
these to two lines each.
   
   thanks for the code comments though




Issue Time Tracking
---

Worklog Id: (was: 172494)
Time Spent: 2h  (was: 1h 50m)

> Utilize MetricInfo for reporting user metrics in Portable Dataflow Java 
> Runner.
> ---
>
> Key: BEAM-6181
> URL: https://issues.apache.org/jira/browse/BEAM-6181
> Project: Beam
>  Issue Type: Bug
>  Components: java-fn-execution
>Reporter: Mikhail Gryzykhin
>Assignee: Mikhail Gryzykhin
>Priority: Major
>  Time Spent: 2h
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172503=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172503
 ]

ASF GitHub Bot logged work on BEAM-6181:


Author: ASF GitHub Bot
Created on: 05/Dec/18 22:04
Start Date: 05/Dec/18 22:04
Worklog Time Spent: 10m 
  Work Description: ajamato commented on a change in pull request #7202: 
[BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow 
Runner.
URL: https://github.com/apache/beam/pull/7202#discussion_r239245981
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
 ##
 @@ -307,10 +347,17 @@ void updateProgress() {
   grpcWriteOperation.abortWait();
 }
 
-BeamFnApi.Metrics metrics = MoreFutures.get(bundleProcessOperation.getMetrics());
+//TODO: Replace getProcessBundleProgress with getMonitoringInfos when Metrics is deprecated.
+ProcessBundleProgressResponse processBundleProgressResponse =
+MoreFutures.get(bundleProcessOperation.getProcessBundleProgress());
+updateMetrics(processBundleProgressResponse.getMonitoringInfosList());
 
-updateMetrics(metrics);
+// Supporting deprecated metrics until all supported runners are migrated to using
+// MonitoringInfos
+Metrics metrics = processBundleProgressResponse.getMetrics();
+updateMetricsDeprecated(metrics);
 
+// TODO: change this to utilize MonitroingInfos
 
 Review comment:
   sp: MonitroingInfos




Issue Time Tracking
---

Worklog Id: (was: 172503)
Time Spent: 2h 40m  (was: 2.5h)

> Utilize MetricInfo for reporting user metrics in Portable Dataflow Java 
> Runner.
> ---
>
> Key: BEAM-6181
> URL: https://issues.apache.org/jira/browse/BEAM-6181
> Project: Beam
>  Issue Type: Bug
>  Components: java-fn-execution
>Reporter: Mikhail Gryzykhin
>Assignee: Mikhail Gryzykhin
>Priority: Major
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172497=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172497
 ]

ASF GitHub Bot logged work on BEAM-6181:


Author: ASF GitHub Bot
Created on: 05/Dec/18 22:04
Start Date: 05/Dec/18 22:04
Worklog Time Spent: 10m 
  Work Description: ajamato commented on a change in pull request #7202: 
[BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow 
Runner.
URL: https://github.com/apache/beam/pull/7202#discussion_r239252005
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
 ##
 @@ -351,7 +398,106 @@ void updateProgress() {
   }
 }
 
-private void updateMetrics(BeamFnApi.Metrics metrics) {
+// Keeping this as static class for this iteration. Will extract to separate file and generalize
+// when more counter types are added.
+// todomigryz: define counter transformer factory
+// that can provide respective counter transformer for different type of counters.
+// (ie RowCountCounterTranformer, MSecCounterTransformer, UserCounterTransformer, etc)
+private static class MonitoringInfoToCounterUpdateTransformer {
+
+  private final Map<String, DataflowStepContext> transformIdMapping;
+
+  public MonitoringInfoToCounterUpdateTransformer(
+  final Map<String, DataflowStepContext> transformIdMapping) {
+this.transformIdMapping = transformIdMapping;
+  }
+
+  // todo: search code for "beam:metrics"... and replace them with relevant enums from
+  // proto after rebasing above https://github.com/apache/beam/pull/6799 that
+  // introduces relevant proto entries.
+  final String BEAM_METRICS_USER_PREFIX = "beam:metric:user";
+
+  private CounterUpdate monitoringInfoToCounterUpdate(MonitoringInfo monitoringInfo) {
+long value = monitoringInfo.getMetric().getCounterData().getInt64Value();
+String urn = monitoringInfo.getUrn();
+
+String type = monitoringInfo.getType();
+
+// todo: run MonitoringInfo through validation process.
 
 Review comment:
   Add username and/or jira number




Issue Time Tracking
---

Worklog Id: (was: 172497)

> Utilize MetricInfo for reporting user metrics in Portable Dataflow Java 
> Runner.
> ---
>
> Key: BEAM-6181
> URL: https://issues.apache.org/jira/browse/BEAM-6181
> Project: Beam
>  Issue Type: Bug
>  Components: java-fn-execution
>Reporter: Mikhail Gryzykhin
>Assignee: Mikhail Gryzykhin
>Priority: Major
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172489=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172489
 ]

ASF GitHub Bot logged work on BEAM-6181:


Author: ASF GitHub Bot
Created on: 05/Dec/18 22:04
Start Date: 05/Dec/18 22:04
Worklog Time Spent: 10m 
  Work Description: ajamato commented on a change in pull request #7202: 
[BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow 
Runner.
URL: https://github.com/apache/beam/pull/7202#discussion_r238892994
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
 ##
 @@ -177,21 +199,24 @@ public ReadOperation getReadOperation() throws Exception {
 throw new IllegalStateException(String.format("ReadOperation not found in %s", operations));
   }
 
-  private static interface ProgressTracker {
+  private interface ProgressTracker {
 @Nullable
-public NativeReader.Progress getWorkerProgress() throws Exception;
+public Progress getWorkerProgress() throws Exception;
 
 /**
  * Returns an metric updates accumulated since the last call to {@link #extractMetricUpdates()}.
  */
+@Deprecated
 public MetricUpdates extractMetricUpdates();
 
+public List extractCounterUpdates();
 
 Review comment:
   please add a comment describing what these are




Issue Time Tracking
---

Worklog Id: (was: 172489)
Time Spent: 1h 10m  (was: 1h)

> Utilize MetricInfo for reporting user metrics in Portable Dataflow Java 
> Runner.
> ---
>
> Key: BEAM-6181
> URL: https://issues.apache.org/jira/browse/BEAM-6181
> Project: Beam
>  Issue Type: Bug
>  Components: java-fn-execution
>Reporter: Mikhail Gryzykhin
>Assignee: Mikhail Gryzykhin
>Priority: Major
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h


[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172492=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172492
 ]

ASF GitHub Bot logged work on BEAM-6181:


Author: ASF GitHub Bot
Created on: 05/Dec/18 22:04
Start Date: 05/Dec/18 22:04
Worklog Time Spent: 10m 
  Work Description: ajamato commented on a change in pull request #7202: 
[BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow 
Runner.
URL: https://github.com/apache/beam/pull/7202#discussion_r239254570
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
 ##
 @@ -351,7 +398,106 @@ void updateProgress() {
   }
 }
 
-private void updateMetrics(BeamFnApi.Metrics metrics) {
+// Keeping this as static class for this iteration. Will extract to separate file and generalize
+// when more counter types are added.
+// todomigryz: define counter transformer factory
+// that can provide respective counter transformer for different type of counters.
+// (ie RowCountCounterTranformer, MSecCounterTransformer, UserCounterTransformer, etc)
+private static class MonitoringInfoToCounterUpdateTransformer {
+
+  private final Map<String, DataflowStepContext> transformIdMapping;
+
+  public MonitoringInfoToCounterUpdateTransformer(
+  final Map<String, DataflowStepContext> transformIdMapping) {
+this.transformIdMapping = transformIdMapping;
+  }
+
+  // todo: search code for "beam:metrics"... and replace them with relevant enums from
+  // proto after rebasing above https://github.com/apache/beam/pull/6799 that
+  // introduces relevant proto entries.
+  final String BEAM_METRICS_USER_PREFIX = "beam:metric:user";
+
+  private CounterUpdate monitoringInfoToCounterUpdate(MonitoringInfo monitoringInfo) {
+long value = monitoringInfo.getMetric().getCounterData().getInt64Value();
+String urn = monitoringInfo.getUrn();
+
+String type = monitoringInfo.getType();
+
+// todo: run MonitoringInfo through validation process.
+// refer to https://github.com/apache/beam/pull/6799
+
+if (urn.startsWith(BEAM_METRICS_USER_PREFIX)) {
+  if (!type.equals("beam:metrics:sum_int_64")) {
+return null;
+  }
+
+  final String ptransform = monitoringInfo.getLabelsMap().get("PTRANSFORM");
+  if (ptransform == null) {
+return null;
+  }
+
+  DataflowStepContext stepContext = transformIdMapping.get(ptransform);
+
+  CounterStructuredNameAndMetadata name = new CounterStructuredNameAndMetadata();
+
+  String nameWithNamespace =
+  monitoringInfo
+  .getUrn()
+  .substring(BEAM_METRICS_USER_PREFIX.length())
+  .replace("^:", "");
+
+  final int lastColonIndex = nameWithNamespace.lastIndexOf(':');
+  String counterName = nameWithNamespace.substring(lastColonIndex + 1);
+  String counterNamespace = nameWithNamespace.substring(0, lastColonIndex);
+
+  name.setName(
+  new CounterStructuredName()
+  .setOrigin(Origin.USER.toString())
+  // Workaround for bug in python sdk that missed colon after ...metric:user.
+  .setName(counterName)
+  .setOriginalStepName(
+  stepContext == null ? null : stepContext.getNameContext().originalName())
+  .setExecutionStepName(
+  stepContext == null ? null : stepContext.getNameContext().systemName())
+  .setOriginNamespace(counterNamespace))
+  .setMetadata(new CounterMetadata().setKind("SUM"));
+
+  return new CounterUpdate()
 
 Review comment:
   This is okay for now, since you are just unpacking the user counter metric, 
but I would like to see this share more code across different metrics later on.
   
   Any label extraction can be shared for example.
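
   A minimal sketch of what such shared extraction could look like, assuming 
labels are available as a plain Map<String, String> and the URN as a String. 
The class MonitoringInfoParsing and its methods are hypothetical and not part 
of the PR; only the "PTRANSFORM" label key and the "beam:metric:user" prefix 
are taken from the diff above. The sketch strips the leading colon explicitly, 
because String.replace matches its first argument literally rather than as a 
regular expression.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper (not part of the PR): extraction shared across metric types. */
final class MonitoringInfoParsing {
  // Label key and URN prefix as they appear in the diff above.
  static final String PTRANSFORM_LABEL = "PTRANSFORM";
  static final String USER_METRIC_PREFIX = "beam:metric:user";

  private MonitoringInfoParsing() {}

  /** Returns the value of the given label, or empty if the label is absent. */
  static Optional<String> label(Map<String, String> labels, String key) {
    return Optional.ofNullable(labels.get(key));
  }

  /**
   * Splits a user-metric URN into {namespace, name}. Returns empty if the URN
   * does not start with the user-metric prefix.
   */
  static Optional<String[]> namespaceAndName(String urn) {
    if (!urn.startsWith(USER_METRIC_PREFIX)) {
      return Optional.empty();
    }
    String rest = urn.substring(USER_METRIC_PREFIX.length());
    // replace("^:", "") would search for the literal characters "^:",
    // so the leading colon is removed explicitly instead.
    if (rest.startsWith(":")) {
      rest = rest.substring(1);
    }
    int lastColon = rest.lastIndexOf(':');
    String namespace = lastColon < 0 ? "" : rest.substring(0, lastColon);
    String name = rest.substring(lastColon + 1);
    return Optional.of(new String[] {namespace, name});
  }
}
```

   Other counter transformers could then call label(...) with their own keys 
instead of repeating the map lookup and null check.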


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 172492)
Time Spent: 1h 40m  (was: 1.5h)

> Utilize MetricInfo for reporting user metrics in Portable Dataflow Java 
> Runner.
> ---
>
> Key: BEAM-6181
> URL: https://issues.apache.org/jira/browse/BEAM-6181
> Project: Beam
>  Issue Type: Bug
>  Components: java-fn-execution

[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172501&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172501
 ]

ASF GitHub Bot logged work on BEAM-6181:


Author: ASF GitHub Bot
Created on: 05/Dec/18 22:04
Start Date: 05/Dec/18 22:04
Worklog Time Spent: 10m 
  Work Description: ajamato commented on a change in pull request #7202: 
[BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow 
Runner.
URL: https://github.com/apache/beam/pull/7202#discussion_r239254686
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
 ##
 @@ -351,7 +398,106 @@ void updateProgress() {
   }
 }
 
-private void updateMetrics(BeamFnApi.Metrics metrics) {
+// Keeping this as static class for this iteration. Will extract to 
separate file and generalize
+// when more counter types are added.
+// todomigryz: define counter transformer factory
+// that can provide respective counter transformer for different type of 
counters.
+// (ie RowCountCounterTranformer, MSecCounterTransformer, 
UserCounterTransformer, etc)
+private static class MonitoringInfoToCounterUpdateTransformer {
+
+  private final Map transformIdMapping;
+
+  public MonitoringInfoToCounterUpdateTransformer(
+  final Map transformIdMapping) {
+this.transformIdMapping = transformIdMapping;
+  }
+
+  // todo: search code for "beam:metrics"... and replace them with 
relevant enums from
+  // proto after rebasing above https://github.com/apache/beam/pull/6799 
that
+  // introduces relevant proto entries.
+  final String BEAM_METRICS_USER_PREFIX = "beam:metric:user";
+
+  private CounterUpdate monitoringInfoToCounterUpdate(MonitoringInfo 
monitoringInfo) {
+long value = 
monitoringInfo.getMetric().getCounterData().getInt64Value();
+String urn = monitoringInfo.getUrn();
+
+String type = monitoringInfo.getType();
+
+// todo: run MonitoringInfo through validation process.
+// refer to https://github.com/apache/beam/pull/6799
+
+if (urn.startsWith(BEAM_METRICS_USER_PREFIX)) {
+  if (!type.equals("beam:metrics:sum_int_64")) {
+return null;
+  }
+
+  final String ptransform = 
monitoringInfo.getLabelsMap().get("PTRANSFORM");
+  if (ptransform == null) {
+return null;
+  }
+
+  DataflowStepContext stepContext = transformIdMapping.get(ptransform);
+
+  CounterStructuredNameAndMetadata name = new 
CounterStructuredNameAndMetadata();
+
+  String nameWithNamespace =
+  monitoringInfo
+  .getUrn()
+  .substring(BEAM_METRICS_USER_PREFIX.length())
+  .replace("^:", "");
+
+  final int lastColonIndex = nameWithNamespace.lastIndexOf(':');
+  String counterName = nameWithNamespace.substring(lastColonIndex + 1);
+  String counterNamespace = nameWithNamespace.substring(0, 
lastColonIndex);
+
+  name.setName(
+  new CounterStructuredName()
+  .setOrigin(Origin.USER.toString())
+  // Workaround for bug in python sdk that missed colon 
after ...metric:user.
+  .setName(counterName)
+  .setOriginalStepName(
+  stepContext == null ? null : 
stepContext.getNameContext().originalName())
+  .setExecutionStepName(
+  stepContext == null ? null : 
stepContext.getNameContext().systemName())
+  .setOriginNamespace(counterNamespace))
+  .setMetadata(new CounterMetadata().setKind("SUM"));
+
+  return new CounterUpdate()
+  .setStructuredNameAndMetadata(name)
+  .setCumulative(false)
+  
.setInteger(DataflowCounterUpdateExtractor.longToSplitInt(value));
+}
+return null;
+  }
+}
+
+private void updateMetrics(List monitoringInfos) {
+  final MonitoringInfoToCounterUpdateTransformer 
monitoringInfoToCounterUpdateTransformer =
+  new MonitoringInfoToCounterUpdateTransformer(
+  bundleProcessOperation.getPtransformIdToUserStepContext());
+  String miDump =
 
 Review comment:
   rm debug code




Issue Time Tracking
---

Worklog Id: (was: 172501)
Time Spent: 2.5h  (was: 2h 20m)

> Utilize MetricInfo for 

[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172495&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172495
 ]

ASF GitHub Bot logged work on BEAM-6181:


Author: ASF GitHub Bot
Created on: 05/Dec/18 22:04
Start Date: 05/Dec/18 22:04
Worklog Time Spent: 10m 
  Work Description: ajamato commented on a change in pull request #7202: 
[BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow 
Runner.
URL: https://github.com/apache/beam/pull/7202#discussion_r239254849
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutor.java
 ##
 @@ -351,7 +398,106 @@ void updateProgress() {
   }
 }
 
-private void updateMetrics(BeamFnApi.Metrics metrics) {
+// Keeping this as static class for this iteration. Will extract to 
separate file and generalize
+// when more counter types are added.
+// todomigryz: define counter transformer factory
+// that can provide respective counter transformer for different type of 
counters.
+// (ie RowCountCounterTranformer, MSecCounterTransformer, 
UserCounterTransformer, etc)
+private static class MonitoringInfoToCounterUpdateTransformer {
+
+  private final Map transformIdMapping;
+
+  public MonitoringInfoToCounterUpdateTransformer(
+  final Map transformIdMapping) {
+this.transformIdMapping = transformIdMapping;
+  }
+
+  // todo: search code for "beam:metrics"... and replace them with 
relevant enums from
+  // proto after rebasing above https://github.com/apache/beam/pull/6799 
that
+  // introduces relevant proto entries.
+  final String BEAM_METRICS_USER_PREFIX = "beam:metric:user";
+
+  private CounterUpdate monitoringInfoToCounterUpdate(MonitoringInfo 
monitoringInfo) {
+long value = 
monitoringInfo.getMetric().getCounterData().getInt64Value();
+String urn = monitoringInfo.getUrn();
+
+String type = monitoringInfo.getType();
+
+// todo: run MonitoringInfo through validation process.
+// refer to https://github.com/apache/beam/pull/6799
+
+if (urn.startsWith(BEAM_METRICS_USER_PREFIX)) {
+  if (!type.equals("beam:metrics:sum_int_64")) {
+return null;
+  }
+
+  final String ptransform = 
monitoringInfo.getLabelsMap().get("PTRANSFORM");
+  if (ptransform == null) {
+return null;
+  }
+
+  DataflowStepContext stepContext = transformIdMapping.get(ptransform);
+
+  CounterStructuredNameAndMetadata name = new 
CounterStructuredNameAndMetadata();
+
+  String nameWithNamespace =
+  monitoringInfo
+  .getUrn()
+  .substring(BEAM_METRICS_USER_PREFIX.length())
+  .replace("^:", "");
+
+  final int lastColonIndex = nameWithNamespace.lastIndexOf(':');
+  String counterName = nameWithNamespace.substring(lastColonIndex + 1);
+  String counterNamespace = nameWithNamespace.substring(0, 
lastColonIndex);
+
+  name.setName(
+  new CounterStructuredName()
+  .setOrigin(Origin.USER.toString())
+  // Workaround for bug in python sdk that missed colon 
after ...metric:user.
+  .setName(counterName)
+  .setOriginalStepName(
+  stepContext == null ? null : 
stepContext.getNameContext().originalName())
+  .setExecutionStepName(
+  stepContext == null ? null : 
stepContext.getNameContext().systemName())
+  .setOriginNamespace(counterNamespace))
+  .setMetadata(new CounterMetadata().setKind("SUM"));
+
+  return new CounterUpdate()
+  .setStructuredNameAndMetadata(name)
+  .setCumulative(false)
+  
.setInteger(DataflowCounterUpdateExtractor.longToSplitInt(value));
+}
+return null;
+  }
+}
+
+private void updateMetrics(List monitoringInfos) {
+  final MonitoringInfoToCounterUpdateTransformer 
monitoringInfoToCounterUpdateTransformer =
+  new MonitoringInfoToCounterUpdateTransformer(
+  bundleProcessOperation.getPtransformIdToUserStepContext());
+  String miDump =
+  
monitoringInfos.stream().map(Objects::toString).collect(Collectors.joining("\n"));
+
+  counterUpdates =
+  monitoringInfos
+  .stream()
+  
.map(monitoringInfoToCounterUpdateTransformer::monitoringInfoToCounterUpdate)
+  .filter(Objects::nonNull)
+  .collect(Collectors.toList());
+  return;
+}
+
+private String getStackTrace() {
 
 Review comment:
   rm debug code


[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172500&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172500
 ]

ASF GitHub Bot logged work on BEAM-6181:


Author: ASF GitHub Bot
Created on: 05/Dec/18 22:04
Start Date: 05/Dec/18 22:04
Worklog Time Spent: 10m 
  Work Description: ajamato commented on a change in pull request #7202: 
[BEAM-6181] Reporting user counters via MonitoringInfos in Portable Dataflow 
Runner.
URL: https://github.com/apache/beam/pull/7202#discussion_r239255236
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/BeamFnMapTaskExecutorTest.java
 ##
 @@ -423,6 +423,131 @@ public void close() {}
 contains(new 
CounterHamcrestMatchers.CounterUpdateIntegerValueMatcher(finalCounterValue)));
   }
 
+  @Test(timeout = ReadOperation.DEFAULT_PROGRESS_UPDATE_PERIOD_MS * 10)
+  public void 
testExtractCounterUpdatesReturnsValidProgressTrackerCounterUpdatesIfPresent()
+  throws Exception {
+final String stepName = "fakeStepNameWithUserMetrics";
+final String namespace = "sdk/whatever";
+final String name = "someCounter";
+final int counterValue = 42;
+final int finalCounterValue = 77;
+final CountDownLatch progressSentLatch = new CountDownLatch(1);
+final CountDownLatch processBundleLatch = new CountDownLatch(1);
+
+final BeamFnApi.Metrics.User.MetricName metricName =
+BeamFnApi.Metrics.User.MetricName.newBuilder()
+.setNamespace(namespace)
+.setName(name)
+.build();
+
+final BeamFnApi.Metrics deprecatedMetrics =
+BeamFnApi.Metrics.newBuilder()
+.putPtransforms(GRPC_READ_ID, FAKE_ELEMENT_COUNT_METRICS)
+.putPtransforms(
+stepName,
+BeamFnApi.Metrics.PTransform.newBuilder()
+.addUser(
+BeamFnApi.Metrics.User.newBuilder()
+.setMetricName(metricName)
+.setCounterData(
+BeamFnApi.Metrics.User.CounterData.newBuilder()
+.setValue(finalCounterValue)))
+.build())
+.build();
+
+final int expectedCounterValue = 5;
+final BeamFnApi.MonitoringInfo expectedMonitoringInfo =
+BeamFnApi.MonitoringInfo.newBuilder()
+.setUrn("beam:metric:user:ExpectedCounter")
+.setType("beam:metrics:sum_int_64")
+.putLabels("PTRANSFORM", "ExpectedPTransform")
+.setMetric(
+BeamFnApi.Metric.newBuilder()
+.setCounterData(
+BeamFnApi.CounterData.newBuilder()
+.setInt64Value(expectedCounterValue)
+.build())
+.build())
+.build();
+
+InstructionRequestHandler instructionRequestHandler =
+new InstructionRequestHandler() {
+  @Override
+  public CompletionStage 
handle(InstructionRequest request) {
+switch (request.getRequestCase()) {
+  case REGISTER:
+return 
CompletableFuture.completedFuture(responseFor(request).build());
+  case PROCESS_BUNDLE:
+return MoreFutures.supplyAsync(
+() -> {
+  processBundleLatch.await();
+  return responseFor(request)
+  .setProcessBundle(
+  BeamFnApi.ProcessBundleResponse.newBuilder()
+  .setMetrics(deprecatedMetrics)
+  .addMonitoringInfos(expectedMonitoringInfo))
+  .build();
+});
+  case PROCESS_BUNDLE_PROGRESS:
+progressSentLatch.countDown();
+return CompletableFuture.completedFuture(
+responseFor(request)
+.setProcessBundleProgress(
+
BeamFnApi.ProcessBundleProgressResponse.newBuilder()
+.setMetrics(deprecatedMetrics)
+.addMonitoringInfos(expectedMonitoringInfo))
+.build());
+  default:
+throw new RuntimeException("Reached unexpected code path");
+}
+  }
+
+  @Override
+  public void close() {}
+};
+
+RegisterAndProcessBundleOperation processOperation =
+new RegisterAndProcessBundleOperation(
+IdGenerators.decrementingLongs(),
+instructionRequestHandler,
+mockBeamFnStateDelegator,
+REGISTER_REQUEST,
+ImmutableMap.of(),
+ImmutableMap.of(),
+   

[jira] [Work logged] (BEAM-5321) Finish Python 3 porting for transforms module

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5321?focusedWorklogId=172488&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172488
 ]

ASF GitHub Bot logged work on BEAM-5321:


Author: ASF GitHub Bot
Created on: 05/Dec/18 22:00
Start Date: 05/Dec/18 22:00
Worklog Time Spent: 10m 
  Work Description: RobbeSneyders commented on issue #7104: [BEAM-5321] 
Port transforms package to Python 3
URL: https://github.com/apache/beam/pull/7104#issuecomment-444667493
 
 
   Hi @tvalentyn, sorry for the wait. 
   The solution implemented in this PR, installing dill from a GitHub command, 
works on my local machine but fails on Jenkins. It seems that the dependencies 
in setup.py are not processed correctly.
   I also just tried running this on a VM, where it worked as well.
   Any idea why this might fail on Jenkins?




Issue Time Tracking
---

Worklog Id: (was: 172488)
Time Spent: 2h 10m  (was: 2h)

> Finish Python 3 porting for transforms module
> -
>
> Key: BEAM-5321
> URL: https://issues.apache.org/jira/browse/BEAM-5321
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-py-core
>Reporter: Robbe
>Assignee: Robbe
>Priority: Major
> Fix For: Not applicable
>
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-5993) Create SideInput Load test

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5993?focusedWorklogId=172475&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172475
 ]

ASF GitHub Bot logged work on BEAM-5993:


Author: ASF GitHub Bot
Created on: 05/Dec/18 20:59
Start Date: 05/Dec/18 20:59
Worklog Time Spent: 10m 
  Work Description: pabloem commented on a change in pull request #7020: 
BEAM-5993 Create SideInput Load test
URL: https://github.com/apache/beam/pull/7020#discussion_r239234878
 
 

 ##
 File path: sdks/python/apache_beam/testing/load_tests/sideinput_test.py
 ##
 @@ -0,0 +1,158 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""
+To run test on DirectRunner
+
+python setup.py nosetests \
+--test-pipeline-options="
+--number_of_counter_operations=1000
+--input_options='{
+\"num_records\": 300,
+\"key_size\": 5,
+\"value_size\":15,
+\"bundle_size_distribution_type\": \"const\",
+\"bundle_size_distribution_param\": 1,
+\"force_initial_num_bundles\": 0
+}'
+   " \
+--tests apache_beam.testing.load_tests.sideinput_test
+
+To run test on other runner (ex. Dataflow):
+
+python setup.py nosetests \
+--test-pipeline-options="
+--runner=TestDataflowRunner
+--project=...
+--staging_location=gs://...
+--temp_location=gs://...
+--sdk_location=./dist/apache-beam-x.x.x.dev0.tar.gz
+--number_of_counter_operations=1000
+--input_options='{
+\"num_records\": 1,
+\"key_size\": 1,
+\"value_size\":1,
+\"bundle_size_distribution_type\": \"const\",
+\"bundle_size_distribution_param\": 1,
+\"force_initial_num_bundles\": 0
+}'
+" \
+--tests apache_beam.testing.load_tests.sideinput_test
+
+"""
+
+from __future__ import absolute_import
+
+import json
+import logging
+import unittest
+
+import apache_beam as beam
+from apache_beam.pvalue import AsIter
+from apache_beam.testing import synthetic_pipeline
+from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
+from apache_beam.testing.test_pipeline import TestPipeline
+
+
+class SideInputTest(unittest.TestCase):
+  def _parseTestPipelineOptions(self):
+return {
+'numRecords': self.inputOptions.get('num_records'),
+'keySizeBytes': self.inputOptions.get('key_size'),
+'valueSizeBytes': self.inputOptions.get('value_size'),
+'bundleSizeDistribution': {
+'type': self.inputOptions.get(
+'bundle_size_distribution_type', 'const'
+),
+'param': self.inputOptions.get('bundle_size_distribution_param', 0)
+},
+'forceNumInitialBundles': self.inputOptions.get(
+'force_initial_num_bundles', 0
+)
+}
+
+  def _getSideInput(self):
+side_input = self._parseTestPipelineOptions()
+side_input['numRecords'] = side_input['numRecords']
+side_input['keySizeBytes'] = side_input['keySizeBytes']
+side_input['valueSizeBytes'] = side_input['valueSizeBytes']
+return side_input
+
+  def _getPerElementDelaySec(self):
+return self.syntheticStepOptions.get('per_element_delay_sec', 0)
+
+  def _getPerBundleDelaySec(self):
+return self.syntheticStepOptions.get('per_bundle_delay_sec', 0)
+
+  def _getOutputRecordsPerInputRecords(self):
+return self.syntheticStepOptions.get('output_records_per_input_records', 0)
+
+  def setUp(self):
+self.pipeline = TestPipeline()
+self.inputOptions = json.loads(self.pipeline.get_option('input_options'))
+self.iterations = self.pipeline.get_option('number_of_counter_operations')
+if self.iterations is None:
+  self.iterations = 1
+
+  class Consume(beam.DoFn):
+def process(self, element, *args, **kwargs):
+  values_list = []
+  for i in element:
+for _, value in i.iteritems():
+  values_list.append(value)
+  yield values_list
+
+  def testSideInput(self):
+def join_fn(element, side_input, iterations):
+  list = []
+  for i in range(iterations):
+for key, value in side_input:
+  if i == iterations - 1:
+   

[jira] [Work logged] (BEAM-6184) PortableRunner dependency missed in wordcount example maven artifact

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6184?focusedWorklogId=172473&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172473
 ]

ASF GitHub Bot logged work on BEAM-6184:


Author: ASF GitHub Bot
Created on: 05/Dec/18 20:34
Start Date: 05/Dec/18 20:34
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on issue #7213: [BEAM-6184] Add 
portable-runner dependency to example pom.xml
URL: https://github.com/apache/beam/pull/7213#issuecomment-444636635
 
 
   R: @youngoli 




Issue Time Tracking
---

Worklog Id: (was: 172473)
Time Spent: 20m  (was: 10m)

> PortableRunner dependency missed in wordcount example maven artifact
> 
>
> Key: BEAM-6184
> URL: https://issues.apache.org/jira/browse/BEAM-6184
> Project: Beam
>  Issue Type: Improvement
>  Components: build-system
>Reporter: Ruoyun Huang
>Assignee: Ruoyun Huang
>Priority: Minor
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
>  
>  
> more context: 
> https://lists.apache.org/thread.html/8dd60395424425f7502d62888c49014430d1d3b06c026606f3db28ab@%3Cuser.beam.apache.org%3E





[jira] [Work logged] (BEAM-6159) Dataflow portable runner harness should use ExecutableStage to process bundle

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6159?focusedWorklogId=172466&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172466
 ]

ASF GitHub Bot logged work on BEAM-6159:


Author: ASF GitHub Bot
Created on: 05/Dec/18 20:15
Start Date: 05/Dec/18 20:15
Worklog Time Spent: 10m 
  Work Description: boyuanzz commented on issue #7015: [BEAM-6159] Migrate 
dataflow portable worker using shared library to process bundle 
URL: https://github.com/apache/beam/pull/7015#issuecomment-444630350
 
 
   Run Java PreCommit




Issue Time Tracking
---

Worklog Id: (was: 172466)
Time Spent: 1h  (was: 50m)

> Dataflow portable runner harness should use ExecutableStage to process bundle
> -
>
> Key: BEAM-6159
> URL: https://issues.apache.org/jira/browse/BEAM-6159
> Project: Beam
>  Issue Type: Task
>  Components: runner-dataflow
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 1h
>  Remaining Estimate: 0h
>






[jira] [Created] (BEAM-6184) PortableRunner dependency missed in wordcount example maven artifact

2018-12-05 Thread Ruoyun Huang (JIRA)
Ruoyun Huang created BEAM-6184:
--

 Summary: PortableRunner dependency missed in wordcount example 
maven artifact
 Key: BEAM-6184
 URL: https://issues.apache.org/jira/browse/BEAM-6184
 Project: Beam
  Issue Type: Improvement
  Components: build-system
Reporter: Ruoyun Huang
Assignee: Ruoyun Huang


 

 

more context: 
https://lists.apache.org/thread.html/8dd60395424425f7502d62888c49014430d1d3b06c026606f3db28ab@%3Cuser.beam.apache.org%3E





[jira] [Work logged] (BEAM-6184) PortableRunner dependency missed in wordcount example maven artifact

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6184?focusedWorklogId=172463&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172463
 ]

ASF GitHub Bot logged work on BEAM-6184:


Author: ASF GitHub Bot
Created on: 05/Dec/18 19:59
Start Date: 05/Dec/18 19:59
Worklog Time Spent: 10m 
  Work Description: HuangLED opened a new pull request #7213: [BEAM-6184] 
Add portable-runner dependency to example pom.xml
URL: https://github.com/apache/beam/pull/7213
 
 
   
   Add portable-runner to example pom.xml's default dependency list. 
   
   
   
   Follow this checklist to help us incorporate your contribution quickly and 
easily:
   
- [ X] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   It will help us expedite review of your Pull Request if you tag someone 
(e.g. `@username`) to look at it.
   




Issue Time Tracking
---

Worklog Id: (was: 172463)
Time Spent: 10m
Remaining Estimate: 0h

> PortableRunner dependency missed in wordcount example maven artifact
> 

[jira] [Work logged] (BEAM-6159) Dataflow portable runner harness should use ExecutableStage to process bundle

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6159?focusedWorklogId=172462&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172462
 ]

ASF GitHub Bot logged work on BEAM-6159:


Author: ASF GitHub Bot
Created on: 05/Dec/18 19:40
Start Date: 05/Dec/18 19:40
Worklog Time Spent: 10m 
  Work Description: boyuanzz edited a comment on issue #7015: [BEAM-6159] 
Migrate dataflow portable worker using shared library to process bundle 
URL: https://github.com/apache/beam/pull/7015#issuecomment-444614614
 
 
   The squashed commit 
https://github.com/apache/beam/pull/7015/commits/5c78520ed98576de76ff8d2ddc24c67ab70a6d21
 addresses most of the comments above. Several issues remain and will be 
handled in follow-up PRs:
   1. Map each output PCollection to the following OutputReceivers.
   2. Set up the Gradle test task to track the new bundle processing with 
ExecutableStage.
   3. Turn on the experiment path for the streaming worker.
   
   Please take another look @lukecwik 




Issue Time Tracking
---

Worklog Id: (was: 172462)
Time Spent: 50m  (was: 40m)

> Dataflow portable runner harness should use ExecutableStage to process bundle
> -
>
> Key: BEAM-6159
> URL: https://issues.apache.org/jira/browse/BEAM-6159
> Project: Beam
>  Issue Type: Task
>  Components: runner-dataflow
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 50m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-6182) Use of conscrypt SSL results in stuck workflows in Dataflow

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6182?focusedWorklogId=172460&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172460
 ]

ASF GitHub Bot logged work on BEAM-6182:


Author: ASF GitHub Bot
Created on: 05/Dec/18 19:36
Start Date: 05/Dec/18 19:36
Worklog Time Spent: 10m 
  Work Description: chamikaramj closed pull request #7210: [BEAM-6182] 
Cherry-pick: Disable conscrypt by default (#7203)
URL: https://github.com/apache/beam/pull/7210
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java
index aeeec3318206..13df96c85e88 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java
@@ -57,16 +57,20 @@ public static DataflowWorkerHarnessOptions 
initializeGlobalStateAndPipelineOptio
 
 ExperimentContext ec = ExperimentContext.parseFrom(pipelineOptions);
 
-if (!ec.isEnabled(Experiment.DisableConscryptSecurityProvider)) {
+String experimentName = 
Experiment.EnableConscryptSecurityProvider.getName();
+if (ec.isEnabled(Experiment.EnableConscryptSecurityProvider)) {
   /* Enable fast SSL provider. */
   LOG.info(
-  "Dataflow runner uses conscrypt by default for SSL. To disable this 
feature, "
-  + "pass pipeline option 
--experiment=disable_conscrypt_security_provider");
+  "Dataflow runner is using conscrypt SSL. To disable this feature, "
+  + "remove the pipeline option --experiments={}",
+  experimentName);
   Security.insertProviderAt(new OpenSSLProvider(), 1);
 } else {
   LOG.info(
-  "Experiment disable_conscrypt_security_provider specified, disabling 
conscrypt "
-  + "SSL. Note this is the default Java behavior, but may have 
reduced performance.");
+  "Not using conscrypt SSL. Note this is the default Java behavior, 
but may "
+  + "have reduced performance. To use conscrypt SSL pass pipeline 
option "
+  + "--experiments={}",
+  experimentName);
 }
 return pipelineOptions;
   }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ExperimentContext.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ExperimentContext.java
index 3ce1e0874c90..3bf1ca8360fb 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ExperimentContext.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ExperimentContext.java
@@ -36,7 +36,11 @@
 
   /** Enumeration of all known experiments. */
   public enum Experiment {
-DisableConscryptSecurityProvider("disable_conscrypt_security_provider"),
+/**
+ * Use the Conscrypt OpenSSL Java Security Provider. This may improve the 
performance of SSL
+ * operations for some IO connectors.
+ */
+EnableConscryptSecurityProvider("enable_conscrypt_security_provider"),
 IntertransformIO("intertransform_io"), // Intertransform metrics for 
Shuffle IO (insights)
 SideInputIOMetrics("sideinput_io_metrics"); // Intertransform metrics for 
Side Input IO
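
The effect of the change above is that Conscrypt becomes opt-in rather than opt-out. A minimal Java sketch of that decision follows. Only the experiment name enable_conscrypt_security_provider comes from the diff; the ExperimentFlags class and the chooseSslProvider helper are hypothetical stand-ins, not Beam's ExperimentContext, and the sketch returns a label instead of installing a security provider.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for the worker's experiment lookup; not Beam's ExperimentContext. */
final class ExperimentFlags {
  // Experiment name taken from the diff above.
  static final String ENABLE_CONSCRYPT = "enable_conscrypt_security_provider";

  private final Set<String> enabled;

  ExperimentFlags(List<String> experiments) {
    this.enabled = new HashSet<>(experiments);
  }

  boolean isEnabled(String name) {
    return enabled.contains(name);
  }

  /** Opt-in: Conscrypt only when the experiment is present, otherwise the JDK default. */
  static String chooseSslProvider(ExperimentFlags flags) {
    return flags.isEnabled(ENABLE_CONSCRYPT) ? "conscrypt" : "jdk-default";
  }
}
```

With no experiments passed, the sketch selects the JDK default, which matches the "Not using conscrypt SSL" branch of the diff.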
 


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 172460)
Time Spent: 1.5h  (was: 1h 20m)

> Use of conscrypt SSL results in stuck workflows in Dataflow
> ---
>
> Key: BEAM-6182
> URL: https://issues.apache.org/jira/browse/BEAM-6182
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Reporter: Ahmet Altay
>Assignee: Tyler Akidau
>Priority: Blocker
> Fix For: 2.9.0
>
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> An experimental flag is being added to disable it for now with an option to 
> 

[jira] [Work logged] (BEAM-5514) BigQueryIO doesn't handle quotaExceeded errors properly

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5514?focusedWorklogId=172461=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172461
 ]

ASF GitHub Bot logged work on BEAM-5514:


Author: ASF GitHub Bot
Created on: 05/Dec/18 19:36
Start Date: 05/Dec/18 19:36
Worklog Time Spent: 10m 
  Work Description: ihji commented on issue #7189: [BEAM-5514] BigQueryIO 
doesn't handle quotaExceeded errors properly
URL: https://github.com/apache/beam/pull/7189#issuecomment-444615779
 
 
   Split into #7189 and #7212.




Issue Time Tracking
---

Worklog Id: (was: 172461)
Time Spent: 1.5h  (was: 1h 20m)

> BigQueryIO doesn't handle quotaExceeded errors properly
> ---
>
> Key: BEAM-5514
> URL: https://issues.apache.org/jira/browse/BEAM-5514
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp
>Reporter: Kevin Peterson
>Assignee: Heejong Lee
>Priority: Major
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> When exceeding a streaming quota for BigQuery insertAll requests, BigQuery 
> returns a 403 with reason "quotaExceeded".
> The current implementation of BigQueryIO does not consider this to be a rate 
> limited exception, and therefore does not perform exponential backoff 
> properly, leading to repeated calls to BQ.
> The actual error is in the 
> [ApiErrorExtractor|https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/util/src/main/java/com/google/cloud/hadoop/util/ApiErrorExtractor.java#L263]
>  class, which is called from 
> [BigQueryServicesImpl|https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java#L739]
>  to determine how to retry the failure.
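
The behaviour the issue asks for can be sketched in Java as two small steps: classify a 403 with reason "quotaExceeded" as rate limited, then back off exponentially between retries. This is an illustration under assumptions, not the ApiErrorExtractor or BigQueryServicesImpl code. The class name, the extra "rateLimitExceeded" reason, and the delay parameters are all hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Illustrative only: not the ApiErrorExtractor or BigQueryServicesImpl code. */
final class QuotaBackoffSketch {
  // "quotaExceeded" comes from the issue text; "rateLimitExceeded" is an
  // assumed second reason that a retry policy would treat the same way.
  private static final Set<String> RATE_LIMIT_REASONS =
      new HashSet<>(Arrays.asList("quotaExceeded", "rateLimitExceeded"));

  /** True when a 403 response carries a reason that should trigger backoff. */
  static boolean isRateLimited(int httpStatus, String reason) {
    return httpStatus == 403 && RATE_LIMIT_REASONS.contains(reason);
  }

  /** Delay before retry {@code attempt} (0-based): initial * 2^attempt, capped at max. */
  static long backoffMillis(int attempt, long initialMillis, long maxMillis) {
    long delay = initialMillis;
    for (int i = 0; i < attempt && delay < maxMillis; i++) {
      delay *= 2;
    }
    return Math.min(delay, maxMillis);
  }
}
```

A caller would sleep for backoffMillis(attempt, ...) whenever isRateLimited returns true, instead of retrying immediately.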





[jira] [Created] (BEAM-6183) BigQuery insertAll API request rate is not properly controlled

2018-12-05 Thread Heejong Lee (JIRA)
Heejong Lee created BEAM-6183:
-

 Summary: BigQuery insertAll API request rate is not properly 
controlled
 Key: BEAM-6183
 URL: https://issues.apache.org/jira/browse/BEAM-6183
 Project: Beam
  Issue Type: Improvement
  Components: io-java-gcp
Affects Versions: 2.8.0
Reporter: Heejong Lee
Assignee: Chamikara Jayalath


BigQuery insertAll API request rate is not properly controlled so it produces 
too many rate limit exceeded error messages in the worker log.





[jira] [Work logged] (BEAM-6174) Remove unnecessary Kryo dependency from euphoria

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6174?focusedWorklogId=172459=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172459
 ]

ASF GitHub Bot logged work on BEAM-6174:


Author: ASF GitHub Bot
Created on: 05/Dec/18 19:33
Start Date: 05/Dec/18 19:33
Worklog Time Spent: 10m 
  Work Description: chamikaramj closed pull request #7211: [BEAM-6174] Kryo 
dependency removed.
URL: https://github.com/apache/beam/pull/7211
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/extensions/euphoria/build.gradle 
b/sdks/java/extensions/euphoria/build.gradle
index 75d4b357be9c..e6ad57382ee6 100644
--- a/sdks/java/extensions/euphoria/build.gradle
+++ b/sdks/java/extensions/euphoria/build.gradle
@@ -21,13 +21,8 @@ applyJavaNature()
 
 description = "Apache Beam :: SDKs :: Java :: Extensions :: Euphoria Java 8 
DSL"
 
-ext {
-kryoVersion = '4.0.2'
-}
-
 dependencies {
 shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
-shadow "com.esotericsoftware:kryo:${kryoVersion}"
 shadow library.java.guava
 testCompile library.java.mockito_core
 testCompile project(path: ":beam-sdks-java-extensions-kryo")


 




Issue Time Tracking
---

Worklog Id: (was: 172459)
Time Spent: 1h 10m  (was: 1h)

> Remove unnecessary Kryo dependency from euphoria
> 
>
> Key: BEAM-6174
> URL: https://issues.apache.org/jira/browse/BEAM-6174
> Project: Beam
>  Issue Type: Sub-task
>  Components: dsl-euphoria
>Reporter: Vaclav Plajt
>Assignee: Vaclav Plajt
>Priority: Major
> Fix For: 2.10.0
>
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-6159) Dataflow portable runner harness should use ExecutableStage to process bundle

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6159?focusedWorklogId=172458=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172458
 ]

ASF GitHub Bot logged work on BEAM-6159:


Author: ASF GitHub Bot
Created on: 05/Dec/18 19:33
Start Date: 05/Dec/18 19:33
Worklog Time Spent: 10m 
  Work Description: boyuanzz commented on issue #7015: [BEAM-6159] Migrate 
dataflow portable worker using shared library to process bundle 
URL: https://github.com/apache/beam/pull/7015#issuecomment-444614614
 
 
   The squashed commit 
https://github.com/apache/beam/pull/7015/commits/924bc07e9866c6dbd4b1cdcfb7d43838453defdf
 addresses most of the comments above. Several issues remain and will be 
handled in follow-up PRs:
   1. Map output PCollection to following OutputReceivers.
   2. Set up the Gradle test task to track the new bundle processing with 
ExecutableStage.
   3. Turn on the experiment path for streaming worker.
   
   Please take another look @lukecwik 




Issue Time Tracking
---

Worklog Id: (was: 172458)
Time Spent: 40m  (was: 0.5h)

> Dataflow portable runner harness should use ExecutableStage to process bundle
> -
>
> Key: BEAM-6159
> URL: https://issues.apache.org/jira/browse/BEAM-6159
> Project: Beam
>  Issue Type: Task
>  Components: runner-dataflow
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 40m
>  Remaining Estimate: 0h
>






[jira] [Resolved] (BEAM-5462) get rid of .options deprecation warnings in tests

2018-12-05 Thread Heejong Lee (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Heejong Lee resolved BEAM-5462.
---
   Resolution: Fixed
Fix Version/s: 2.10.0

> get rid of .options deprecation warnings in tests
> ---
>
> Key: BEAM-5462
> URL: https://issues.apache.org/jira/browse/BEAM-5462
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Udi Meiri
>Assignee: Heejong Lee
>Priority: Minor
> Fix For: 2.10.0
>
>  Time Spent: 7h 10m
>  Remaining Estimate: 0h
>
> Messages look like:
> {{/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/direct/direct_runner.py:360:
>  DeprecationWarning: options is deprecated since First stable release. 
> References to .options will not be supported}}
> {{pipeline.replace_all(_get_transform_overrides(pipeline.options))}}





[jira] [Work logged] (BEAM-6159) Dataflow portable runner harness should use ExecutableStage to process bundle

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6159?focusedWorklogId=172457=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172457
 ]

ASF GitHub Bot logged work on BEAM-6159:


Author: ASF GitHub Bot
Created on: 05/Dec/18 19:31
Start Date: 05/Dec/18 19:31
Worklog Time Spent: 10m 
  Work Description: boyuanzz commented on a change in pull request #7015: 
[BEAM-6159] Migrate dataflow portable worker using shared library to process 
bundle 
URL: https://github.com/apache/beam/pull/7015#discussion_r239206538
 
 

 ##
 File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java
 ##
 @@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.fn.control;
+
+import com.google.common.collect.Iterables;
+import java.io.Closeable;
+import java.util.EnumMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey.TypeCase;
+import 
org.apache.beam.runners.dataflow.worker.util.common.worker.OperationContext;
+import 
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver;
+import 
org.apache.beam.runners.dataflow.worker.util.common.worker.ReceivingOperation;
+import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
+import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
+import org.apache.beam.runners.fnexecution.control.RemoteBundle;
+import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ProcessRemoteBundleOperation extends ReceivingOperation {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ProcessRemoteBundleOperation.class);
+  private final StageBundleFactory stageBundleFactory;
+  private RemoteBundle remoteBundle;
+  private StateRequestHandler stateRequestHandler;
+  private BundleProgressHandler progressHandler;
+
+  public ProcessRemoteBundleOperation(
+  OperationContext context, StageBundleFactory stageBundleFactory, 
OutputReceiver[] receivers) {
+super(receivers, context);
+this.stageBundleFactory = stageBundleFactory;
+EnumMap<TypeCase, StateRequestHandler> handlerMap = new 
EnumMap<>(TypeCase.class);
+stateRequestHandler = 
StateRequestHandlers.delegateBasedUponType(handlerMap);
+progressHandler = BundleProgressHandler.ignored();
+  }
+
+  @Override
+  public void start() throws Exception {
+try (Closeable scope = context.enterStart()) {
+  super.start();
+  OutputReceiverFactory receiverFactory =
 
 Review comment:
   Fixed in this PR.




Issue Time Tracking
---

Worklog Id: (was: 172457)
Time Spent: 0.5h  (was: 20m)

> Dataflow portable runner harness should use ExecutableStage to process bundle
> -
>
> Key: BEAM-6159
> URL: https://issues.apache.org/jira/browse/BEAM-6159
> Project: Beam
>  Issue Type: Task
>  Components: runner-dataflow
>Reporter: Boyuan Zhang
>Assignee: Boyuan Zhang
>Priority: Major
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-6183) BigQuery insertAll API request rate is not properly controlled

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6183?focusedWorklogId=172456=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172456
 ]

ASF GitHub Bot logged work on BEAM-6183:


Author: ASF GitHub Bot
Created on: 05/Dec/18 19:29
Start Date: 05/Dec/18 19:29
Worklog Time Spent: 10m 
  Work Description: ihji opened a new pull request #7212: [BEAM-6183] 
BigQuery insertAll API request rate is not properly controlled
URL: https://github.com/apache/beam/pull/7212
 
 
   add a rate limiter that dynamically adjusts insertAll submission rate below 
a given rate limit quota.
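
One way such a limiter could adjust itself is additive increase, multiplicative decrease (AIMD). The Java sketch below uses that technique as an assumption; the pull request text does not say which algorithm it uses. The class name, the step size of 1 request per second, and the halving factor are all hypothetical.

```java
/** Illustrative AIMD limiter; an assumed technique, not the code in the pull request. */
final class AdaptiveRateLimiter {
  private final double minRate; // floor, requests per second (must be > 0)
  private final double maxRate; // quota ceiling, requests per second
  private double rate;          // current target rate

  AdaptiveRateLimiter(double initialRate, double minRate, double maxRate) {
    if (minRate <= 0 || maxRate < minRate) {
      throw new IllegalArgumentException("require 0 < minRate <= maxRate");
    }
    this.minRate = minRate;
    this.maxRate = maxRate;
    this.rate = Math.max(minRate, Math.min(maxRate, initialRate));
  }

  /** Additive increase after a successful request, never above the quota. */
  void onSuccess() {
    rate = Math.min(maxRate, rate + 1.0);
  }

  /** Multiplicative decrease after a rate-limit error, never below the floor. */
  void onRateLimited() {
    rate = Math.max(minRate, rate / 2.0);
  }

  double getRate() {
    return rate;
  }

  /** Pause between consecutive submissions at the current rate. */
  long delayMillis() {
    return (long) Math.ceil(1000.0 / rate);
  }
}
```

A caller would wait delayMillis() before each insertAll submission and report the outcome through onSuccess() or onRateLimited().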
   
   
   
   Follow this checklist to help us incorporate your contribution quickly and 
easily:
   
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   It will help us expedite review of your Pull Request if you tag someone 
(e.g. `@username`) to look at it.
   
   Post-Commit Tests Status (on master branch)
   

   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go_GradleBuild/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_GradleBuild/lastCompletedBuild/)
 | --- | --- | --- | --- | --- | ---
   Java | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex_Gradle/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Gradle/lastCompletedBuild/)
 [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump_Gradle/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza_Gradle/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark_Gradle/lastCompletedBuild/)
   Python | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/)
 | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)
  [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/)
 | --- | --- | ---
   
   
   
   
   




Issue Time Tracking
---

Worklog Id: (was: 172456)
Time Spent: 10m
Remaining Estimate: 0h

> BigQuery insertAll API request rate is not properly controlled
> 

[jira] [Assigned] (BEAM-6183) BigQuery insertAll API request rate is not properly controlled

2018-12-05 Thread Heejong Lee (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Heejong Lee reassigned BEAM-6183:
-

Assignee: Heejong Lee  (was: Chamikara Jayalath)

> BigQuery insertAll API request rate is not properly controlled
> --
>
> Key: BEAM-6183
> URL: https://issues.apache.org/jira/browse/BEAM-6183
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Affects Versions: 2.8.0
>Reporter: Heejong Lee
>Assignee: Heejong Lee
>Priority: Major
>
> BigQuery insertAll API request rate is not properly controlled so it produces 
> too many rate limit exceeded error messages in the worker log.





[jira] [Work logged] (BEAM-6174) Remove unnecessary Kryo dependency from euphoria

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6174?focusedWorklogId=172451=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172451
 ]

ASF GitHub Bot logged work on BEAM-6174:


Author: ASF GitHub Bot
Created on: 05/Dec/18 19:01
Start Date: 05/Dec/18 19:01
Worklog Time Spent: 10m 
  Work Description: chamikaramj opened a new pull request #7211: 
[BEAM-6174] Kryo dependency removed.
URL: https://github.com/apache/beam/pull/7211
 
 
   Cherry-picking https://github.com/apache/beam/pull/7195 to 2.9.0 release 
branch.
   
   




Issue Time Tracking
---

Worklog Id: (was: 172451)
Time Spent: 1h  (was: 50m)

> Remove unnecessary Kryo dependency from euphoria
> 
>
> Key: BEAM-6174
> URL: 

[jira] [Work logged] (BEAM-5959) Add Cloud KMS support to GCS copies

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5959?focusedWorklogId=172448=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172448
 ]

ASF GitHub Bot logged work on BEAM-5959:


Author: ASF GitHub Bot
Created on: 05/Dec/18 18:53
Start Date: 05/Dec/18 18:53
Worklog Time Spent: 10m 
  Work Description: udim commented on issue #7050: [BEAM-5959] Reimplement 
GCS copies with rewrites.
URL: https://github.com/apache/beam/pull/7050#issuecomment-444599653
 
 
   R: @chamikaramj 




Issue Time Tracking
---

Worklog Id: (was: 172448)
Time Spent: 3h  (was: 2h 50m)

> Add Cloud KMS support to GCS copies
> ---
>
> Key: BEAM-5959
> URL: https://issues.apache.org/jira/browse/BEAM-5959
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp, sdk-py-core
>Reporter: Udi Meiri
>Assignee: Udi Meiri
>Priority: Major
>  Time Spent: 3h
>  Remaining Estimate: 0h
>
> Beam SDK currently uses the CopyTo GCS API call, which doesn't support 
> copying objects that use Customer Managed Encryption Keys (CMEK).
> CMEKs are managed in Cloud KMS.
> Items (for Java and Python SDKs):
> - Update clients to versions that support KMS keys.
> - Change copyTo API calls to use rewriteTo (Python - directly, Java - 
> possibly convert copyTo API call to use client library)
> - Add unit tests.
> - Add basic tests (DirectRunner and GCS buckets with CMEK).
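
The main practical difference when moving from a copy call to a rewrite call is that a rewrite may need several round trips: each response reports whether the operation is done and, if not, returns a token to pass to the next call. The Java sketch below shows that loop against a hypothetical Rewriter interface. The interface, the RewriteResponse class, and the method names are assumptions for illustration and are not the GCS client library API.

```java
/** Illustrative rewrite loop; Rewriter and RewriteResponse are hypothetical types. */
final class RewriteLoop {
  /** Assumed response shape: a completion flag plus a continuation token. */
  static final class RewriteResponse {
    final boolean done;
    final String rewriteToken;

    RewriteResponse(boolean done, String rewriteToken) {
      this.done = done;
      this.rewriteToken = rewriteToken;
    }
  }

  /** Hypothetical client call; {@code token} is null on the first request. */
  interface Rewriter {
    RewriteResponse rewrite(String source, String destination, String token);
  }

  /** Repeats the rewrite call until it reports completion; returns the number of calls made. */
  static int copyWithRewrite(Rewriter client, String source, String destination) {
    String token = null;
    int calls = 0;
    RewriteResponse response;
    do {
      response = client.rewrite(source, destination, token);
      token = response.rewriteToken; // carry the continuation token forward
      calls++;
    } while (!response.done);
    return calls;
  }
}
```

A production version would also bound the number of iterations and handle request failures, which this sketch leaves out.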





[jira] [Work logged] (BEAM-6144) Add support for the autoscalingAlgorithm flag

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6144?focusedWorklogId=172450=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172450
 ]

ASF GitHub Bot logged work on BEAM-6144:


Author: ASF GitHub Bot
Created on: 05/Dec/18 18:58
Start Date: 05/Dec/18 18:58
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #7149: 
[BEAM-6144] Add support for the autoscaling flags to the GO SDK.
URL: https://github.com/apache/beam/pull/7149#discussion_r239194436
 
 

 ##
 File path: sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
 ##
 @@ -140,6 +144,18 @@ func Translate(p *pb.Pipeline, opts *JobOptions, 
workerURL, jarURL, modelURL str
if opts.NumWorkers > 0 {
job.Environment.WorkerPools[0].NumWorkers = opts.NumWorkers
}
+   if opts.Algorithm != "" {
+   settings := {
+   Algorithm: map[string]string{
+   "NONE": 
"AUTOSCALING_ALGORITHM_NONE",
+   "THROUGHPUT_BASED": 
"AUTOSCALING_ALGORITHM_BASIC",
+   }[opts.Algorithm],
+   }
+   if opts.MaxNumWorkers > 0 {
 
 Review comment:
   `MaxNumWorkers` could be set even if `opts.Algorithm` is not set. If 
Algorithm is not set, Dataflow will choose the appropriate autoscaling 
algorithm and max num workers would still be used for that.
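
The point of this review comment is that the two options are independent: the maximum worker count should be applied whether or not an algorithm is chosen. The Java sketch below illustrates that logic only. The change under review is Go code, and the class name, map keys, and build helper here are hypothetical. The two algorithm mappings are copied from the diff above.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: shows the two options being handled independently. */
final class AutoscalingSettingsSketch {
  // Mappings copied from the diff under review.
  private static final Map<String, String> ALGORITHMS = new HashMap<>();

  static {
    ALGORITHMS.put("NONE", "AUTOSCALING_ALGORITHM_NONE");
    ALGORITHMS.put("THROUGHPUT_BASED", "AUTOSCALING_ALGORITHM_BASIC");
  }

  /** Builds the settings to send; the map is empty when neither option is set. */
  static Map<String, Object> build(String algorithm, int maxNumWorkers) {
    Map<String, Object> settings = new HashMap<>();
    if (algorithm != null && !algorithm.isEmpty()) {
      String mapped = ALGORITHMS.get(algorithm);
      if (mapped == null) {
        throw new IllegalArgumentException("unknown autoscaling algorithm: " + algorithm);
      }
      settings.put("algorithm", mapped);
    }
    // Checked outside the algorithm branch, so it applies in both cases.
    if (maxNumWorkers > 0) {
      settings.put("maxNumWorkers", maxNumWorkers);
    }
    return settings;
  }
}
```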




Issue Time Tracking
---

Worklog Id: (was: 172450)
Time Spent: 20m  (was: 10m)

> Add support for the autoscalingAlgorithm flag
> -
>
> Key: BEAM-6144
> URL: https://issues.apache.org/jira/browse/BEAM-6144
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Andrew Brampton
>Assignee: Robert Burke
>Priority: Minor
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The Go SDK does not support the --autoscalingAlgorithm and --maxNumWorkers 
> flags, which the Java and Python SDKs already support. Add support for them.





[jira] [Work logged] (BEAM-6182) Use of conscrypt SSL results in stuck workflows in Dataflow

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6182?focusedWorklogId=172442=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172442
 ]

ASF GitHub Bot logged work on BEAM-6182:


Author: ASF GitHub Bot
Created on: 05/Dec/18 18:49
Start Date: 05/Dec/18 18:49
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on issue #7210: [BEAM-6182] 
Cherry-pick: Disable conscrypt by default (#7203)
URL: https://github.com/apache/beam/pull/7210#issuecomment-444598217
 
 
   LGTM. Waiting for tests to pass.




Issue Time Tracking
---

Worklog Id: (was: 172442)
Time Spent: 1h 10m  (was: 1h)

> Use of conscrypt SSL results in stuck workflows in Dataflow
> ---
>
> Key: BEAM-6182
> URL: https://issues.apache.org/jira/browse/BEAM-6182
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Reporter: Ahmet Altay
>Assignee: Tyler Akidau
>Priority: Blocker
> Fix For: 2.9.0
>
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> An experimental flag is being added to disable it for now with an option to 
> enable it per-workflow.
> Also related:
> https://issues.apache.org/jira/browse/BEAM-5747 - Upgrade conscrypt to its 
> latest version.





[jira] [Work logged] (BEAM-6182) Use of conscrypt SSL results in stuck workflows in Dataflow

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6182?focusedWorklogId=172443=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172443
 ]

ASF GitHub Bot logged work on BEAM-6182:


Author: ASF GitHub Bot
Created on: 05/Dec/18 18:49
Start Date: 05/Dec/18 18:49
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on issue #7210: [BEAM-6182] 
Cherry-pick: Disable conscrypt by default (#7203)
URL: https://github.com/apache/beam/pull/7210#issuecomment-444598262
 
 
   Run Portable_Python PreCommit




Issue Time Tracking
---

Worklog Id: (was: 172443)
Time Spent: 1h 20m  (was: 1h 10m)

> Use of conscrypt SSL results in stuck workflows in Dataflow
> ---
>
> Key: BEAM-6182
> URL: https://issues.apache.org/jira/browse/BEAM-6182
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Reporter: Ahmet Altay
>Assignee: Tyler Akidau
>Priority: Blocker
> Fix For: 2.9.0
>
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> An experimental flag is being added to disable it for now with an option to 
> enable it per-workflow.
> Also related:
> https://issues.apache.org/jira/browse/BEAM-5747 - Upgrade conscrypt to its 
> latest version.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6182) Use of conscrypt SSL results in stuck workflows in Dataflow

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6182?focusedWorklogId=172435&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172435
 ]

ASF GitHub Bot logged work on BEAM-6182:


Author: ASF GitHub Bot
Created on: 05/Dec/18 18:36
Start Date: 05/Dec/18 18:36
Worklog Time Spent: 10m 
  Work Description: aaltay opened a new pull request #7210: [BEAM-6182] 
Cherry-pick: Disable conscrypt by default (#7203)
URL: https://github.com/apache/beam/pull/7210
 
 
   * Disable conscrypt by default
   
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 172435)
Time Spent: 1h  (was: 50m)

> Use of conscrypt SSL results in stuck workflows in Dataflow
> ---
>
> Key: BEAM-6182
> URL: 

[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172438&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172438
 ]

ASF GitHub Bot logged work on BEAM-6181:


Author: ASF GitHub Bot
Created on: 05/Dec/18 18:42
Start Date: 05/Dec/18 18:42
Worklog Time Spent: 10m 
  Work Description: Ardagan edited a comment on issue #7202: [BEAM-6181] 
Reporting user counters via MonitoringInfos in Portable Dataflow Runner.
URL: https://github.com/apache/beam/pull/7202#issuecomment-444592947
 
 
   @swegner (committer)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 172438)
Time Spent: 1h  (was: 50m)

> Utilize MetricInfo for reporting user metrics in Portable Dataflow Java 
> Runner.
> ---
>
> Key: BEAM-6181
> URL: https://issues.apache.org/jira/browse/BEAM-6181
> Project: Beam
>  Issue Type: Bug
>  Components: java-fn-execution
>Reporter: Mikhail Gryzykhin
>Assignee: Mikhail Gryzykhin
>Priority: Major
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> The new approach to reporting metrics in FnApi is to use MetricInfo structures.
> This approach is implemented in the Python SDK, and work is ongoing in the Java SDK.
> This task includes plumbing user metrics reported via MetricInfos through the 
> Dataflow Java Runner.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172433&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172433
 ]

ASF GitHub Bot logged work on BEAM-6181:


Author: ASF GitHub Bot
Created on: 05/Dec/18 18:33
Start Date: 05/Dec/18 18:33
Worklog Time Spent: 10m 
  Work Description: Ardagan commented on issue #7202: [BEAM-6181] Reporting 
user counters via MonitoringInfos in Portable Dataflow Runner.
URL: https://github.com/apache/beam/pull/7202#issuecomment-444592947
 
 
   @swegner 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 172433)
Time Spent: 50m  (was: 40m)

> Utilize MetricInfo for reporting user metrics in Portable Dataflow Java 
> Runner.
> ---
>
> Key: BEAM-6181
> URL: https://issues.apache.org/jira/browse/BEAM-6181
> Project: Beam
>  Issue Type: Bug
>  Components: java-fn-execution
>Reporter: Mikhail Gryzykhin
>Assignee: Mikhail Gryzykhin
>Priority: Major
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> The new approach to reporting metrics in FnApi is to use MetricInfo structures.
> This approach is implemented in the Python SDK, and work is ongoing in the Java SDK.
> This task includes plumbing user metrics reported via MetricInfos through the 
> Dataflow Java Runner.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6181) Utilize MetricInfo for reporting user metrics in Portable Dataflow Java Runner.

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6181?focusedWorklogId=172428&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172428
 ]

ASF GitHub Bot logged work on BEAM-6181:


Author: ASF GitHub Bot
Created on: 05/Dec/18 18:26
Start Date: 05/Dec/18 18:26
Worklog Time Spent: 10m 
  Work Description: Ardagan commented on issue #7202: [BEAM-6181] Reporting 
user counters via MonitoringInfos in Portable Dataflow Runner.
URL: https://github.com/apache/beam/pull/7202#issuecomment-444590796
 
 
   @ajamato Please review.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 172428)
Time Spent: 40m  (was: 0.5h)

> Utilize MetricInfo for reporting user metrics in Portable Dataflow Java 
> Runner.
> ---
>
> Key: BEAM-6181
> URL: https://issues.apache.org/jira/browse/BEAM-6181
> Project: Beam
>  Issue Type: Bug
>  Components: java-fn-execution
>Reporter: Mikhail Gryzykhin
>Assignee: Mikhail Gryzykhin
>Priority: Major
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> The new approach to reporting metrics in FnApi is to use MetricInfo structures.
> This approach is implemented in the Python SDK, and work is ongoing in the Java SDK.
> This task includes plumbing user metrics reported via MetricInfos through the 
> Dataflow Java Runner.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6182) Use of conscrypt SSL results in stuck workflows in Dataflow

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6182?focusedWorklogId=172427&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172427
 ]

ASF GitHub Bot logged work on BEAM-6182:


Author: ASF GitHub Bot
Created on: 05/Dec/18 18:25
Start Date: 05/Dec/18 18:25
Worklog Time Spent: 10m 
  Work Description: aaltay closed pull request #7203: [BEAM-6182] Disable 
conscrypt by default
URL: https://github.com/apache/beam/pull/7203
 
 
   

This PR was merged from a forked repository.
Because GitHub hides the original diff on merge, it is reproduced below for
the sake of provenance:

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java
index 75662f4a2c97..37ce990fb5f0 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java
@@ -57,18 +57,19 @@ public static DataflowWorkerHarnessOptions initializeGlobalStateAndPipelineOptio
 
 ExperimentContext ec = ExperimentContext.parseFrom(pipelineOptions);
 
-String experimentName = Experiment.DisableConscryptSecurityProvider.getName();
-if (!ec.isEnabled(Experiment.DisableConscryptSecurityProvider)) {
+String experimentName = Experiment.EnableConscryptSecurityProvider.getName();
+if (ec.isEnabled(Experiment.EnableConscryptSecurityProvider)) {
   /* Enable fast SSL provider. */
   LOG.info(
-  "Dataflow runner uses conscrypt by default for SSL. To disable this feature, "
-  + "pass pipeline option --experiments={}",
+  "Dataflow runner is using conscrypt SSL. To disable this feature, "
+  + "remove the pipeline option --experiments={}",
   experimentName);
   Security.insertProviderAt(new OpenSSLProvider(), 1);
 } else {
   LOG.info(
-  "Experiment {} specified, disabling conscrypt SSL. Note this is the default "
-  + "Java behavior, but may have reduced performance.",
+  "Not using conscrypt SSL. Note this is the default Java behavior, but may "
+  + "have reduced performance. To use conscrypt SSL pass pipeline option "
+  + "--experiments={}",
   experimentName);
 }
 return pipelineOptions;
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ExperimentContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ExperimentContext.java
index 3ce1e0874c90..3bf1ca8360fb 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ExperimentContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ExperimentContext.java
@@ -36,7 +36,11 @@
 
   /** Enumeration of all known experiments. */
   public enum Experiment {
-DisableConscryptSecurityProvider("disable_conscrypt_security_provider"),
+/**
+ * Use the Conscrypt OpenSSL Java Security Provider. This may improve the performance of SSL
+ * operations for some IO connectors.
+ */
+EnableConscryptSecurityProvider("enable_conscrypt_security_provider"),
 IntertransformIO("intertransform_io"), // Intertransform metrics for Shuffle IO (insights)
 SideInputIOMetrics("sideinput_io_metrics"); // Intertransform metrics for Side Input IO
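
The diff above flips the experiment from opt-out to opt-in: conscrypt is installed only when the experiment is listed. A minimal sketch of that logic in plain Java follows, under stated assumptions. `ConscryptOptIn`, `parseExperiments`, and `useConscrypt` are hypothetical names and not Beam APIs, and the comma-splitting of `--experiments=` is a simplification of real pipeline option parsing. Only the experiment name `enable_conscrypt_security_provider` is taken from the diff.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for the worker's experiment lookup; not Beam's ExperimentContext. */
final class ConscryptOptIn {
  /** Experiment name as it appears in the diff. */
  static final String ENABLE_CONSCRYPT = "enable_conscrypt_security_provider";

  private static final String PREFIX = "--experiments=";

  /** Collects experiment names from arguments of the form --experiments=a,b,c (simplified). */
  static Set<String> parseExperiments(List<String> args) {
    Set<String> experiments = new HashSet<>();
    for (String arg : args) {
      if (arg.startsWith(PREFIX)) {
        for (String name : arg.substring(PREFIX.length()).split(",")) {
          String trimmed = name.trim();
          if (!trimmed.isEmpty()) {
            experiments.add(trimmed);
          }
        }
      }
    }
    return experiments;
  }

  /** Opt-in semantics: conscrypt is used only when the experiment is explicitly listed. */
  static boolean useConscrypt(Set<String> experiments) {
    return experiments.contains(ENABLE_CONSCRYPT);
  }

  public static void main(String[] args) {
    Set<String> experiments = parseExperiments(Arrays.asList(args));
    System.out.println(
        useConscrypt(experiments) ? "conscrypt SSL enabled" : "default Java SSL provider");
  }
}
```

With no experiment listed, the sketch reports the default Java provider, which mirrors the new default described in the log message of the diff.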
 


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 172427)
Time Spent: 50m  (was: 40m)

> Use of conscrypt SSL results in stuck workflows in Dataflow
> ---
>
> Key: BEAM-6182
> URL: https://issues.apache.org/jira/browse/BEAM-6182
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Reporter: Ahmet Altay
>Assignee: Tyler Akidau
>Priority: Blocker
> Fix For: 2.9.0
>
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> An experimental flag is being added to disable it for now with an option to 
> enable it 

[jira] [Updated] (BEAM-6182) Use of conscrypt SSL results in stuck workflows in Dataflow

2018-12-05 Thread Ahmet Altay (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ahmet Altay updated BEAM-6182:
--
Priority: Blocker  (was: Major)

> Use of conscrypt SSL results in stuck workflows in Dataflow
> ---
>
> Key: BEAM-6182
> URL: https://issues.apache.org/jira/browse/BEAM-6182
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Reporter: Ahmet Altay
>Assignee: Tyler Akidau
>Priority: Blocker
> Fix For: 2.9.0
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> An experimental flag is being added to disable it for now with an option to 
> enable it per-workflow.
> Also related:
> https://issues.apache.org/jira/browse/BEAM-5747 - Upgrade conscrypt to its 
> latest version.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (BEAM-6182) Use of conscrypt SSL results in stuck workflows in Dataflow

2018-12-05 Thread Ahmet Altay (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ahmet Altay updated BEAM-6182:
--
Fix Version/s: 2.9.0

> Use of conscrypt SSL results in stuck workflows in Dataflow
> ---
>
> Key: BEAM-6182
> URL: https://issues.apache.org/jira/browse/BEAM-6182
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Reporter: Ahmet Altay
>Assignee: Tyler Akidau
>Priority: Major
> Fix For: 2.9.0
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> An experimental flag is being added to disable it for now with an option to 
> enable it per-workflow.
> Also related:
> https://issues.apache.org/jira/browse/BEAM-5747 - Upgrade conscrypt to its 
> latest version.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6182) Use of conscrypt SSL results in stuck workflows in Dataflow

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6182?focusedWorklogId=172415&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172415
 ]

ASF GitHub Bot logged work on BEAM-6182:


Author: ASF GitHub Bot
Created on: 05/Dec/18 17:45
Start Date: 05/Dec/18 17:45
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #7203: 
[BEAM-6182] Disable conscrypt by default
URL: https://github.com/apache/beam/pull/7203#discussion_r239168968
 
 

 ##
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java
 ##
 @@ -57,18 +57,19 @@ public static DataflowWorkerHarnessOptions initializeGlobalStateAndPipelineOptio
 
 ExperimentContext ec = ExperimentContext.parseFrom(pipelineOptions);
 
-String experimentName = Experiment.DisableConscryptSecurityProvider.getName();
-if (!ec.isEnabled(Experiment.DisableConscryptSecurityProvider)) {
+String experimentName = Experiment.EnableConscryptSecurityProvider.getName();
+if (ec.isEnabled(Experiment.EnableConscryptSecurityProvider)) {
   /* Enable fast SSL provider. */
   LOG.info(
-  "Dataflow runner uses conscrypt by default for SSL. To disable this feature, "
-  + "pass pipeline option --experiments={}",
+  "Dataflow runner is using conscrypt SSL. To disable this feature, "
+  + "remove the pipeline option --experiments={}",
   experimentName);
   Security.insertProviderAt(new OpenSSLProvider(), 1);
 } else {
   LOG.info(
 
 Review comment:
   I would like to keep this at the info level, since it is a change in behavior for 
the Dataflow runner.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 172415)
Time Spent: 20m  (was: 10m)

> Use of conscrypt SSL results in stuck workflows in Dataflow
> ---
>
> Key: BEAM-6182
> URL: https://issues.apache.org/jira/browse/BEAM-6182
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Reporter: Ahmet Altay
>Assignee: Tyler Akidau
>Priority: Major
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> An experimental flag is being added to disable it for now with an option to 
> enable it per-workflow.
> Also related:
> https://issues.apache.org/jira/browse/BEAM-5747 - Upgrade conscrypt to its 
> latest version.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6182) Use of conscrypt SSL results in stuck workflows in Dataflow

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6182?focusedWorklogId=172417&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172417
 ]

ASF GitHub Bot logged work on BEAM-6182:


Author: ASF GitHub Bot
Created on: 05/Dec/18 17:48
Start Date: 05/Dec/18 17:48
Worklog Time Spent: 10m 
  Work Description: aaltay commented on issue #7203: [BEAM-6182] Disable 
conscrypt by default
URL: https://github.com/apache/beam/pull/7203#issuecomment-444578137
 
 
   Thank you @swegner. Addressed your questions. Will merge this after tests 
pass.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 172417)
Time Spent: 40m  (was: 0.5h)

> Use of conscrypt SSL results in stuck workflows in Dataflow
> ---
>
> Key: BEAM-6182
> URL: https://issues.apache.org/jira/browse/BEAM-6182
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Reporter: Ahmet Altay
>Assignee: Tyler Akidau
>Priority: Major
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> An experimental flag is being added to disable it for now with an option to 
> enable it per-workflow.
> Also related:
> https://issues.apache.org/jira/browse/BEAM-5747 - Upgrade conscrypt to its 
> latest version.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6182) Use of conscrypt SSL results in stuck workflows in Dataflow

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6182?focusedWorklogId=172416&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172416
 ]

ASF GitHub Bot logged work on BEAM-6182:


Author: ASF GitHub Bot
Created on: 05/Dec/18 17:48
Start Date: 05/Dec/18 17:48
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #7203: 
[BEAM-6182] Disable conscrypt by default
URL: https://github.com/apache/beam/pull/7203#discussion_r239169766
 
 

 ##
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java
 ##
 @@ -57,18 +57,19 @@ public static DataflowWorkerHarnessOptions initializeGlobalStateAndPipelineOptio
 
 ExperimentContext ec = ExperimentContext.parseFrom(pipelineOptions);
 
-String experimentName = Experiment.DisableConscryptSecurityProvider.getName();
-if (!ec.isEnabled(Experiment.DisableConscryptSecurityProvider)) {
+String experimentName = Experiment.EnableConscryptSecurityProvider.getName();
+if (ec.isEnabled(Experiment.EnableConscryptSecurityProvider)) {
 
 Review comment:
   1. Our sense is that IO-bound pipelines could be affected by up to 15% (the 
number mentioned in the previous PR). The typical regression will likely be 
much smaller.
   2. This does not affect Beam in general; it affects Dataflow only. I think 
highlighting this in the release notes is a good idea. (Dataflow could, if they 
choose, take additional proactive measures to communicate the change.)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 172416)
Time Spent: 0.5h  (was: 20m)

> Use of conscrypt SSL results in stuck workflows in Dataflow
> ---
>
> Key: BEAM-6182
> URL: https://issues.apache.org/jira/browse/BEAM-6182
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Reporter: Ahmet Altay
>Assignee: Tyler Akidau
>Priority: Major
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> An experimental flag is being added to disable it for now with an option to 
> enable it per-workflow.
> Also related:
> https://issues.apache.org/jira/browse/BEAM-5747 - Upgrade conscrypt to its 
> latest version.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6182) Use of conscrypt SSL results in stuck workflows in Dataflow

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6182?focusedWorklogId=172414&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172414
 ]

ASF GitHub Bot logged work on BEAM-6182:


Author: ASF GitHub Bot
Created on: 05/Dec/18 17:45
Start Date: 05/Dec/18 17:45
Worklog Time Spent: 10m 
  Work Description: aaltay commented on a change in pull request #7203: 
[BEAM-6182] Disable conscrypt by default
URL: https://github.com/apache/beam/pull/7203#discussion_r239168760
 
 

 ##
 File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ExperimentContext.java
 ##
 @@ -36,7 +36,7 @@
 
   /** Enumeration of all known experiments. */
   public enum Experiment {
-DisableConscryptSecurityProvider("disable_conscrypt_security_provider"),
+EnableConscryptSecurityProvider("enable_conscrypt_security_provider"),
 
 Review comment:
   Done


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 172414)
Time Spent: 10m
Remaining Estimate: 0h

> Use of conscrypt SSL results in stuck workflows in Dataflow
> ---
>
> Key: BEAM-6182
> URL: https://issues.apache.org/jira/browse/BEAM-6182
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Reporter: Ahmet Altay
>Assignee: Tyler Akidau
>Priority: Major
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> An experimental flag is being added to disable it for now with an option to 
> enable it per-workflow.
> Also related:
> https://issues.apache.org/jira/browse/BEAM-5747 - Upgrade conscrypt to its 
> latest version.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-6182) Use of conscrypt SSL results in stuck workflows in Dataflow

2018-12-05 Thread Ahmet Altay (JIRA)
Ahmet Altay created BEAM-6182:
-

 Summary: Use of conscrypt SSL results in stuck workflows in 
Dataflow
 Key: BEAM-6182
 URL: https://issues.apache.org/jira/browse/BEAM-6182
 Project: Beam
  Issue Type: Bug
  Components: runner-dataflow
Reporter: Ahmet Altay
Assignee: Tyler Akidau


An experimental flag is being added to disable it for now with an option to 
enable it per-workflow.

Also related:
https://issues.apache.org/jira/browse/BEAM-5747 - Upgrade conscrypt to its 
latest version.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (BEAM-6151) MongoDbIO add support mongodb server with self signed ssl

2018-12-05 Thread JIRA


 [ 
https://issues.apache.org/jira/browse/BEAM-6151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jean-Baptiste Onofré updated BEAM-6151:
---
Fix Version/s: (was: 2.9.0)
   2.10.0

> MongoDbIO add support mongodb server with self signed ssl
> -
>
> Key: BEAM-6151
> URL: https://issues.apache.org/jira/browse/BEAM-6151
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-mongodb
>Affects Versions: 2.8.0
>Reporter: Chaim
>Assignee: Jean-Baptiste Onofré
>Priority: Major
> Fix For: 2.10.0
>
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> If the MongoDB server's SSL certificate is not signed by a trusted certificate 
> authority (for example, a self-signed certificate), you cannot connect to the server.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6151) MongoDbIO add support mongodb server with self signed ssl

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6151?focusedWorklogId=172400&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172400
 ]

ASF GitHub Bot logged work on BEAM-6151:


Author: ASF GitHub Bot
Created on: 05/Dec/18 17:27
Start Date: 05/Dec/18 17:27
Worklog Time Spent: 10m 
  Work Description: jbonofre closed pull request #7162: BEAM-6151: 
MongoDbIO add support mongodb server with self signed ssl
URL: https://github.com/apache/beam/pull/7162
 
 
   

This PR was merged from a forked repository.
Because GitHub hides the original diff on merge, it is reproduced below for
the sake of provenance:

diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
index 9d6e05bfb209..309a30d582b7 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
@@ -104,6 +104,9 @@ public static Read read() {
 .setKeepAlive(true)
 .setMaxConnectionIdleTime(6)
 .setNumSplits(0)
+.setSslEnabled(false)
+.setIgnoreSSLCertificate(false)
+.setSslInvalidHostNameAllowed(false)
 .build();
   }
 
@@ -113,6 +116,9 @@ public static Write write() {
 .setKeepAlive(true)
 .setMaxConnectionIdleTime(6)
 .setBatchSize(1024L)
+.setSslEnabled(false)
+.setIgnoreSSLCertificate(false)
+.setSslInvalidHostNameAllowed(false)
 .build();
   }
 
@@ -128,6 +134,12 @@ private MongoDbIO() {}
 
 abstract int maxConnectionIdleTime();
 
+abstract boolean sslEnabled();
+
+abstract boolean sslInvalidHostNameAllowed();
+
+abstract boolean ignoreSSLCertificate();
+
 @Nullable
 abstract String database();
 
@@ -152,6 +164,12 @@ private MongoDbIO() {}
 
   abstract Builder setMaxConnectionIdleTime(int maxConnectionIdleTime);
 
+  abstract Builder setSslEnabled(boolean value);
+
+  abstract Builder setSslInvalidHostNameAllowed(boolean value);
+
+  abstract Builder setIgnoreSSLCertificate(boolean value);
+
   abstract Builder setDatabase(String database);
 
   abstract Builder setCollection(String collection);
@@ -215,6 +233,21 @@ public Read withMaxConnectionIdleTime(int maxConnectionIdleTime) {
   return builder().setMaxConnectionIdleTime(maxConnectionIdleTime).build();
 }
 
+/** Enable ssl for connection. */
+public Read withSSLEnabled(boolean sslEnabled) {
+  return builder().setSslEnabled(sslEnabled).build();
+}
+
+/** Enable invalidHostNameAllowed for ssl for connection. */
+public Read withSSLInvalidHostNameAllowed(boolean invalidHostNameAllowed) {
+  return builder().setSslInvalidHostNameAllowed(invalidHostNameAllowed).build();
+}
+
+/** Enable ignoreSSLCertificate for ssl for connection (allow for self signed ceritificates). */
+public Read withIgnoreSSLCertificate(boolean ignoreSSLCertificate) {
+  return builder().setIgnoreSSLCertificate(ignoreSSLCertificate).build();
+}
+
 /** Sets the database to use. */
 public Read withDatabase(String database) {
   checkArgument(database != null, "database can not be null");
@@ -259,6 +292,10 @@ public void populateDisplayData(DisplayData.Builder builder) {
   builder.add(DisplayData.item("uri", uri()));
   builder.add(DisplayData.item("keepAlive", keepAlive()));
   builder.add(DisplayData.item("maxConnectionIdleTime", maxConnectionIdleTime()));
+  builder.add(DisplayData.item("sslEnabled", sslEnabled()));
+  builder.add(DisplayData.item("sslInvalidHostNameAllowed", sslInvalidHostNameAllowed()));
+  builder.add(DisplayData.item("ignoreSSLCertificate", ignoreSSLCertificate()));
+
   builder.add(DisplayData.item("database", database()));
   builder.add(DisplayData.item("collection", collection()));
   builder.addIfNotNull(DisplayData.item("filter", filter()));
@@ -270,6 +307,22 @@ public void populateDisplayData(DisplayData.Builder builder) {
 }
   }
 
+  private static MongoClientOptions.Builder getOptions(
+  boolean keepAlive,
+  int maxConnectionIdleTime,
+  boolean sslEnabled,
+  boolean sslInvalidHostNameAllowed) {
+MongoClientOptions.Builder optionsBuilder = new MongoClientOptions.Builder();
+optionsBuilder.socketKeepAlive(keepAlive).maxConnectionIdleTime(maxConnectionIdleTime);
+if (sslEnabled) {
+  optionsBuilder
+  .sslEnabled(sslEnabled)
+  .sslInvalidHostNameAllowed(sslInvalidHostNameAllowed)
+  .sslContext(SSLUtils.ignoreSSLCertificate());
+}
+return optionsBuilder;
+  }
+
   /** 

[jira] [Resolved] (BEAM-6151) MongoDbIO add support mongodb server with self signed ssl

2018-12-05 Thread JIRA


 [ 
https://issues.apache.org/jira/browse/BEAM-6151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jean-Baptiste Onofré resolved BEAM-6151.

Resolution: Fixed

> MongoDbIO add support mongodb server with self signed ssl
> -
>
> Key: BEAM-6151
> URL: https://issues.apache.org/jira/browse/BEAM-6151
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-mongodb
>Affects Versions: 2.8.0
>Reporter: Chaim
>Assignee: Jean-Baptiste Onofré
>Priority: Major
> Fix For: 2.10.0
>
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> If the MongoDB server does not have a CA-signed SSL certificate (for example, 
> it uses a self-signed one), you cannot connect to the server.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6167) Create a Class to read content of a file keeping track of the file path (python)

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6167?focusedWorklogId=172398=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172398
 ]

ASF GitHub Bot logged work on BEAM-6167:


Author: ASF GitHub Bot
Created on: 05/Dec/18 17:16
Start Date: 05/Dec/18 17:16
Worklog Time Spent: 10m 
  Work Description: lcaggio commented on a change in pull request #7193: 
[BEAM-6167] Add class ReadFromTextWithFilename (Python)
URL: https://github.com/apache/beam/pull/7193#discussion_r239158713
 
 

 ##
 File path: sdks/python/apache_beam/io/textio.py
 ##
 @@ -527,6 +533,61 @@ def expand(self, pvalue):
 return pvalue.pipeline | Read(self._source)
 
 
+class ReadFromTextWithFilename(PTransform):
 
 Review comment:
   I like the _ReadFromTextBase approach, I'm working to update the code 
accordingly.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 172398)
Time Spent: 50m  (was: 40m)

> Create a Class to read content of a file keeping track of the file path 
> (python)
> 
>
> Key: BEAM-6167
> URL: https://issues.apache.org/jira/browse/BEAM-6167
> Project: Beam
>  Issue Type: Improvement
>  Components: io-ideas
>Affects Versions: 2.8.0
>Reporter: Lorenzo Caggioni
>Assignee: Eugene Kirpichov
>Priority: Minor
> Fix For: Not applicable
>
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> Add a class that reads the content of a file while keeping track of the file 
> path each element comes from.
> This is an improvement to the current python/apache_beam/io/textio.py
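
To illustrate the idea in the description above, here is a minimal sketch in plain Python, not the Beam implementation from the pull request. The helper name `read_lines_with_filename` is hypothetical; it only shows the shape of the output, where each element is paired with the path of the file it came from.

```python
def read_lines_with_filename(paths):
    """Yield (path, line) pairs so each element remembers its source file.

    Hypothetical helper for illustration only; it is not part of
    apache_beam.io.textio.
    """
    for path in paths:
        with open(path) as handle:
            for line in handle:
                # Strip only the trailing newline, keep other whitespace.
                yield path, line.rstrip('\n')
```

A caller could then group or filter records by file path, which is not possible once the lines of several files have been merged into one collection of plain strings.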





[jira] [Work logged] (BEAM-5817) Nexmark test of joining stream to files

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5817?focusedWorklogId=172388=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172388
 ]

ASF GitHub Bot logged work on BEAM-5817:


Author: ASF GitHub Bot
Created on: 05/Dec/18 16:59
Start Date: 05/Dec/18 16:59
Worklog Time Spent: 10m 
  Work Description: kennknowles closed pull request #7205: [BEAM-5817] Add 
SQL bounded side input join to queries that are actually run
URL: https://github.com/apache/beam/pull/7205
 
 
   

This PR was merged from a forked repository.
Because GitHub hides the original diff on merge, it is reproduced below for
the sake of provenance:

diff --git 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
index c797064b4b5a..8bafecc03ea0 100644
--- 
a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
+++ 
b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java
@@ -78,6 +78,7 @@
 import org.apache.beam.sdk.nexmark.queries.Query8Model;
 import org.apache.beam.sdk.nexmark.queries.Query9;
 import org.apache.beam.sdk.nexmark.queries.Query9Model;
+import org.apache.beam.sdk.nexmark.queries.sql.SqlBoundedSideInputJoin;
 import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery0;
 import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery1;
 import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery2;
@@ -1240,6 +1241,9 @@ private NexmarkQueryModel getNexmarkQueryModel() {
 .put(
 NexmarkQueryName.HIGHEST_BID,
 new NexmarkQuery(configuration, new SqlQuery7(configuration)))
+.put(
+NexmarkQueryName.BOUNDED_SIDE_INPUT_JOIN,
+new NexmarkQuery(configuration, new 
SqlBoundedSideInputJoin(configuration)))
 .build();
   }
 


 




Issue Time Tracking
---

Worklog Id: (was: 172388)
Time Spent: 8h 10m  (was: 8h)

> Nexmark test of joining stream to files
> ---
>
> Key: BEAM-5817
> URL: https://issues.apache.org/jira/browse/BEAM-5817
> Project: Beam
>  Issue Type: New Feature
>  Components: examples-nexmark
>Reporter: Kenneth Knowles
>Assignee: Kenneth Knowles
>Priority: Major
> Fix For: 2.10.0
>
>  Time Spent: 8h 10m
>  Remaining Estimate: 0h
>
> Nexmark is a convenient framework for testing the use case of large scale 
> stream enrichment. One way is joining a stream to files, and it can be tested 
> via any source that Nexmark supports.





[jira] [Resolved] (BEAM-6174) Remove unnecesarry Kryo dependecny from euphoria

2018-12-05 Thread David Moravek (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Moravek resolved BEAM-6174.
-
Resolution: Fixed

> Remove unnecesarry Kryo dependecny from euphoria
> 
>
> Key: BEAM-6174
> URL: https://issues.apache.org/jira/browse/BEAM-6174
> Project: Beam
>  Issue Type: Sub-task
>  Components: dsl-euphoria
>Reporter: Vaclav Plajt
>Assignee: Vaclav Plajt
>Priority: Major
> Fix For: 2.10.0
>
>  Time Spent: 50m
>  Remaining Estimate: 0h
>






[jira] [Updated] (BEAM-6174) Remove unnecesarry Kryo dependecny from euphoria

2018-12-05 Thread David Moravek (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Moravek updated BEAM-6174:

Fix Version/s: 2.10.0

> Remove unnecesarry Kryo dependecny from euphoria
> 
>
> Key: BEAM-6174
> URL: https://issues.apache.org/jira/browse/BEAM-6174
> Project: Beam
>  Issue Type: Sub-task
>  Components: dsl-euphoria
>Reporter: Vaclav Plajt
>Assignee: Vaclav Plajt
>Priority: Major
> Fix For: 2.10.0
>
>  Time Spent: 50m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-6174) Remove unnecesarry Kryo dependecny from euphoria

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6174?focusedWorklogId=172369=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172369
 ]

ASF GitHub Bot logged work on BEAM-6174:


Author: ASF GitHub Bot
Created on: 05/Dec/18 16:37
Start Date: 05/Dec/18 16:37
Worklog Time Spent: 10m 
  Work Description: dmvk commented on issue #7195: [BEAM-6174] Kryo 
dependency removed.
URL: https://github.com/apache/beam/pull/7195#issuecomment-444552530
 
 
   Target release changed to 2.10




Issue Time Tracking
---

Worklog Id: (was: 172369)
Time Spent: 50m  (was: 40m)

> Remove unnecesarry Kryo dependecny from euphoria
> 
>
> Key: BEAM-6174
> URL: https://issues.apache.org/jira/browse/BEAM-6174
> Project: Beam
>  Issue Type: Sub-task
>  Components: dsl-euphoria
>Reporter: Vaclav Plajt
>Assignee: Vaclav Plajt
>Priority: Major
>  Time Spent: 50m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-6174) Remove unnecesarry Kryo dependecny from euphoria

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6174?focusedWorklogId=172368=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172368
 ]

ASF GitHub Bot logged work on BEAM-6174:


Author: ASF GitHub Bot
Created on: 05/Dec/18 16:37
Start Date: 05/Dec/18 16:37
Worklog Time Spent: 10m 
  Work Description: dmvk closed pull request #7195: [BEAM-6174] Kryo 
dependency removed.
URL: https://github.com/apache/beam/pull/7195
 
 
   

This PR was merged from a forked repository.
Because GitHub hides the original diff on merge, it is reproduced below for
the sake of provenance:

diff --git a/sdks/java/extensions/euphoria/build.gradle 
b/sdks/java/extensions/euphoria/build.gradle
index 75d4b357be9c..e6ad57382ee6 100644
--- a/sdks/java/extensions/euphoria/build.gradle
+++ b/sdks/java/extensions/euphoria/build.gradle
@@ -21,13 +21,8 @@ applyJavaNature()
 
 description = "Apache Beam :: SDKs :: Java :: Extensions :: Euphoria Java 8 
DSL"
 
-ext {
-kryoVersion = '4.0.2'
-}
-
 dependencies {
 shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
-shadow "com.esotericsoftware:kryo:${kryoVersion}"
 shadow library.java.guava
 testCompile library.java.mockito_core
 testCompile project(path: ":beam-sdks-java-extensions-kryo")


 




Issue Time Tracking
---

Worklog Id: (was: 172368)
Time Spent: 40m  (was: 0.5h)

> Remove unnecesarry Kryo dependecny from euphoria
> 
>
> Key: BEAM-6174
> URL: https://issues.apache.org/jira/browse/BEAM-6174
> Project: Beam
>  Issue Type: Sub-task
>  Components: dsl-euphoria
>Reporter: Vaclav Plajt
>Assignee: Vaclav Plajt
>Priority: Major
>  Time Spent: 40m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-6165) Send metrics to Flink in portable Flink runner

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6165?focusedWorklogId=172359=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172359
 ]

ASF GitHub Bot logged work on BEAM-6165:


Author: ASF GitHub Bot
Created on: 05/Dec/18 16:20
Start Date: 05/Dec/18 16:20
Worklog Time Spent: 10m 
  Work Description: ryan-williams commented on a change in pull request 
#7183: [BEAM-6165] send metrics to Flink in portable Flink runner
URL: https://github.com/apache/beam/pull/7183#discussion_r239135253
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/flink_runner_test.py
 ##
 @@ -59,25 +65,63 @@
   environment_type = known_args.environment_type.lower()
   environment_config = (
   known_args.environment_config if known_args.environment_config else None)
+  test = known_args.test
 
   # This is defined here to only be run when we invoke this file explicitly.
   class FlinkRunnerTest(portable_runner_test.PortableRunnerTest):
 _use_grpc = True
 _use_subprocesses = True
 
+conf_dir = None
+
+@classmethod
+def tearDownClass(cls):
+  if cls.conf_dir and exists(cls.conf_dir):
+logging.info("removing conf dir: %s" % cls.conf_dir)
+rmtree(cls.conf_dir)
+  super(FlinkRunnerTest, cls).tearDownClass()
+
+@classmethod
+def _create_conf_dir(cls):
+  """Create (and save a static reference to) a "conf dir", used to provide 
metrics configs and
+   verify metrics output
+
+   It gets cleaned up when the suite is done executing"""
+
+  if hasattr(cls, 'conf_dir'):
+cls.conf_dir = mkdtemp(prefix='flinktest-conf')
+
+# path for a FileReporter to write metrics to
+cls.test_metrics_path = path.join(cls.conf_dir, 'test-metrics.txt')
+
+# path to write Flink configuration to
+conf_path = path.join(cls.conf_dir, 'flink-conf.yaml')
+with open(conf_path, 'w') as f:
+  f.write(linesep.join([
+'metrics.reporters: test',
+'metrics.reporter.test.class: 
org.apache.beam.runners.flink.metrics.FileReporter',
+'metrics.reporter.test.file: %s' % cls.test_metrics_path
+  ]))
+
 @classmethod
 def _subprocess_command(cls, port):
-  tmp_dir = tempfile.mkdtemp(prefix='flinktest')
+  # will be cleaned up at the end of this method, and recreated and used 
by the job server
+  tmp_dir = mkdtemp(prefix='flinktest')
+
+  cls._create_conf_dir()
+
   try:
 return [
 'java',
 '-jar', flink_job_server_jar,
+'--flink-master-url', '[local]',
 
 Review comment:
   thanks, I am using the `--flink-conf-dir` added in #7031; I don't see a way 
to avoid specifying `[local]` here though. it also sounds like we are saying 
that this test is intended to only run in local mode, so we may as well make 
that explicit?
   
   The previous (`[auto]`) code path goes through Flink code that doesn't seem 
to support injecting an existing `Configuration` (cf. 
[streaming](https://github.com/apache/flink/blob/release-1.5.5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1595-L1612),
 
[batch](https://github.com/apache/flink/blob/release-1.5.5/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java#L1058-L1061);
 presumably this makes sense because it would be strange to do that if actually 
in remote/cluster mode?)
   
   lmk if that doesn't make sense or there is something I should change here, 
thanks
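
   For readers following the diff above, a minimal standalone sketch of the conf-dir setup might look like the following. It assumes the same three reporter settings shown in the diff; the function name `write_flink_conf` and its parameters are hypothetical, and the `conf_dir is None` guard is an assumption about the intent of the `hasattr` check in the patch, not the code as submitted.

```python
import os
import tempfile


def write_flink_conf(metrics_path, conf_dir=None):
    """Write a flink-conf.yaml that routes metrics to a file reporter.

    Hypothetical helper: creates a temp dir only when none is supplied,
    and returns (conf_dir, conf_path) so the caller can clean up later.
    """
    if conf_dir is None:
        conf_dir = tempfile.mkdtemp(prefix='flinktest-conf')
    conf_path = os.path.join(conf_dir, 'flink-conf.yaml')
    lines = [
        'metrics.reporters: test',
        'metrics.reporter.test.class: '
        'org.apache.beam.runners.flink.metrics.FileReporter',
        'metrics.reporter.test.file: %s' % metrics_path,
    ]
    # Text mode already translates '\n' per platform, so join on '\n'.
    with open(conf_path, 'w') as handle:
        handle.write('\n'.join(lines) + '\n')
    return conf_dir, conf_path
```

   The returned directory is what would be passed to the job server through `--flink-conf-dir`, and it should be removed (for example with `shutil.rmtree`) once the suite finishes.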




Issue Time Tracking
---

Worklog Id: (was: 172359)
Time Spent: 5h 50m  (was: 5h 40m)

> Send metrics to Flink in portable Flink runner
> --
>
> Key: BEAM-6165
> URL: https://issues.apache.org/jira/browse/BEAM-6165
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Affects Versions: 2.8.0
>Reporter: Ryan Williams
>Assignee: Ryan Williams
>Priority: Major
>  Labels: metrics, portability, portability-flink
>  Time Spent: 5h 50m
>  Remaining Estimate: 0h
>
> Metrics are sent from the fn harness to runner in the Python SDK (and likely 
> Java soon), but the portable Flink runner doesn't pass them on to Flink, 
> which it should, so that users can see them in e.g. the Flink UI or via any 
> Flink metrics reporters.





[jira] [Work logged] (BEAM-5859) Improve Traceability of Pipeline translation

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5859?focusedWorklogId=172349=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172349
 ]

ASF GitHub Bot logged work on BEAM-5859:


Author: ASF GitHub Bot
Created on: 05/Dec/18 16:10
Start Date: 05/Dec/18 16:10
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #7208: 
[BEAM-5859] Better handle fused composite stage names.
URL: https://github.com/apache/beam/pull/7208#discussion_r239129246
 
 

 ##
 File path: 
runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ExecutableStageTranslationTest.java
 ##
 @@ -78,6 +81,40 @@ public void processElement(
 String executableStageName =
 ExecutableStageTranslation.generateNameFromStagePayload(basePayload);
 
-assertThat(executableStageName, is("[3]{ParDo(Anonymous), MyName, 
count}"));
+assertThat(executableStageName, is("[3]{ParDo(Anonymous), MyName, 
Composite}"));
+  }
+
+  @Test
+  public void testOperatorNameGenerationFromNames() {
+assertGeneratedNames("A", "A", Arrays.asList("A"));
+assertGeneratedNames("A/a1", "A/a1", Arrays.asList("A/a1"));
+assertGeneratedNames("A/{a1, a2}", "A/{a1, a2}", Arrays.asList("A/a1", 
"A/a2"));
+assertGeneratedNames(
+"A/{a1, a2}", "A/{a1, a2/{a2.1, a2.2}}", Arrays.asList("A/a1", 
"A/a2/a2.1", "A/a2/a2.2"));
+assertGeneratedNames("{A, B}", "{A/{a1, a2}, B}", Arrays.asList("A/a1", 
"A/a2", "B"));
+assertGeneratedNames(
+"{A, B, C}", "{A/{a1, a2}, B, C/c/cc}", Arrays.asList("A/a1", "A/a2", 
"B", "C/c/cc"));
 
 Review comment:
   So the logic is that we only recurse into a composite when it is the only 
one at that level?
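
   To make the question concrete, here is a small Python sketch of the grouping logic as it can be inferred from the test expectations quoted above. It is not the Java code from the pull request: the function name `generate_name` is hypothetical, and the non-truncating branch in particular is an assumption reconstructed from the expected strings in the test, since that part of the diff is not shown here.

```python
from collections import OrderedDict


def generate_name(names, truncate):
    """Group names by their outermost component and render them compactly."""
    groups = OrderedDict()  # outer name -> ordered, de-duplicated inner names
    for name in names:
        outer, _, inner = name.partition('/')
        inners = groups.setdefault(outer, [])
        if inner not in inners:
            inners.append(inner)

    if len(groups) == 1:
        outer, inners = next(iter(groups.items()))
        if inners == ['']:
            # A single name without slashes: the recursion stops here.
            return outer
        # Everything shares one outer stage: enumerate one level down.
        return '%s/%s' % (outer, generate_name(inners, truncate))

    if truncate:
        # Several outer stages: list them without their inner structure.
        parts = list(groups)
    else:
        # Assumed behaviour: expand every outer stage recursively.
        parts = [
            outer if inners == [''] else
            '%s/%s' % (outer, generate_name(inners, truncate))
            for outer, inners in groups.items()
        ]
    return '{%s}' % ', '.join(parts)
```

   With the last test input, `generate_name(["A/a1", "A/a2", "B", "C/c/cc"], True)` gives `{A, B, C}`, while passing `False` gives `{A/{a1, a2}, B, C/c/cc}`. In this sketch, truncation only descends while a single outer group remains, which matches the reading in the question above.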




Issue Time Tracking
---

Worklog Id: (was: 172349)
Time Spent: 2.5h  (was: 2h 20m)

> Improve Traceability of Pipeline translation
> 
>
> Key: BEAM-5859
> URL: https://issues.apache.org/jira/browse/BEAM-5859
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
> Attachments: tfx.png, wordcount.png
>
>  Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> Users often ask how they can reason about the pipeline translation. The Flink 
> UI displays a confusingly large graph without any trace of the original Beam 
> pipeline:
> WordCount:
>  !wordcount.png! 
> TFX:
>  !tfx.png! 
> Some aspects that make these graphs hard to understand:
>  * Users don't know how the Runner maps Beam to Flink concepts
>  * The UI is awfully slow / hangs when the pipeline is reasonably complex
>  * The operator names seem to use {{transform.getUniqueName()}}, which doesn't 
> generate readable names
>  * So-called chaining combines operators into a single operator, which makes 
> understanding which Beam concept belongs to which Flink concept even harder
>  





[jira] [Work logged] (BEAM-5859) Improve Traceability of Pipeline translation

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5859?focusedWorklogId=172354=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172354
 ]

ASF GitHub Bot logged work on BEAM-5859:


Author: ASF GitHub Bot
Created on: 05/Dec/18 16:10
Start Date: 05/Dec/18 16:10
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #7208: 
[BEAM-5859] Better handle fused composite stage names.
URL: https://github.com/apache/beam/pull/7208#discussion_r239122914
 
 

 ##
 File path: 
runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ExecutableStageTranslationTest.java
 ##
 @@ -78,6 +81,40 @@ public void processElement(
 String executableStageName =
 ExecutableStageTranslation.generateNameFromStagePayload(basePayload);
 
-assertThat(executableStageName, is("[3]{ParDo(Anonymous), MyName, 
count}"));
+assertThat(executableStageName, is("[3]{ParDo(Anonymous), MyName, 
Composite}"));
+  }
+
+  @Test
+  public void testOperatorNameGenerationFromNames() {
+assertGeneratedNames("A", "A", Arrays.asList("A"));
+assertGeneratedNames("A/a1", "A/a1", Arrays.asList("A/a1"));
+assertGeneratedNames("A/{a1, a2}", "A/{a1, a2}", Arrays.asList("A/a1", 
"A/a2"));
+assertGeneratedNames(
+"A/{a1, a2}", "A/{a1, a2/{a2.1, a2.2}}", Arrays.asList("A/a1", 
"A/a2/a2.1", "A/a2/a2.2"));
+assertGeneratedNames("{A, B}", "{A/{a1, a2}, B}", Arrays.asList("A/a1", 
"A/a2", "B"));
+assertGeneratedNames(
+"{A, B, C}", "{A/{a1, a2}, B, C/c/cc}", Arrays.asList("A/a1", "A/a2", 
"B", "C/c/cc"));
 
 Review comment:
   It seems inconsistent that we don't get `A{a1/a2}, B, C` here.




Issue Time Tracking
---

Worklog Id: (was: 172354)

> Improve Traceability of Pipeline translation
> 
>
> Key: BEAM-5859
> URL: https://issues.apache.org/jira/browse/BEAM-5859
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
> Attachments: tfx.png, wordcount.png
>
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> Users often ask how they can reason about the pipeline translation. The Flink 
> UI displays a confusingly large graph without any trace of the original Beam 
> pipeline:
> WordCount:
>  !wordcount.png! 
> TFX:
>  !tfx.png! 
> Some aspects that make these graphs hard to understand:
>  * Users don't know how the Runner maps Beam to Flink concepts
>  * The UI is awfully slow / hangs when the pipeline is reasonably complex
>  * The operator names seem to use {{transform.getUniqueName()}}, which doesn't 
> generate readable names
>  * So-called chaining combines operators into a single operator, which makes 
> understanding which Beam concept belongs to which Flink concept even harder
>  





[jira] [Work logged] (BEAM-5859) Improve Traceability of Pipeline translation

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5859?focusedWorklogId=172353=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172353
 ]

ASF GitHub Bot logged work on BEAM-5859:


Author: ASF GitHub Bot
Created on: 05/Dec/18 16:10
Start Date: 05/Dec/18 16:10
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #7208: 
[BEAM-5859] Better handle fused composite stage names.
URL: https://github.com/apache/beam/pull/7208#discussion_r239130429
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java
 ##
 @@ -45,19 +53,60 @@ public static String 
generateNameFromStagePayload(ExecutableStagePayload stagePa
 RunnerApi.Components components = stagePayload.getComponents();
 final int transformsCount = stagePayload.getTransformsCount();
 sb.append("[").append(transformsCount).append("]");
-sb.append("{");
+Collection names = new ArrayList<>();
 for (int i = 0; i < transformsCount; i++) {
   String name = 
components.getTransformsOrThrow(stagePayload.getTransforms(i)).getUniqueName();
-  // Python: Remove the 'ref_AppliedPTransform_' prefix which just makes 
the name longer
-  name = name.replaceFirst("^ref_AppliedPTransform_", "");
   // Java: Remove the 'ParMultiDo(Anonymous)' suffix which just makes the 
name longer
   name = name.replaceFirst("/ParMultiDo\\(Anonymous\\)$", "");
-  sb.append(name);
-  if (i + 1 < transformsCount) {
-sb.append(", ");
-  }
+  names.add(name);
 }
-sb.append("}");
+sb.append(generateNameFromTransformNames(names, true));
 return sb.toString();
   }
+
+  public static String generateNameFromTransformNames(Collection 
names, boolean truncate) {
+Multimap groupByOuter = LinkedHashMultimap.create();
+for (String name : names) {
+  int index = name.indexOf('/');
+  if (index == -1) {
+groupByOuter.put(name, "");
+  } else {
+groupByOuter.put(name.substring(0, index), name.substring(index + 1));
+  }
+}
+if (groupByOuter.keySet().size() == 1) {
+  Map.Entry> outer =
+  Iterables.getOnlyElement(groupByOuter.asMap().entrySet());
+  if (outer.getValue().size() == 1 && outer.getValue().contains("")) {
+// Names consisted of a single name without any slashes.
+return outer.getKey();
+  } else {
+// Everything is in the same outer stage, enumerate at one level down.
+return String.format(
+"%s/%s", outer.getKey(), 
generateNameFromTransformNames(outer.getValue(), truncate));
+  }
+} else {
+  Collection parts;
+  if (truncate) {
+// Enumerate the outer stages without their composite structure, if 
any.
+parts = groupByOuter.keySet();
 
 Review comment:
   Note to self: Recursion anchor.
   




Issue Time Tracking
---

Worklog Id: (was: 172353)

> Improve Traceability of Pipeline translation
> 
>
> Key: BEAM-5859
> URL: https://issues.apache.org/jira/browse/BEAM-5859
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
> Attachments: tfx.png, wordcount.png
>
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> Users often ask how they can reason about the pipeline translation. The Flink 
> UI displays a confusingly large graph without any trace of the original Beam 
> pipeline:
> WordCount:
>  !wordcount.png! 
> TFX:
>  !tfx.png! 
> Some aspects that make these graphs hard to understand:
>  * Users don't know how the Runner maps Beam to Flink concepts
>  * The UI is awfully slow / hangs when the pipeline is reasonably complex
>  * The operator names seem to use {{transform.getUniqueName()}}, which doesn't 
> generate readable names
>  * So-called chaining combines operators into a single operator, which makes 
> understanding which Beam concept belongs to which Flink concept even harder
>  





[jira] [Work logged] (BEAM-5859) Improve Traceability of Pipeline translation

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5859?focusedWorklogId=172350=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172350
 ]

ASF GitHub Bot logged work on BEAM-5859:


Author: ASF GitHub Bot
Created on: 05/Dec/18 16:10
Start Date: 05/Dec/18 16:10
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #7208: 
[BEAM-5859] Better handle fused composite stage names.
URL: https://github.com/apache/beam/pull/7208#discussion_r239105166
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java
 ##
 @@ -45,19 +53,60 @@ public static String 
generateNameFromStagePayload(ExecutableStagePayload stagePa
 RunnerApi.Components components = stagePayload.getComponents();
 final int transformsCount = stagePayload.getTransformsCount();
 sb.append("[").append(transformsCount).append("]");
-sb.append("{");
+Collection names = new ArrayList<>();
 for (int i = 0; i < transformsCount; i++) {
   String name = 
components.getTransformsOrThrow(stagePayload.getTransforms(i)).getUniqueName();
-  // Python: Remove the 'ref_AppliedPTransform_' prefix which just makes 
the name longer
-  name = name.replaceFirst("^ref_AppliedPTransform_", "");
   // Java: Remove the 'ParMultiDo(Anonymous)' suffix which just makes the 
name longer
   name = name.replaceFirst("/ParMultiDo\\(Anonymous\\)$", "");
-  sb.append(name);
-  if (i + 1 < transformsCount) {
-sb.append(", ");
-  }
+  names.add(name);
 }
-sb.append("}");
+sb.append(generateNameFromTransformNames(names, true));
 return sb.toString();
   }
+
+  public static String generateNameFromTransformNames(Collection 
names, boolean truncate) {
 
 Review comment:
   Would it make sense to add a short JavaDoc comment, explaining what kind of 
modes this method offers for generating a name from the transforms?




Issue Time Tracking
---

Worklog Id: (was: 172350)
Time Spent: 2h 40m  (was: 2.5h)

> Improve Traceability of Pipeline translation
> 
>
> Key: BEAM-5859
> URL: https://issues.apache.org/jira/browse/BEAM-5859
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
> Attachments: tfx.png, wordcount.png
>
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> Users often ask how they can reason about the pipeline translation. The Flink 
> UI displays a confusingly large graph without any trace of the original Beam 
> pipeline:
> WordCount:
>  !wordcount.png! 
> TFX:
>  !tfx.png! 
> Some aspects that make these graphs hard to understand:
>  * Users don't know how the Runner maps Beam to Flink concepts
>  * The UI is awfully slow / hangs when the pipeline is reasonably complex
>  * The operator names seem to use {{transform.getUniqueName()}}, which doesn't 
> generate readable names
>  * So-called chaining combines operators into a single operator, which makes 
> understanding which Beam concept belongs to which Flink concept even harder
>  





[jira] [Work logged] (BEAM-5859) Improve Traceability of Pipeline translation

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5859?focusedWorklogId=172351=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172351
 ]

ASF GitHub Bot logged work on BEAM-5859:


Author: ASF GitHub Bot
Created on: 05/Dec/18 16:10
Start Date: 05/Dec/18 16:10
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #7208: 
[BEAM-5859] Better handle fused composite stage names.
URL: https://github.com/apache/beam/pull/7208#discussion_r239130260
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java
 ##
 @@ -45,19 +53,60 @@ public static String 
generateNameFromStagePayload(ExecutableStagePayload stagePa
 RunnerApi.Components components = stagePayload.getComponents();
 final int transformsCount = stagePayload.getTransformsCount();
 sb.append("[").append(transformsCount).append("]");
-sb.append("{");
+Collection names = new ArrayList<>();
 for (int i = 0; i < transformsCount; i++) {
   String name = 
components.getTransformsOrThrow(stagePayload.getTransforms(i)).getUniqueName();
-  // Python: Remove the 'ref_AppliedPTransform_' prefix which just makes 
the name longer
-  name = name.replaceFirst("^ref_AppliedPTransform_", "");
   // Java: Remove the 'ParMultiDo(Anonymous)' suffix which just makes the 
name longer
   name = name.replaceFirst("/ParMultiDo\\(Anonymous\\)$", "");
-  sb.append(name);
-  if (i + 1 < transformsCount) {
-sb.append(", ");
-  }
+  names.add(name);
 }
-sb.append("}");
+sb.append(generateNameFromTransformNames(names, true));
 return sb.toString();
   }
+
+  public static String generateNameFromTransformNames(Collection 
names, boolean truncate) {
+Multimap groupByOuter = LinkedHashMultimap.create();
+for (String name : names) {
+  int index = name.indexOf('/');
+  if (index == -1) {
+groupByOuter.put(name, "");
+  } else {
+groupByOuter.put(name.substring(0, index), name.substring(index + 1));
+  }
+}
+if (groupByOuter.keySet().size() == 1) {
+  Map.Entry> outer =
+  Iterables.getOnlyElement(groupByOuter.asMap().entrySet());
+  if (outer.getValue().size() == 1 && outer.getValue().contains("")) {
+// Names consisted of a single name without any slashes.
+return outer.getKey();
 
 Review comment:
   Note to self: Recursion anchor.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 172351)
Time Spent: 2h 40m  (was: 2.5h)

> Improve Traceability of Pipeline translation
> 
>
> Key: BEAM-5859
> URL: https://issues.apache.org/jira/browse/BEAM-5859
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
> Attachments: tfx.png, wordcount.png
>
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> Users often ask how they can reason about the pipeline translation. The Flink 
> UI displays a confusingly large graph without any trace of the original Beam 
> pipeline:
> WordCount:
>  !wordcount.png! 
> TFX:
>  !tfx.png! 
> Some aspects which make understanding these graphs hard:
>  * Users don't know how the Runner maps Beam to Flink concepts
>  * The UI is awfully slow / hangs when the pipeline is reasonably complex
>  * The operator names seem to use {{transform.getUniqueName()}} which doesn't 
> generate readable names
>  * So-called Chaining combines operators into a single operator which makes 
> understanding which Beam concept belongs to which Flink concept even harder
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-5859) Improve Traceability of Pipeline translation

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5859?focusedWorklogId=172348=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172348
 ]

ASF GitHub Bot logged work on BEAM-5859:


Author: ASF GitHub Bot
Created on: 05/Dec/18 16:10
Start Date: 05/Dec/18 16:10
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #7208: 
[BEAM-5859] Better handle fused composite stage names.
URL: https://github.com/apache/beam/pull/7208#discussion_r239114817
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java
 ##
 @@ -45,19 +53,60 @@ public static String 
generateNameFromStagePayload(ExecutableStagePayload stagePa
 RunnerApi.Components components = stagePayload.getComponents();
 final int transformsCount = stagePayload.getTransformsCount();
 sb.append("[").append(transformsCount).append("]");
-sb.append("{");
+Collection<String> names = new ArrayList<>();
 for (int i = 0; i < transformsCount; i++) {
   String name = 
components.getTransformsOrThrow(stagePayload.getTransforms(i)).getUniqueName();
-  // Python: Remove the 'ref_AppliedPTransform_' prefix which just makes 
the name longer
-  name = name.replaceFirst("^ref_AppliedPTransform_", "");
   // Java: Remove the 'ParMultiDo(Anonymous)' suffix which just makes the 
name longer
   name = name.replaceFirst("/ParMultiDo\\(Anonymous\\)$", "");
-  sb.append(name);
-  if (i + 1 < transformsCount) {
-sb.append(", ");
-  }
+  names.add(name);
 }
-sb.append("}");
+sb.append(generateNameFromTransformNames(names, true));
 return sb.toString();
   }
+
+  public static String generateNameFromTransformNames(Collection<String> names, boolean truncate) {
 
 Review comment:
   Would `collapseComposites` be a better name than `truncate`?




Issue Time Tracking
---

Worklog Id: (was: 172348)
Time Spent: 2.5h  (was: 2h 20m)

> Improve Traceability of Pipeline translation
> 
>
> Key: BEAM-5859
> URL: https://issues.apache.org/jira/browse/BEAM-5859
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
> Attachments: tfx.png, wordcount.png
>
>  Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> Users often ask how they can reason about the pipeline translation. The Flink 
> UI displays a confusingly large graph without any trace of the original Beam 
> pipeline:
> WordCount:
>  !wordcount.png! 
> TFX:
>  !tfx.png! 
> Some aspects which make understanding these graphs hard:
>  * Users don't know how the Runner maps Beam to Flink concepts
>  * The UI is awfully slow / hangs when the pipeline is reasonably complex
>  * The operator names seem to use {{transform.getUniqueName()}} which doesn't 
> generate readable names
>  * So-called Chaining combines operators into a single operator which makes 
> understanding which Beam concept belongs to which Flink concept even harder
>  





[jira] [Work logged] (BEAM-5859) Improve Traceability of Pipeline translation

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5859?focusedWorklogId=172352=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172352
 ]

ASF GitHub Bot logged work on BEAM-5859:


Author: ASF GitHub Bot
Created on: 05/Dec/18 16:10
Start Date: 05/Dec/18 16:10
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #7208: 
[BEAM-5859] Better handle fused composite stage names.
URL: https://github.com/apache/beam/pull/7208#discussion_r239104348
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java
 ##
 @@ -45,19 +53,60 @@ public static String 
generateNameFromStagePayload(ExecutableStagePayload stagePa
 RunnerApi.Components components = stagePayload.getComponents();
 final int transformsCount = stagePayload.getTransformsCount();
 sb.append("[").append(transformsCount).append("]");
-sb.append("{");
+Collection<String> names = new ArrayList<>();
 for (int i = 0; i < transformsCount; i++) {
   String name = 
components.getTransformsOrThrow(stagePayload.getTransforms(i)).getUniqueName();
-  // Python: Remove the 'ref_AppliedPTransform_' prefix which just makes 
the name longer
-  name = name.replaceFirst("^ref_AppliedPTransform_", "");
   // Java: Remove the 'ParMultiDo(Anonymous)' suffix which just makes the 
name longer
   name = name.replaceFirst("/ParMultiDo\\(Anonymous\\)$", "");
-  sb.append(name);
-  if (i + 1 < transformsCount) {
-sb.append(", ");
-  }
+  names.add(name);
 }
-sb.append("}");
+sb.append(generateNameFromTransformNames(names, true));
 return sb.toString();
   }
+
+  public static String generateNameFromTransformNames(Collection<String> names, boolean truncate) {
+Multimap<String, String> groupByOuter = LinkedHashMultimap.create();
+for (String name : names) {
+  int index = name.indexOf('/');
 
 Review comment:
   I suppose names can never contain slashes or they are escaped?
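
To make the question concrete, here is a minimal sketch of the first-slash split used in the hunk above. It is illustrative only: the class and method names are hypothetical, and the second input assumes a user-chosen label that contains an unescaped slash, which this thread does not confirm can happen.

```java
import java.util.Arrays;

// Illustrative sketch only: mirrors the indexOf('/') split from the hunk above.
final class SlashSplitSketch {

  // Returns {outer, rest}; rest is "" when the name has no slash.
  static String[] splitOuter(String name) {
    int index = name.indexOf('/');
    if (index == -1) {
      return new String[] {name, ""};
    }
    return new String[] {name.substring(0, index), name.substring(index + 1)};
  }

  public static void main(String[] args) {
    // A nested transform name splits at the first composite boundary.
    System.out.println(Arrays.toString(splitOuter("Read/Parse/Json")));
    // Hypothetical label with a literal slash: it is split as if it were nested.
    System.out.println(Arrays.toString(splitOuter("Convert km/h")));
  }
}
```

If labels like the second one can occur, the grouping would treat `Convert km` as a composite, so either escaping or a guarantee from the naming scheme would be needed.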




Issue Time Tracking
---

Worklog Id: (was: 172352)
Time Spent: 2h 50m  (was: 2h 40m)

> Improve Traceability of Pipeline translation
> 
>
> Key: BEAM-5859
> URL: https://issues.apache.org/jira/browse/BEAM-5859
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
> Attachments: tfx.png, wordcount.png
>
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> Users often ask how they can reason about the pipeline translation. The Flink 
> UI displays a confusingly large graph without any trace of the original Beam 
> pipeline:
> WordCount:
>  !wordcount.png! 
> TFX:
>  !tfx.png! 
> Some aspects which make understanding these graphs hard:
>  * Users don't know how the Runner maps Beam to Flink concepts
>  * The UI is awfully slow / hangs when the pipeline is reasonably complex
>  * The operator names seem to use {{transform.getUniqueName()}} which doesn't 
> generate readable names
>  * So-called Chaining combines operators into a single operator which makes 
> understanding which Beam concept belongs to which Flink concept even harder
>  





[jira] [Work logged] (BEAM-5859) Improve Traceability of Pipeline translation

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5859?focusedWorklogId=172355=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172355
 ]

ASF GitHub Bot logged work on BEAM-5859:


Author: ASF GitHub Bot
Created on: 05/Dec/18 16:10
Start Date: 05/Dec/18 16:10
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #7208: 
[BEAM-5859] Better handle fused composite stage names.
URL: https://github.com/apache/beam/pull/7208#discussion_r239128952
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java
 ##
 @@ -45,19 +53,60 @@ public static String 
generateNameFromStagePayload(ExecutableStagePayload stagePa
 RunnerApi.Components components = stagePayload.getComponents();
 final int transformsCount = stagePayload.getTransformsCount();
 sb.append("[").append(transformsCount).append("]");
-sb.append("{");
+Collection<String> names = new ArrayList<>();
 for (int i = 0; i < transformsCount; i++) {
   String name = 
components.getTransformsOrThrow(stagePayload.getTransforms(i)).getUniqueName();
-  // Python: Remove the 'ref_AppliedPTransform_' prefix which just makes 
the name longer
-  name = name.replaceFirst("^ref_AppliedPTransform_", "");
   // Java: Remove the 'ParMultiDo(Anonymous)' suffix which just makes the 
name longer
   name = name.replaceFirst("/ParMultiDo\\(Anonymous\\)$", "");
-  sb.append(name);
-  if (i + 1 < transformsCount) {
-sb.append(", ");
-  }
+  names.add(name);
 }
-sb.append("}");
+sb.append(generateNameFromTransformNames(names, true));
 return sb.toString();
   }
+
+  public static String generateNameFromTransformNames(Collection<String> names, boolean truncate) {
+Multimap<String, String> groupByOuter = LinkedHashMultimap.create();
+for (String name : names) {
+  int index = name.indexOf('/');
+  if (index == -1) {
+groupByOuter.put(name, "");
+  } else {
+groupByOuter.put(name.substring(0, index), name.substring(index + 1));
+  }
+}
+if (groupByOuter.keySet().size() == 1) {
+  Map.Entry<String, Collection<String>> outer =
+  Iterables.getOnlyElement(groupByOuter.asMap().entrySet());
+  if (outer.getValue().size() == 1 && outer.getValue().contains("")) {
+// Names consisted of a single name without any slashes.
+return outer.getKey();
+  } else {
+// Everything is in the same outer stage, enumerate at one level down.
+return String.format(
+"%s/%s", outer.getKey(), 
generateNameFromTransformNames(outer.getValue(), truncate));
 
 Review comment:
   We go down a level here but only if we had multiple elements inside the same 
composite transform and no other composite transforms.
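
The grouping and the "go down a level" step can be sketched with the standard library alone. This is a rough illustration, not the code from the pull request: the class name is hypothetical, a `LinkedHashMap` of `LinkedHashSet`s stands in for the Guava multimap, and the branch for several outer composites is an assumption, since the quoted hunk ends before that case.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative sketch only; not the code from the pull request.
final class StageNameSketch {

  static String generateName(Collection<String> names, boolean truncate) {
    // Group by the outermost path segment, keeping insertion order and
    // collapsing duplicate remainders.
    Map<String, Set<String>> groupByOuter = new LinkedHashMap<>();
    for (String name : names) {
      int index = name.indexOf('/');
      String outer = index == -1 ? name : name.substring(0, index);
      String inner = index == -1 ? "" : name.substring(index + 1);
      groupByOuter.computeIfAbsent(outer, k -> new LinkedHashSet<>()).add(inner);
    }

    if (groupByOuter.size() == 1) {
      Map.Entry<String, Set<String>> outer = groupByOuter.entrySet().iterator().next();
      if (outer.getValue().size() == 1 && outer.getValue().contains("")) {
        // Recursion anchor: a single name without any slashes.
        return outer.getKey();
      }
      // Everything shares one outer composite: enumerate one level down.
      return outer.getKey() + "/" + generateName(outer.getValue(), truncate);
    }

    // ASSUMPTION: this branch is not shown in the quoted hunk. With truncate
    // set, only the outer names are listed; otherwise each group is expanded.
    List<String> parts = new ArrayList<>();
    for (Map.Entry<String, Set<String>> outer : groupByOuter.entrySet()) {
      boolean leaf = outer.getValue().size() == 1 && outer.getValue().contains("");
      if (truncate || leaf) {
        parts.add(outer.getKey());
      } else {
        parts.add(outer.getKey() + "/" + generateName(outer.getValue(), truncate));
      }
    }
    return "{" + String.join(", ", parts) + "}";
  }

  public static void main(String[] args) {
    System.out.println(generateName(Arrays.asList("Read/Parse", "Read/Filter"), true));
    System.out.println(generateName(Arrays.asList("Read/Parse", "Write/Format"), true));
  }
}
```

Under these assumptions, two transforms inside the same composite yield `Read/{Parse, Filter}`, while transforms from different composites collapse to `{Read, Write}` when `truncate` is set.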




Issue Time Tracking
---

Worklog Id: (was: 172355)
Time Spent: 3h  (was: 2h 50m)

> Improve Traceability of Pipeline translation
> 
>
> Key: BEAM-5859
> URL: https://issues.apache.org/jira/browse/BEAM-5859
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Reporter: Maximilian Michels
>Priority: Major
>  Labels: portability, portability-flink
> Attachments: tfx.png, wordcount.png
>
>  Time Spent: 3h
>  Remaining Estimate: 0h
>
> Users often ask how they can reason about the pipeline translation. The Flink 
> UI displays a confusingly large graph without any trace of the original Beam 
> pipeline:
> WordCount:
>  !wordcount.png! 
> TFX:
>  !tfx.png! 
> Some aspects which make understanding these graphs hard:
>  * Users don't know how the Runner maps Beam to Flink concepts
>  * The UI is awfully slow / hangs when the pipeline is reasonably complex
>  * The operator names seem to use {{transform.getUniqueName()}} which doesn't 
> generate readable names
>  * So-called Chaining combines operators into a single operator which makes 
> understanding which Beam concept belongs to which Flink concept even harder
>  





[jira] [Work logged] (BEAM-6165) Send metrics to Flink in portable Flink runner

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6165?focusedWorklogId=172343=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172343
 ]

ASF GitHub Bot logged work on BEAM-6165:


Author: ASF GitHub Bot
Created on: 05/Dec/18 15:59
Start Date: 05/Dec/18 15:59
Worklog Time Spent: 10m 
  Work Description: ryan-williams commented on a change in pull request 
#7183: [BEAM-6165] send metrics to Flink in portable Flink runner
URL: https://github.com/apache/beam/pull/7183#discussion_r239123843
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FileReporter.java
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.metrics;
+
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.AbstractReporter;
+
+/**
+ * Flink {@link org.apache.flink.metrics.reporter.MetricReporter metrics 
reporter} for writing
+ * metrics to a file (specified via the "metrics.reporter.test.file" config 
key).
+ */
+public class FileReporter extends AbstractReporter {
 
 Review comment:
   (woops, I'd not seen Max's comment above when I posted this, but seems like 
we're saying the same thing)
   
   fwiw: "beam", "flink", "metrics", "file", and "reporter" are already in this 
class' name: `org.apache.beam.runners.flink.metrics.FileReporter` (and users 
would use this FQN in `flink-conf.yaml`, as in the example above).
   
   I generally dislike the Java convention of repeating package-name segments 
in class' basenames, but recognize that it's common and people seem to like it, 
so happy to add here, just wanted to float the idea that 
`org.apache.beam.runners.flink.metrics.BeamFlinkFileMetricReporter` is 
redundant.
   
   lmk if you still prefer it and I'll change it.




Issue Time Tracking
---

Worklog Id: (was: 172343)
Time Spent: 5h 40m  (was: 5.5h)

> Send metrics to Flink in portable Flink runner
> --
>
> Key: BEAM-6165
> URL: https://issues.apache.org/jira/browse/BEAM-6165
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Affects Versions: 2.8.0
>Reporter: Ryan Williams
>Assignee: Ryan Williams
>Priority: Major
>  Labels: metrics, portability, portability-flink
>  Time Spent: 5h 40m
>  Remaining Estimate: 0h
>
> Metrics are sent from the fn harness to runner in the Python SDK (and likely 
> Java soon), but the portable Flink runner doesn't pass them on to Flink, 
> which it should, so that users can see them in e.g. the Flink UI or via any 
> Flink metrics reporters.





[jira] [Work logged] (BEAM-6165) Send metrics to Flink in portable Flink runner

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6165?focusedWorklogId=172340=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172340
 ]

ASF GitHub Bot logged work on BEAM-6165:


Author: ASF GitHub Bot
Created on: 05/Dec/18 15:54
Start Date: 05/Dec/18 15:54
Worklog Time Spent: 10m 
  Work Description: ryan-williams commented on a change in pull request 
#7183: [BEAM-6165] send metrics to Flink in portable Flink runner
URL: https://github.com/apache/beam/pull/7183#discussion_r239123843
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FileReporter.java
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.metrics;
+
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.AbstractReporter;
+
+/**
+ * Flink {@link org.apache.flink.metrics.reporter.MetricReporter metrics 
reporter} for writing
+ * metrics to a file (specified via the "metrics.reporter.test.file" config 
key).
+ */
+public class FileReporter extends AbstractReporter {
 
 Review comment:
   fwiw: "beam", "flink", "metrics", "file", and "reporter" are already in this 
class' name: `org.apache.beam.runners.flink.metrics.FileReporter` (and users 
would use this FQN in `flink-conf.yaml`, as in the example above).
   
   I generally dislike the Java convention of repeating package-name segments 
in class' basenames, but recognize that it's common and people seem to like it, 
so happy to add here, just wanted to float the idea that 
`org.apache.beam.runners.flink.metrics.BeamFlinkFileMetricReporter` is 
redundant.
   
   lmk if you still prefer it and I'll change it.




Issue Time Tracking
---

Worklog Id: (was: 172340)
Time Spent: 5.5h  (was: 5h 20m)

> Send metrics to Flink in portable Flink runner
> --
>
> Key: BEAM-6165
> URL: https://issues.apache.org/jira/browse/BEAM-6165
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Affects Versions: 2.8.0
>Reporter: Ryan Williams
>Assignee: Ryan Williams
>Priority: Major
>  Labels: metrics, portability, portability-flink
>  Time Spent: 5.5h
>  Remaining Estimate: 0h
>
> Metrics are sent from the fn harness to runner in the Python SDK (and likely 
> Java soon), but the portable Flink runner doesn't pass them on to Flink, 
> which it should, so that users can see them in e.g. the Flink UI or via any 
> Flink metrics reporters.





[jira] [Commented] (BEAM-5419) Build multiple versions of the Flink Runner against different Flink versions

2018-12-05 Thread Jeroen Steggink (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16710251#comment-16710251
 ] 

Jeroen Steggink commented on BEAM-5419:
---

[~mxm], that's great! I'm interested in 1.6.x and 1.7.x. Different projects, 
with different needs.

> Build multiple versions of the Flink Runner against different Flink versions
> 
>
> Key: BEAM-5419
> URL: https://issues.apache.org/jira/browse/BEAM-5419
> Project: Beam
>  Issue Type: New Feature
>  Components: build-system, runner-flink
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: 2.10.0
>
>
> Following up on a discussion on the mailing list.
> We want to keep the Flink version stable across different versions to avoid 
> upgrade pain for long-term users. At the same time, there are users out there 
> with newer Flink clusters and developers also want to utilize new Flink 
> features.
> It would be great to build multiple versions of the Flink Runner against 
> different Flink versions.
> When the upgrade is as simple as changing the version property in the build 
> script, this should be pretty straight-forward. If not, having a "base 
> version" and applying a patch during the build could be an option. We should 
> avoid duplicating any Runner code.





[jira] [Work logged] (BEAM-6165) Send metrics to Flink in portable Flink runner

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6165?focusedWorklogId=172329=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172329
 ]

ASF GitHub Bot logged work on BEAM-6165:


Author: ASF GitHub Bot
Created on: 05/Dec/18 15:44
Start Date: 05/Dec/18 15:44
Worklog Time Spent: 10m 
  Work Description: ryan-williams commented on a change in pull request 
#7183: [BEAM-6165] send metrics to Flink in portable Flink runner
URL: https://github.com/apache/beam/pull/7183#discussion_r238770025
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FileReporter.java
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.metrics;
+
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.AbstractReporter;
+
+/**
+ * Flink {@link org.apache.flink.metrics.reporter.MetricReporter metrics 
reporter} for writing
+ * metrics to a file (specified via the "metrics.reporter.test.file" config 
key).
+ */
+public class FileReporter extends AbstractReporter {
 
 Review comment:
   > it's configured via a test property
   
   ah, [I left the reporter alias as `test` in 
`flink_runner_test`](https://github.com/apache/beam/pull/7183/files#diff-195e4a1918aec54f6245e49081cadb5aR99)
 when I renamed this class to remove the `Test` prefix. I should change that 
alias to `file`, either way.
   
   *(update: I did this, and also left [a placeholder in the `FileReporter` 
javadoc](https://github.com/apache/beam/pull/7183/files#diff-4b3942dfc66918e7fd06a1ed781f0442R30)
 for whatever alias the user wants to use as the name of their `FileReporter`)*
   
   This is a reporter that writes the final state of all metrics to a file of 
the user's choosing. It can do this in test or production environments. It 
doesn't perform any assertions, or otherwise do things that only make sense in 
a testing context. It is fairly similar to 
[Slf4jReporter](https://github.com/apache/flink/blob/release-1.5.5/flink-metrics/flink-metrics-slf4j/src/main/java/org/apache/flink/metrics/slf4j/Slf4jReporter.java#L41),
 but its output is easier to parse (for machines or humans), and would normally 
be configured like:
   
   ```
   metrics.reporters: file,
   metrics.reporter.file.class: org.apache.beam.runners.flink.metrics.FileReporter
   metrics.reporter.file.file: /some/path/on/job-mgr/node
   ```
   
   To me, it feels weirder to name it `Test*` and leave it under `main/`, when 
it is not really *that* test-specific an abstraction.
   
   However I understand if the possible non-test uses feel contrived, and 
perhaps having `Test*` names under `main/` isn't too smelly.
   
   I just want to make sure the vestigial `metrics.reporter.test.*` namespacing 
isn't driving the decision too heavily.




Issue Time Tracking
---

Worklog Id: (was: 172329)
Time Spent: 5h 20m  (was: 5h 10m)

> Send metrics to Flink in portable Flink runner
> --
>
> Key: BEAM-6165
> URL: https://issues.apache.org/jira/browse/BEAM-6165
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Affects Versions: 2.8.0
>Reporter: Ryan Williams
>Assignee: Ryan Williams
>Priority: Major
>  Labels: metrics, portability, portability-flink
>  Time Spent: 5h 20m
>  Remaining Estimate: 0h
>
> Metrics are sent from the fn harness to runner in the Python SDK (and likely 
> Java soon), but the 

[jira] [Commented] (BEAM-5419) Build multiple versions of the Flink Runner against different Flink versions

2018-12-05 Thread Maximilian Michels (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-5419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16710157#comment-16710157
 ] 

Maximilian Michels commented on BEAM-5419:
--

Since this gets asked for more and more, I will prioritize this for 2.10.0. 
With Flink 1.7 out, we should provide at least builds for 1.7 and 1.6, and most 
likely also 1.5 because supporting it is about the same effort as supporting 
1.6.

[~jeroens] Are you interested in 1.6 or 1.7?

> Build multiple versions of the Flink Runner against different Flink versions
> 
>
> Key: BEAM-5419
> URL: https://issues.apache.org/jira/browse/BEAM-5419
> Project: Beam
>  Issue Type: New Feature
>  Components: build-system, runner-flink
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Major
> Fix For: 2.10.0
>
>
> Following up on a discussion on the mailing list.
> We want to keep the Flink version stable across different versions to avoid 
> upgrade pain for long-term users. At the same time, there are users out there 
> with newer Flink clusters and developers also want to utilize new Flink 
> features.
> It would be great to build multiple versions of the Flink Runner against 
> different Flink versions.
> When the upgrade is as simple as changing the version property in the build 
> script, this should be pretty straight-forward. If not, having a "base 
> version" and applying a patch during the build could be an option. We should 
> avoid duplicating any Runner code.





[jira] [Work logged] (BEAM-6172) Flink metrics are not generated in standard format

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6172?focusedWorklogId=172310=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172310
 ]

ASF GitHub Bot logged work on BEAM-6172:


Author: ASF GitHub Bot
Created on: 05/Dec/18 15:02
Start Date: 05/Dec/18 15:02
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7207: [BEAM-6172] Adjust Flink 
metric names / Add metric reporting tests
URL: https://github.com/apache/beam/pull/7207#issuecomment-444515725
 
 
   CC @ryan-williams 




Issue Time Tracking
---

Worklog Id: (was: 172310)
Time Spent: 20m  (was: 10m)

> Flink metrics are not generated in standard format
> --
>
> Key: BEAM-6172
> URL: https://issues.apache.org/jira/browse/BEAM-6172
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Affects Versions: 2.8.0
>Reporter: Micah Wylde
>Assignee: Maximilian Michels
>Priority: Minor
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The metrics that the Flink runner exports do not follow the standard format 
> used by Flink, and do not respect Flink metric configuration options. 
> For example (with the default metrics configuration) Beam produces a metric:
> {code}
> 10-100-209-71.taskmanager.0f29b420b63fea58f6f321bc0cbf45f3.BeamApp-mwylde-1203224439-a7d8fdf6.group.0.__counter__group__org-apache-beam-runners-core-ReduceFnRunner__droppedDueToClosedWindow
> {code}
> whereas a native Flink metric looks like:
> {code}
> 10-100-209-71.taskmanager.0f29b420b63fea58f6f321bc0cbf45f3.BeamApp-mwylde-1203224439-a7d8fdf6.Source-Custom-Source-7Kinesis-None-beam-env-docker-v1-0-ToKeyedWorkItem.0.numRecordsOut
> {code}
> In particular, Beam should respect the 
> [metrics.scope.delimiter|https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#metrics-scope-delimiter]
>  configuration for separating components of a metric (currently it uses 
> "__"), and should not include the type of metric (counter, gauge, etc.).





[jira] [Work logged] (BEAM-6167) Create a Class to read content of a file keeping track of the file path (python)

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6167?focusedWorklogId=172301=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172301
 ]

ASF GitHub Bot logged work on BEAM-6167:


Author: ASF GitHub Bot
Created on: 05/Dec/18 14:50
Start Date: 05/Dec/18 14:50
Worklog Time Spent: 10m 
  Work Description: robertwb commented on a change in pull request #7193: 
[BEAM-6167] Add class ReadFromTextWithFilename (Python)
URL: https://github.com/apache/beam/pull/7193#discussion_r239095304
 
 

 ##
 File path: sdks/python/apache_beam/io/textio.py
 ##
 @@ -527,6 +533,61 @@ def expand(self, pvalue):
 return pvalue.pipeline | Read(self._source)
 
 
+class ReadFromTextWithFilename(PTransform):
 
 Review comment:
   Yes, I agree with that sentiment. The disadvantage I see here is that there 
will have to be an active effort to keep the (non-trivial) set of arguments in 
sync between the two from now on. 
   
   Another option would be to have a _ReadFromTextBase with two subclasses that 
differ only in a class attribute set to either _TextSource or 
_FilenameTextSource. 
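   The base-class option can be sketched roughly as follows. This is a minimal, self-contained illustration rather than the actual textio.py code: the source classes here are simplified stand-ins that read from an in-memory string, and the `_source_class` attribute name and the two-argument constructor are assumptions made for the example.

```python
class _TextSource:
    """Simplified stand-in source: yields each line of the given text."""

    def __init__(self, name, text):
        self.name = name
        self.text = text

    def read(self):
        for line in self.text.splitlines():
            yield line


class _FilenameTextSource(_TextSource):
    """Simplified stand-in source: yields (name, line) pairs instead of lines."""

    def read(self):
        for line in super().read():
            yield (self.name, line)


class _ReadFromTextBase:
    # Hypothetical attribute name; subclasses override only this.
    _source_class = None

    def __init__(self, name, text):
        # The shared argument list is declared in exactly one place.
        self._source = self._source_class(name, text)

    def expand(self):
        return list(self._source.read())


class ReadFromText(_ReadFromTextBase):
    _source_class = _TextSource


class ReadFromTextWithFilename(_ReadFromTextBase):
    _source_class = _FilenameTextSource
```

   With this shape, adding or renaming a constructor argument touches only the base class, so the two transforms cannot drift apart.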


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 172301)
Time Spent: 40m  (was: 0.5h)

> Create a Class to read content of a file keeping track of the file path 
> (python)
> 
>
> Key: BEAM-6167
> URL: https://issues.apache.org/jira/browse/BEAM-6167
> Project: Beam
>  Issue Type: Improvement
>  Components: io-ideas
>Affects Versions: 2.8.0
>Reporter: Lorenzo Caggioni
>Assignee: Eugene Kirpichov
>Priority: Minor
> Fix For: Not applicable
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> Add a class to read the content of a file, keeping track of the file path each 
> element comes from.
> This is an improvement of the current python/apache_beam/io/textio.py





[jira] [Work logged] (BEAM-6167) Create a Class to read content of a file keeping track of the file path (python)

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6167?focusedWorklogId=172296&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172296
 ]

ASF GitHub Bot logged work on BEAM-6167:


Author: ASF GitHub Bot
Created on: 05/Dec/18 14:35
Start Date: 05/Dec/18 14:35
Worklog Time Spent: 10m 
  Work Description: lcaggio commented on a change in pull request #7193: 
[BEAM-6167] Add class ReadFromTextWithFilename (Python)
URL: https://github.com/apache/beam/pull/7193#discussion_r239089178
 
 

 ##
 File path: sdks/python/apache_beam/io/textio.py
 ##
 @@ -527,6 +533,61 @@ def expand(self, pvalue):
 return pvalue.pipeline | Read(self._source)
 
 
+class ReadFromTextWithFilename(PTransform):
 
 Review comment:
   This is definitely an option. I implemented it this way because I found it 
strange to have a class that changes the returned data type based on an 
optional parameter. 
   
   This way, it is clearer (and documented) that if I want to get the 
source filename, I will need to change the class **and** the way I handle 
elements in the following steps. 
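   To illustrate the point about downstream steps, here is a small sketch in plain Python (no Beam imports). It assumes, for the sake of the example, that the filename-aware read emits (filename, line) pairs while the plain read emits bare lines; the function names are hypothetical.

```python
def count_words(lines):
    """Downstream step for a plain text read: each element is a line."""
    return sum(len(line.split()) for line in lines)


def count_words_per_file(pairs):
    """Downstream step for a filename-aware read.

    Each element is assumed to be a (filename, line) pair, so the step
    has to unpack it before it can look at the text.
    """
    totals = {}
    for filename, line in pairs:
        totals[filename] = totals.get(filename, 0) + len(line.split())
    return totals
```

   The two steps are not interchangeable, which is why a separate transform name makes the change in element type visible at the call site.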




Issue Time Tracking
---

Worklog Id: (was: 172296)
Time Spent: 0.5h  (was: 20m)

> Create a Class to read content of a file keeping track of the file path 
> (python)
> 
>
> Key: BEAM-6167
> URL: https://issues.apache.org/jira/browse/BEAM-6167
> Project: Beam
>  Issue Type: Improvement
>  Components: io-ideas
>Affects Versions: 2.8.0
>Reporter: Lorenzo Caggioni
>Assignee: Eugene Kirpichov
>Priority: Minor
> Fix For: Not applicable
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Add a class to read the content of a file, keeping track of the file path each 
> element comes from.
> This is an improvement of the current python/apache_beam/io/textio.py





[jira] [Work logged] (BEAM-6167) Create a Class to read content of a file keeping track of the file path (python)

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6167?focusedWorklogId=172287&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172287
 ]

ASF GitHub Bot logged work on BEAM-6167:


Author: ASF GitHub Bot
Created on: 05/Dec/18 13:59
Start Date: 05/Dec/18 13:59
Worklog Time Spent: 10m 
  Work Description: robertwb commented on a change in pull request #7193: 
[BEAM-6167] Add class ReadFromTextWithFilename (Python)
URL: https://github.com/apache/beam/pull/7193#discussion_r239072795
 
 

 ##
 File path: sdks/python/apache_beam/io/textio.py
 ##
 @@ -383,7 +389,7 @@ def open(self, temp_path):
 if self._header is not None:
   file_handle.write(self._header)
   if self._append_trailing_newlines:
-file_handle.write(b'\n')
+file_handle.write('\n')
 
 Review comment:
   I think these are required for Python 3. 
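   A short sketch of why the bytes literal matters, assuming the sink's file handle is opened in binary mode (an in-memory `io.BytesIO` stands in for it here, and the header value is made up for the example):

```python
import io

header = b"id,name"  # illustrative header value

# Binary-mode handle standing in for the sink's temporary file.
file_handle = io.BytesIO()
file_handle.write(header)
file_handle.write(b"\n")  # bytes literal: accepted on Python 2 and Python 3

try:
    file_handle.write("\n")  # str literal: a binary handle rejects it on Python 3
    str_write_ok = True
except TypeError:
    str_write_ok = False

contents = file_handle.getvalue()
```

   On Python 2 the two literals are the same type, so the difference only shows up after porting.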




Issue Time Tracking
---

Worklog Id: (was: 172287)
Time Spent: 20m  (was: 10m)

> Create a Class to read content of a file keeping track of the file path 
> (python)
> 
>
> Key: BEAM-6167
> URL: https://issues.apache.org/jira/browse/BEAM-6167
> Project: Beam
>  Issue Type: Improvement
>  Components: io-ideas
>Affects Versions: 2.8.0
>Reporter: Lorenzo Caggioni
>Assignee: Eugene Kirpichov
>Priority: Minor
> Fix For: Not applicable
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Add a class to read the content of a file, keeping track of the file path each 
> element comes from.
> This is an improvement of the current python/apache_beam/io/textio.py





[jira] [Work logged] (BEAM-6167) Create a Class to read content of a file keeping track of the file path (python)

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6167?focusedWorklogId=172288&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172288
 ]

ASF GitHub Bot logged work on BEAM-6167:


Author: ASF GitHub Bot
Created on: 05/Dec/18 13:59
Start Date: 05/Dec/18 13:59
Worklog Time Spent: 10m 
  Work Description: robertwb commented on a change in pull request #7193: 
[BEAM-6167] Add class ReadFromTextWithFilename (Python)
URL: https://github.com/apache/beam/pull/7193#discussion_r239074727
 
 

 ##
 File path: sdks/python/apache_beam/io/textio.py
 ##
 @@ -527,6 +533,61 @@ def expand(self, pvalue):
 return pvalue.pipeline | Read(self._source)
 
 
+class ReadFromTextWithFilename(PTransform):
 
 Review comment:
   Perhaps it would make more sense to make returning filenames an option 
for all file-based sources, rather than adding a new transform? At the very 
least, much of the redundancy here could be removed. 




Issue Time Tracking
---

Worklog Id: (was: 172288)
Time Spent: 20m  (was: 10m)

> Create a Class to read content of a file keeping track of the file path 
> (python)
> 
>
> Key: BEAM-6167
> URL: https://issues.apache.org/jira/browse/BEAM-6167
> Project: Beam
>  Issue Type: Improvement
>  Components: io-ideas
>Affects Versions: 2.8.0
>Reporter: Lorenzo Caggioni
>Assignee: Eugene Kirpichov
>Priority: Minor
> Fix For: Not applicable
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Add a class to read the content of a file, keeping track of the file path each 
> element comes from.
> This is an improvement of the current python/apache_beam/io/textio.py





[jira] [Work logged] (BEAM-5859) Improve Traceability of Pipeline translation

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-5859?focusedWorklogId=172279&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172279
 ]

ASF GitHub Bot logged work on BEAM-5859:


Author: ASF GitHub Bot
Created on: 05/Dec/18 13:53
Start Date: 05/Dec/18 13:53
Worklog Time Spent: 10m 
  Work Description: robertwb opened a new pull request #7208: [BEAM-5859] 
Better handle fused composite stage names.
URL: https://github.com/apache/beam/pull/7208
 
 
   I decided to default to truncated names because they both look
   nicer and they have more information if the UI is going to
   (implicitly or explicitly) truncate them on length anyways.
   
   
   
   Follow this checklist to help us incorporate your contribution quickly and 
easily:
   
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   It will help us expedite review of your Pull Request if you tag someone 
(e.g. `@username`) to look at it.
   
   Post-Commit Tests Status (on master branch)
   

   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go_GradleBuild/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_GradleBuild/lastCompletedBuild/)
 | --- | --- | --- | --- | --- | ---
   Java | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex_Gradle/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Gradle/lastCompletedBuild/)
 [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump_Gradle/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza_Gradle/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark_Gradle/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark_Gradle/lastCompletedBuild/)
   Python | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/)
 | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)
  [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/)
 | --- | --- | ---
   
   
   
   
   




Issue Time Tracking
---

Worklog Id: (was: 172279)
Time Spent: 2h 20m  (was: 2h 10m)

> Improve Traceability of 

[jira] [Work logged] (BEAM-6172) Flink metrics are not generated in standard format

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6172?focusedWorklogId=172269&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172269
 ]

ASF GitHub Bot logged work on BEAM-6172:


Author: ASF GitHub Bot
Created on: 05/Dec/18 13:20
Start Date: 05/Dec/18 13:20
Worklog Time Spent: 10m 
  Work Description: mxm opened a new pull request #7207: [BEAM-6172] Adjust 
Flink metric names / Add metric reporting tests
URL: https://github.com/apache/beam/pull/7207
 
 
   This picks up the metrics delimiter from the Flink config and removes the
   metric type prefix. This improves consistency with Flink's defaults and makes
   metric names more readable.
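   As a rough illustration of the naming change (not the runner's actual Java code), the sketch below joins metric name components with a delimiter read from configuration and adds no metric-type marker. The function names are hypothetical, and a plain dict stands in for the Flink configuration; `metrics.scope.delimiter` is the Flink option linked from the issue, with `.` as its default.

```python
def scope_delimiter(flink_config, default="."):
    """Reads the delimiter from a dict standing in for the Flink configuration."""
    return flink_config.get("metrics.scope.delimiter", default)


def metric_name(components, delimiter):
    """Joins metric name components; no metric-type marker is added."""
    return delimiter.join(components)


components = [
    "org-apache-beam-runners-core-ReduceFnRunner",
    "droppedDueToClosedWindow",
]
default_name = metric_name(components, scope_delimiter({}))
custom_name = metric_name(
    components, scope_delimiter({"metrics.scope.delimiter": "_"}))
```

   Compared with the name quoted in the issue, the hard-coded "__" separators and the "counter" marker no longer appear.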
   
   R @mwylde
   CC @tweise 
   




Issue Time Tracking
---

Worklog Id: (was: 172269)
Time Spent: 10m
Remaining Estimate: 0h

> Flink metrics are not generated in standard format
> --
>
> Key: BEAM-6172
> URL: https://issues.apache.org/jira/browse/BEAM-6172
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Affects Versions: 2.8.0
>Reporter: Micah Wylde
>Assignee: Maximilian Michels
>Priority: Minor
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The metrics that the flink runner exports do not follow the standard format 
> used by 

[jira] [Work logged] (BEAM-6094) Implement External environment for Portable Beam

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6094?focusedWorklogId=172247&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172247
 ]

ASF GitHub Bot logged work on BEAM-6094:


Author: ASF GitHub Bot
Created on: 05/Dec/18 11:48
Start Date: 05/Dec/18 11:48
Worklog Time Spent: 10m 
  Work Description: robertwb commented on a change in pull request #7078: 
[BEAM-6094] Implement external environment for portable BeamPython.
URL: https://github.com/apache/beam/pull/7078#discussion_r239030465
 
 

 ##
 File path: model/pipeline/src/main/proto/beam_runner_api.proto
 ##
 @@ -1045,6 +1046,11 @@ message ProcessPayload {
   map<string, string> env = 4; // Environment variables
 }
 
+message ExternalPayload {
+  ApiServiceDescriptor endpoint = 1;
 
 Review comment:
   ApiServiceDescriptor has the same meaning as everything else. 
   
   Correct. I started going down the road of spec-ing out a full REST protocol 
here, but figured that was better deferred to future work. This PR focuses 
on the more basic mechanisms while leaving the transport layer extensible 
(either within the ApiServiceDescriptor, if we decide to go that route, or 
alongside it as a different endpoint type, in which case we could make this a 
oneof). 




Issue Time Tracking
---

Worklog Id: (was: 172247)
Time Spent: 1h 40m  (was: 1.5h)

> Implement External environment for Portable Beam
> 
>
> Key: BEAM-6094
> URL: https://issues.apache.org/jira/browse/BEAM-6094
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-6094) Implement External environment for Portable Beam

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6094?focusedWorklogId=172252&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172252
 ]

ASF GitHub Bot logged work on BEAM-6094:


Author: ASF GitHub Bot
Created on: 05/Dec/18 11:48
Start Date: 05/Dec/18 11:48
Worklog Time Spent: 10m 
  Work Description: robertwb commented on a change in pull request #7078: 
[BEAM-6094] Implement external environment for portable BeamPython.
URL: https://github.com/apache/beam/pull/7078#discussion_r239031203
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ExternalEnvironmentFactory.java
 ##
 @@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.environment;
+
+import com.google.common.base.Preconditions;
+import java.time.Duration;
+import java.util.concurrent.TimeoutException;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerGrpc;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.core.construction.BeamUrns;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
+import org.apache.beam.runners.fnexecution.control.ControlClientPool;
+import 
org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import 
org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
+import org.apache.beam.sdk.fn.IdGenerator;
+import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link EnvironmentFactory} which forks processes based on the given URL 
in the Environment.
+ * The returned {@link ProcessEnvironment} has to make sure to stop the 
processes.
+ */
+public class ExternalEnvironmentFactory implements EnvironmentFactory {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ExternalEnvironmentFactory.class);
+
+  public static ExternalEnvironmentFactory create(
+  GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
+  GrpcFnServer<GrpcLoggingService> loggingServiceServer,
+  GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
+  GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
+  ControlClientPool.Source clientSource,
+  IdGenerator idGenerator) {
+return new ExternalEnvironmentFactory(
+controlServiceServer,
+loggingServiceServer,
+retrievalServiceServer,
+provisioningServiceServer,
+idGenerator,
+clientSource);
+  }
+
+  private final GrpcFnServer<FnApiControlClientPoolService> controlServiceServer;
+  private final GrpcFnServer<GrpcLoggingService> loggingServiceServer;
+  private final GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer;
+  private final GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer;
+  private final IdGenerator idGenerator;
+  private final ControlClientPool.Source clientSource;
+
+  private ExternalEnvironmentFactory(
+  GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
+  GrpcFnServer<GrpcLoggingService> loggingServiceServer,
+  GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
+  GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
+  IdGenerator idGenerator,
+  ControlClientPool.Source clientSource) {
+this.controlServiceServer = controlServiceServer;
+this.loggingServiceServer = loggingServiceServer;
+this.retrievalServiceServer = retrievalServiceServer;
+this.provisioningServiceServer = provisioningServiceServer;
+this.idGenerator = idGenerator;
+this.clientSource = clientSource;
+  }
+
+  /** Creates a new, active {@link RemoteEnvironment} backed by a forked 
process. */
+  @Override
+  public RemoteEnvironment createEnvironment(Environment environment) throws 
Exception {
+Preconditions.checkState(
+environment
+.getUrn()
+
.equals(BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.EXTERNAL)),
+

[jira] [Work logged] (BEAM-6094) Implement External environment for Portable Beam

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6094?focusedWorklogId=172250&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172250
 ]

ASF GitHub Bot logged work on BEAM-6094:


Author: ASF GitHub Bot
Created on: 05/Dec/18 11:48
Start Date: 05/Dec/18 11:48
Worklog Time Spent: 10m 
  Work Description: robertwb commented on a change in pull request #7078: 
[BEAM-6094] Implement external environment for portable BeamPython.
URL: https://github.com/apache/beam/pull/7078#discussion_r239032814
 
 

 ##
 File path: 
sdks/python/apache_beam/runners/portability/local_job_service_main.py
 ##
 @@ -32,10 +32,8 @@ def run(argv):
   parser.add_argument('-p', '--port',
   type=int,
   help='port on which to serve the job api')
-  parser.add_argument('--worker_command_line',
-  help='command line for starting up a worker process')
   options = parser.parse_args(argv)
-  job_servicer = 
local_job_service.LocalJobServicer(options.worker_command_line)
+  job_servicer = local_job_service.LocalJobServicer()
 
 Review comment:
   No, this is now passed via an environment rather than out-of-band.




Issue Time Tracking
---

Worklog Id: (was: 172250)
Time Spent: 1h 50m  (was: 1h 40m)

> Implement External environment for Portable Beam
> 
>
> Key: BEAM-6094
> URL: https://issues.apache.org/jira/browse/BEAM-6094
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-6094) Implement External environment for Portable Beam

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6094?focusedWorklogId=172251&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172251
 ]

ASF GitHub Bot logged work on BEAM-6094:


Author: ASF GitHub Bot
Created on: 05/Dec/18 11:48
Start Date: 05/Dec/18 11:48
Worklog Time Spent: 10m 
  Work Description: robertwb commented on a change in pull request #7078: 
[BEAM-6094] Implement external environment for portable BeamPython.
URL: https://github.com/apache/beam/pull/7078#discussion_r239031615
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ExternalEnvironmentFactory.java
 ##
 @@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.environment;
+
+import com.google.common.base.Preconditions;
+import java.time.Duration;
+import java.util.concurrent.TimeoutException;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerGrpc;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.core.construction.BeamUrns;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
+import org.apache.beam.runners.fnexecution.control.ControlClientPool;
+import 
org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import 
org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
+import org.apache.beam.sdk.fn.IdGenerator;
+import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link EnvironmentFactory} which forks processes based on the given URL 
in the Environment.
+ * The returned {@link ProcessEnvironment} has to make sure to stop the 
processes.
+ */
+public class ExternalEnvironmentFactory implements EnvironmentFactory {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ExternalEnvironmentFactory.class);
+
+  public static ExternalEnvironmentFactory create(
+  GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
+  GrpcFnServer<GrpcLoggingService> loggingServiceServer,
+  GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
+  GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
+  ControlClientPool.Source clientSource,
+  IdGenerator idGenerator) {
+return new ExternalEnvironmentFactory(
+controlServiceServer,
+loggingServiceServer,
+retrievalServiceServer,
+provisioningServiceServer,
+idGenerator,
+clientSource);
+  }
+
+  private final GrpcFnServer<FnApiControlClientPoolService> controlServiceServer;
+  private final GrpcFnServer<GrpcLoggingService> loggingServiceServer;
+  private final GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer;
+  private final GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer;
+  private final IdGenerator idGenerator;
+  private final ControlClientPool.Source clientSource;
+
+  private ExternalEnvironmentFactory(
+  GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
+  GrpcFnServer<GrpcLoggingService> loggingServiceServer,
+  GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
+  GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
+  IdGenerator idGenerator,
+  ControlClientPool.Source clientSource) {
+this.controlServiceServer = controlServiceServer;
+this.loggingServiceServer = loggingServiceServer;
+this.retrievalServiceServer = retrievalServiceServer;
+this.provisioningServiceServer = provisioningServiceServer;
+this.idGenerator = idGenerator;
+this.clientSource = clientSource;
+  }
+
+  /** Creates a new, active {@link RemoteEnvironment} backed by a forked 
process. */
+  @Override
+  public RemoteEnvironment createEnvironment(Environment environment) throws 
Exception {
+Preconditions.checkState(
+environment
+.getUrn()
+
.equals(BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.EXTERNAL)),
+

[jira] [Work logged] (BEAM-6094) Implement External environment for Portable Beam

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6094?focusedWorklogId=172249&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172249
 ]

ASF GitHub Bot logged work on BEAM-6094:


Author: ASF GitHub Bot
Created on: 05/Dec/18 11:48
Start Date: 05/Dec/18 11:48
Worklog Time Spent: 10m 
  Work Description: robertwb commented on a change in pull request #7078: 
[BEAM-6094] Implement external environment for portable BeamPython.
URL: https://github.com/apache/beam/pull/7078#discussion_r239033306
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/portable_runner.py
 ##
 @@ -118,10 +151,12 @@ def run_pipeline(self, pipeline):
   docker = DockerizedJobServer()
   job_endpoint = docker.start()
 
-proto_context = pipeline_context.PipelineContext(
+# This is needed as we start a worker server if one is requested
+# but none is provided.
+cleanup_callbacks = []
 
 Review comment:
   I cleaned this up by providing a new LOOPBACK environment rather than having 
external implicitly create workers. 




Issue Time Tracking
---

Worklog Id: (was: 172249)

> Implement External environment for Portable Beam
> 
>
> Key: BEAM-6094
> URL: https://issues.apache.org/jira/browse/BEAM-6094
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6094) Implement External environment for Portable Beam

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6094?focusedWorklogId=172248&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172248
 ]

ASF GitHub Bot logged work on BEAM-6094:


Author: ASF GitHub Bot
Created on: 05/Dec/18 11:48
Start Date: 05/Dec/18 11:48
Worklog Time Spent: 10m 
  Work Description: robertwb commented on a change in pull request #7078: 
[BEAM-6094] Implement external environment for portable BeamPython.
URL: https://github.com/apache/beam/pull/7078#discussion_r239032505
 
 

 ##
 File path: sdks/python/apache_beam/pipeline.py
 ##
 @@ -606,13 +606,15 @@ def visit_value(self, value, _):
 return Visitor.ok
 
   def to_runner_api(
-  self, return_context=False, context=None, use_fake_coders=False):
+  self, return_context=False, context=None, use_fake_coders=False,
+  default_environment=None):
 """For internal use only; no backwards-compatibility guarantees."""
 from apache_beam.runners import pipeline_context
 from apache_beam.portability.api import beam_runner_api_pb2
 if context is None:
   context = pipeline_context.PipelineContext(
-  use_fake_coders=use_fake_coders)
+  use_fake_coders=use_fake_coders,
+  default_environment=default_environment)
 
 Review comment:
   It's up to the context to come up with a default environment if none is set. 
The argument here just overrides that default. 
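
The override behaviour described in this comment can be sketched roughly as follows. This is only an illustration under stated assumptions, not the Beam code: `SketchContext`, `_derive_default_environment`, `to_runner_api_sketch` and the string values are hypothetical names invented for the example.

```python
class SketchContext(object):
  """Hypothetical stand-in for a pipeline context; not the Beam class."""

  def __init__(self, default_environment=None):
    # An explicit argument overrides whatever the context would pick itself.
    if default_environment is None:
      default_environment = self._derive_default_environment()
    self.default_environment = default_environment

  def _derive_default_environment(self):
    # Assumption: the context can come up with a fallback on its own.
    return 'context-chosen-default'


def to_runner_api_sketch(context=None, default_environment=None):
  """Builds a context only when the caller did not supply one."""
  if context is None:
    context = SketchContext(default_environment=default_environment)
  return context
```

In this sketch a caller-supplied context is returned untouched, so the `default_environment` argument only matters when the context is created here.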




Issue Time Tracking
---

Worklog Id: (was: 172248)
Time Spent: 1h 40m  (was: 1.5h)

> Implement External environment for Portable Beam
> 
>
> Key: BEAM-6094
> URL: https://issues.apache.org/jira/browse/BEAM-6094
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-6094) Implement External environment for Portable Beam

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6094?focusedWorklogId=172241&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172241
 ]

ASF GitHub Bot logged work on BEAM-6094:


Author: ASF GitHub Bot
Created on: 05/Dec/18 11:32
Start Date: 05/Dec/18 11:32
Worklog Time Spent: 10m 
  Work Description: robertwb commented on a change in pull request #7078: 
[BEAM-6094] Implement external environment for portable BeamPython.
URL: https://github.com/apache/beam/pull/7078#discussion_r239029555
 
 

 ##
 File path: model/fn-execution/src/main/proto/beam_fn_api.proto
 ##
 @@ -967,3 +967,23 @@ service BeamFnLogging {
 stream LogControl
   ) {}
 }
+
+
+message StartWorkerRequest {
+  string worker_id = 1;
+  org.apache.beam.model.pipeline.v1.ApiServiceDescriptor control_endpoint = 2;
+  org.apache.beam.model.pipeline.v1.ApiServiceDescriptor logging_endpoint = 3;
+  org.apache.beam.model.pipeline.v1.ApiServiceDescriptor artifact_endpoint = 4;
+  org.apache.beam.model.pipeline.v1.ApiServiceDescriptor provision_endpoint = 5;
+  map<string, string> params = 10;
+}
+
+message StartWorkerResponse {
+  string error = 1;
+}
+
+service BeamFnExternalWorker {
 
 Review comment:
   Correct, the lifecycle is unmanaged. Can you think of a better name for 
this? AllocateWorker? InformWorker??




Issue Time Tracking
---

Worklog Id: (was: 172241)
Time Spent: 1.5h  (was: 1h 20m)

> Implement External environment for Portable Beam
> 
>
> Key: BEAM-6094
> URL: https://issues.apache.org/jira/browse/BEAM-6094
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-6067) Dataflow runner should include portable pipeline coder id in CloudObject coder representation

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6067?focusedWorklogId=172234&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172234
 ]

ASF GitHub Bot logged work on BEAM-6067:


Author: ASF GitHub Bot
Created on: 05/Dec/18 11:27
Start Date: 05/Dec/18 11:27
Worklog Time Spent: 10m 
  Work Description: robertwb commented on issue #7081: [BEAM-6067] In 
Python SDK, specify pipeline_proto_coder_id property in non-Beam-standard 
CloudObject coders
URL: https://github.com/apache/beam/pull/7081#issuecomment-53189
 
 
   This looks OK to me. (As an aside, I wonder why the transform nodes 
themselves don't have a reference to the pipeline...) I resolved the merge 
conflict, and will merge assuming all tests pass. 




Issue Time Tracking
---

Worklog Id: (was: 172234)
Time Spent: 7h  (was: 6h 50m)
Remaining Estimate: 161h  (was: 161h 10m)

> Dataflow runner should include portable pipeline coder id in CloudObject 
> coder representation
> -
>
> Key: BEAM-6067
> URL: https://issues.apache.org/jira/browse/BEAM-6067
> Project: Beam
>  Issue Type: Improvement
>  Components: beam-model
>Reporter: Craig Chambers
>Assignee: Craig Chambers
>Priority: Major
>   Original Estimate: 168h
>  Time Spent: 7h
>  Remaining Estimate: 161h
>
> When translating a Beam Java Coder into the DataflowRunner's CloudObject 
> property map, include a property that specifies the id in the Beam model 
> Pipeline coders map corresponding to that Coder.  This will allow the 
> DataflowRunner to reference the corresponding Beam coder in the FnAPI 
> processing bundle.
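
The idea in the description above can be sketched in a few lines. This is a hedged illustration only: the dictionary layout, the `@type` key, and both function names are assumptions made for the example. Only the property name `pipeline_proto_coder_id` is taken from the pull request title.

```python
def coder_to_cloud_object_sketch(coder_type, coder_id=None):
  """Builds a property map for a coder (hypothetical layout)."""
  cloud_object = {'@type': coder_type}  # '@type' key is an assumption
  if coder_id is not None:
    # Record which entry of the pipeline's coders map this coder came from.
    cloud_object['pipeline_proto_coder_id'] = coder_id
  return cloud_object


def lookup_coder_sketch(cloud_object, pipeline_coders):
  """Resolves the property map back to the pipeline coder, if an id is set."""
  coder_id = cloud_object.get('pipeline_proto_coder_id')
  return pipeline_coders.get(coder_id)
```

With the id carried in the property map, a runner holding the pipeline's coders map can find the matching coder without re-deriving it from the coder type.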





[jira] [Work logged] (BEAM-6153) PR 7130 causes transforms/util_test to fail

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6153?focusedWorklogId=172227&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172227
 ]

ASF GitHub Bot logged work on BEAM-6153:


Author: ASF GitHub Bot
Created on: 05/Dec/18 11:04
Start Date: 05/Dec/18 11:04
Worklog Time Spent: 10m 
  Work Description: robertwb closed pull request #7170: [BEAM-6153] 
Re-enable coder optimization.
URL: https://github.com/apache/beam/pull/7170
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd
index 03d0a56c053e..db724e652f83 100644
--- a/sdks/python/apache_beam/coders/coder_impl.pxd
+++ b/sdks/python/apache_beam/coders/coder_impl.pxd
@@ -25,11 +25,14 @@ cimport libc.stdint
 cimport libc.stdlib
 cimport libc.string
 
+cdef extern from "math.h":
+  libc.stdint.int64_t abs "llabs"(libc.stdint.int64_t)
+
 from .stream cimport InputStream, OutputStream
 from apache_beam.utils cimport windowed_value
 
 
-cdef object loads, dumps, create_InputStream, create_OutputStream, ByteCountingOutputStream, get_varint_size
+cdef object loads, dumps, create_InputStream, create_OutputStream, ByteCountingOutputStream, get_varint_size, past_unicode
 # Temporarily untyped to allow monkeypatching on failed import.
 #cdef type WindowedValue
 
@@ -75,8 +78,11 @@ cdef unsigned char SET_TYPE
 
 cdef class FastPrimitivesCoderImpl(StreamCoderImpl):
   cdef CoderImpl fallback_coder_impl
-  @cython.locals(dict_value=dict, int_value=libc.stdint.int64_t)
+  @cython.locals(dict_value=dict, int_value=libc.stdint.int64_t,
+ unicode_value=unicode)
   cpdef encode_to_stream(self, value, OutputStream stream, bint nested)
+  @cython.locals(t=int)
+  cpdef decode_from_stream(self, InputStream stream, bint nested)
 
 
 cdef class BytesCoderImpl(CoderImpl):
@@ -123,6 +129,9 @@ cdef class TupleCoderImpl(AbstractComponentCoderImpl):
 cdef class SequenceCoderImpl(StreamCoderImpl):
   cdef CoderImpl _elem_coder
   cpdef _construct_from_sequence(self, values)
+  @cython.locals(buffer=OutputStream, target_buffer_size=libc.stdint.int64_t,
+ index=libc.stdint.int64_t)
+  cpdef encode_to_stream(self, value, OutputStream stream, bint nested)
 
 
 cdef class TupleSequenceCoderImpl(SequenceCoderImpl):
@@ -133,8 +142,41 @@ cdef class IterableCoderImpl(SequenceCoderImpl):
   pass
 
 
+cdef object IntervalWindow
+
+cdef class IntervalWindowCoderImpl(StreamCoderImpl):
+  cdef libc.stdint.uint64_t _to_normal_time(self, libc.stdint.int64_t value)
+  cdef libc.stdint.int64_t _from_normal_time(self, libc.stdint.uint64_t value)
+
+  @cython.locals(typed_value=windowed_value._IntervalWindowBase,
+ span_millis=libc.stdint.int64_t)
+  cpdef encode_to_stream(self, value, OutputStream stream, bint nested)
+
+  @cython.locals(typed_value=windowed_value._IntervalWindowBase)
+  cpdef decode_from_stream(self, InputStream stream, bint nested)
+
+  @cython.locals(typed_value=windowed_value._IntervalWindowBase,
+ span_millis=libc.stdint.int64_t)
+  cpdef estimate_size(self, value, bint nested=?)
+
+
+cdef int PaneInfoTiming_UNKNOWN
+cdef int PaneInfoEncoding_FIRST
+
+
 cdef class PaneInfoCoderImpl(StreamCoderImpl):
-  cdef int _choose_encoding(self, value)
+  cdef int _choose_encoding(self, windowed_value.PaneInfo value)
+
+  @cython.locals(pane_info=windowed_value.PaneInfo, encoding_type=int)
+  cpdef encode_to_stream(self, value, OutputStream stream, bint nested)
+
+  @cython.locals(encoded_first_byte=int, encoding_type=int)
+  cpdef decode_from_stream(self, InputStream stream, bint nested)
+
+
+cdef libc.stdint.uint64_t _TIME_SHIFT
+cdef libc.stdint.int64_t MIN_TIMESTAMP_micros
+cdef libc.stdint.int64_t MAX_TIMESTAMP_micros
 
 
 cdef class WindowedValueCoderImpl(StreamCoderImpl):
@@ -144,8 +186,18 @@ cdef class WindowedValueCoderImpl(StreamCoderImpl):
   cdef CoderImpl _windows_coder
   cdef CoderImpl _pane_info_coder
 
+  cdef libc.stdint.uint64_t _to_normal_time(self, libc.stdint.int64_t value)
+  cdef libc.stdint.int64_t _from_normal_time(self, libc.stdint.uint64_t value)
+
   @cython.locals(c=CoderImpl)
   cpdef get_estimated_size_and_observables(self, value, bint nested=?)
 
-  @cython.locals(wv=windowed_value.WindowedValue)
+  @cython.locals(timestamp=libc.stdint.int64_t)
+  cpdef decode_from_stream(self, InputStream stream, bint nested)
+
+  @cython.locals(wv=windowed_value.WindowedValue, restore_sign=int)
   cpdef encode_to_stream(self, value, OutputStream stream, bint nested)
+
+
+cdef class LengthPrefixCoderImpl(StreamCoderImpl):
+  cdef CoderImpl _value_coder
diff --git 

[jira] [Work logged] (BEAM-6120) Support retrieval of large gbk iterables over the state API.

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6120?focusedWorklogId=172229&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172229
 ]

ASF GitHub Bot logged work on BEAM-6120:


Author: ASF GitHub Bot
Created on: 05/Dec/18 11:12
Start Date: 05/Dec/18 11:12
Worklog Time Spent: 10m 
  Work Description: robertwb commented on a change in pull request #7127: 
[BEAM-6120] Support retrieval of large gbk iterables over the state API.
URL: https://github.com/apache/beam/pull/7127#discussion_r239023763
 
 

 ##
 File path: model/pipeline/src/main/proto/beam_runner_api.proto
 ##
 @@ -588,6 +588,10 @@ message StandardCoders {
 // of the element
 // Components: The element coder and the window coder, in that order
 WINDOWED_VALUE = 8 [(beam_urn) = "beam:coder:windowed_value:v1"];
+
+// Encodes an iterable of elements, some of which may be stored in state.
+// Components: Coder for a single element.
+STATE_BACKED_ITERABLE = 9 [(beam_urn) = "beam:coder:state_backed_iterable:v1"];
 
 Review comment:
   As per the discussion on the list, more general support for large values is 
desirable but not nearly as straightforward. I did go ahead and mark this coder 
as experimental. PTAL.




Issue Time Tracking
---

Worklog Id: (was: 172229)
Time Spent: 2h 40m  (was: 2.5h)

> Support retrieval of large gbk iterables over the state API.
> 
>
> Key: BEAM-6120
> URL: https://issues.apache.org/jira/browse/BEAM-6120
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-py-harness
>Reporter: Robert Bradshaw
>Assignee: Robert Bradshaw
>Priority: Major
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-6165) Send metrics to Flink in portable Flink runner

2018-12-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6165?focusedWorklogId=172228&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172228
 ]

ASF GitHub Bot logged work on BEAM-6165:


Author: ASF GitHub Bot
Created on: 05/Dec/18 11:11
Start Date: 05/Dec/18 11:11
Worklog Time Spent: 10m 
  Work Description: mxm commented on a change in pull request #7183: 
[BEAM-6165] send metrics to Flink in portable Flink runner
URL: https://github.com/apache/beam/pull/7183#discussion_r239023535
 
 

 ##
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FileReporter.java
 ##
 @@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.metrics;
+
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.AbstractReporter;
+
+/**
+ * Flink {@link org.apache.flink.metrics.reporter.MetricReporter metrics reporter} for writing
+ * metrics to a file (specified via the "metrics.reporter.test.file" config key).
+ */
+public class FileReporter extends AbstractReporter {
 
 Review comment:
   I suppose it could come in handy when you just want to dump metrics locally.
   
   Since package is `org.apache.beam.runners.flink.metrics`, `FileReporter` 
seems fine.




Issue Time Tracking
---

Worklog Id: (was: 172228)
Time Spent: 5h 10m  (was: 5h)

> Send metrics to Flink in portable Flink runner
> --
>
> Key: BEAM-6165
> URL: https://issues.apache.org/jira/browse/BEAM-6165
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Affects Versions: 2.8.0
>Reporter: Ryan Williams
>Assignee: Ryan Williams
>Priority: Major
>  Labels: metrics, portability, portability-flink
>  Time Spent: 5h 10m
>  Remaining Estimate: 0h
>
> Metrics are sent from the fn harness to runner in the Python SDK (and likely 
> Java soon), but the portable Flink runner doesn't pass them on to Flink, 
> which it should, so that users can see them in e.g. the Flink UI or via any 
> Flink metrics reporters.





[jira] [Assigned] (BEAM-6172) Flink metrics are not generated in standard format

2018-12-05 Thread Maximilian Michels (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels reassigned BEAM-6172:


Assignee: Maximilian Michels

> Flink metrics are not generated in standard format
> --
>
> Key: BEAM-6172
> URL: https://issues.apache.org/jira/browse/BEAM-6172
> Project: Beam
>  Issue Type: Improvement
>  Components: runner-flink
>Affects Versions: 2.8.0
>Reporter: Micah Wylde
>Assignee: Maximilian Michels
>Priority: Minor
>
> The metrics that the flink runner exports do not follow the standard format 
> used by Flink, and do not respect Flink metric configuration options. 
> For example (with the default metrics configuration) beam produces a metric:
> {code}
> 10-100-209-71.taskmanager.0f29b420b63fea58f6f321bc0cbf45f3.BeamApp-mwylde-1203224439-a7d8fdf6.group.0.__counter__group__org-apache-beam-runners-core-ReduceFnRunner__droppedDueToClosedWindow
> {code}
> whereas a native Flink metric looks like:
> {code}
> 10-100-209-71.taskmanager.0f29b420b63fea58f6f321bc0cbf45f3.BeamApp-mwylde-1203224439-a7d8fdf6.Source-Custom-Source-7Kinesis-None-beam-env-docker-v1-0-ToKeyedWorkItem.0.numRecordsOut
> {code}
> In particular, Beam should respect the 
> [metrics.scope.delimiter|https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#metrics-scope-delimiter]
>  configuration for separating components of a metric (currently it uses 
> "__"), and should not include the type of metric (counter, gauge, etc.).




