[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2019-01-23 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=189144=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-189144
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 23/Jan/19 19:08
Start Date: 23/Jan/19 19:08
Worklog Time Spent: 10m 
  Work Description: xinyuiscool commented on pull request #7321: 
[BEAM-6271] SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 189144)
Time Spent: 5h 50m  (was: 5h 40m)

> initial support for portable api in samza runner
> 
>
> Key: BEAM-6271
> URL: https://issues.apache.org/jira/browse/BEAM-6271
> Project: Beam
>  Issue Type: Task
>  Components: runner-samza
>Reporter: Hai Lu
>Assignee: Hai Lu
>Priority: Major
>  Time Spent: 5h 50m
>  Remaining Estimate: 0h
>
> Support the portable API in the Samza runner.
> This ticket tracks the initial effort to support the portable API in the Samza 
> runner, including a job server inside the Samza runner, config translation for 
> portable pipelines, transform translation for portable pipelines, and 
> refactoring of existing code to merge the logic of the portable API and the 
> Java API as much as possible.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2019-01-23 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=189143=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-189143
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 23/Jan/19 19:08
Start Date: 23/Jan/19 19:08
Worklog Time Spent: 10m 
  Work Description: xinyuiscool commented on issue #7321: [BEAM-6271] 
SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#issuecomment-456927471
 
 
   I am going to merge the changes in since we don't want to hold this for 
long. Feel free to add comments anyway and we will address them in future 
patches.
 



Issue Time Tracking
---

Worklog Id: (was: 189143)
Time Spent: 5h 40m  (was: 5.5h)



[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2019-01-22 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=188647=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-188647
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 23/Jan/19 04:13
Start Date: 23/Jan/19 04:13
Worklog Time Spent: 10m 
  Work Description: lhaiesp commented on issue #7321: [BEAM-6271] 
SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#issuecomment-456663737
 
 
   Run Java PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 188647)
Time Spent: 5.5h  (was: 5h 20m)



[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2019-01-18 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=187132=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-187132
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 18/Jan/19 23:01
Start Date: 18/Jan/19 23:01
Worklog Time Spent: 10m 
  Work Description: lhaiesp commented on issue #7321: [BEAM-6271] 
SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#issuecomment-455715619
 
 
   Run Java PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 187132)
Time Spent: 5h 20m  (was: 5h 10m)



[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2019-01-18 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=187038=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-187038
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 18/Jan/19 21:14
Start Date: 18/Jan/19 21:14
Worklog Time Spent: 10m 
  Work Description: lhaiesp commented on issue #7321: [BEAM-6271] 
SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#issuecomment-455689548
 
 
   Run Java PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 187038)
Time Spent: 5h 10m  (was: 5h)



[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2019-01-18 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=187012=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-187012
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 18/Jan/19 19:41
Start Date: 18/Jan/19 19:41
Worklog Time Spent: 10m 
  Work Description: lhaiesp commented on issue #7321: [BEAM-6271] 
SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#issuecomment-455664380
 
 
   Run Java PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 187012)
Time Spent: 5h  (was: 4h 50m)



[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2019-01-18 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=187011=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-187011
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 18/Jan/19 19:39
Start Date: 18/Jan/19 19:39
Worklog Time Spent: 10m 
  Work Description: lhaiesp commented on pull request #7321: [BEAM-6271] 
SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321
 
 
   Initial support for the portable API in the Samza runner, including a job 
server inside the Samza runner, config translation for portable pipelines, 
transform translation for portable pipelines, and refactoring of existing code 
to merge the logic of the portable API and the Java API as much as possible.
   
   This change doesn't yet include support for:
   - batching the events transferred between the Java runner and SDK workers 
(bundling)
   - multiple SDK workers connecting to one Java runner
   
   
   
 


[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2019-01-18 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=187009=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-187009
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 18/Jan/19 19:37
Start Date: 18/Jan/19 19:37
Worklog Time Spent: 10m 
  Work Description: lhaiesp commented on pull request #7321: [BEAM-6271] 
SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321
 
 
   
 



Issue Time Tracking
---

Worklog Id: (was: 187009)
Time Spent: 4h 40m  (was: 4.5h)



[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2019-01-18 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=187004=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-187004
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 18/Jan/19 19:12
Start Date: 18/Jan/19 19:12
Worklog Time Spent: 10m 
  Work Description: lhaiesp commented on pull request #7321: [BEAM-6271] 
SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#discussion_r249155184
 
 

 ##
 File path: 
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
 ##
 @@ -58,6 +86,107 @@ public SamzaRunner(SamzaPipelineOptions options) {
     this.options = options;
   }
 
+  private void closeFnServer(GrpcFnServer fnServer) {
+    try (AutoCloseable closer = fnServer) {
+      // do nothing
+    } catch (Exception e) {
+      LOG.error("Failed to close fn api servers. Ignore since this is shutdown process...", e);
+    }
+  }
+
+  private void setUpContextManager(
+      StreamGraph streamGraph, SamzaExecutionContext executionContext) {
+    streamGraph.withContextManager(
+        new ContextManager() {
+          @Override
+          public void init(Config config, TaskContext context) {
+            if (executionContext.getMetricsContainer() == null) {
+              final MetricsRegistryMap metricsRegistry =
+                  (MetricsRegistryMap) context.getSamzaContainerContext().metricsRegistry;
+              executionContext.setMetricsContainer(new SamzaMetricsContainer(metricsRegistry));
+            }
+
+            if (SamzaRunnerOverrideConfigs.isPortableMode(options)) {
+              if (jobBundleFactory == null) {
 
 Review comment:
   Hmm, this ties into the work to test "multiple SDK workers connecting to 
one Java runner" as well as replacing 
"SingleEnvironmentInstanceJobBundleFactory". I think it's better to group those 
efforts together.
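
The quoted hunk waits, with a timeout, for a worker to register under a fixed worker ID before it builds the job bundle factory. The general handoff pattern can be sketched with plain java.util.concurrent types. This is only an illustration under stated assumptions: WorkerHandoffSketch, put and take are hypothetical names, and the class is not Beam's MapControlClientPool. Keying the handoff by worker ID is what would let several workers register against one runner.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration of a per-worker handoff; not a Beam class.
public class WorkerHandoffSketch<T> {
  private final ConcurrentMap<String, CompletableFuture<T>> pending = new ConcurrentHashMap<>();

  /** Called when a worker connects: publishes its client under the worker ID. */
  public void put(String workerId, T client) {
    pending.computeIfAbsent(workerId, id -> new CompletableFuture<>()).complete(client);
  }

  /** Called by the runner: waits for the worker's client, or returns empty on timeout. */
  public Optional<T> take(String workerId, Duration timeout) {
    CompletableFuture<T> future =
        pending.computeIfAbsent(workerId, id -> new CompletableFuture<>());
    try {
      T client = future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
      pending.remove(workerId, future); // the client has been handed off
      return Optional.ofNullable(client);
    } catch (TimeoutException | ExecutionException e) {
      return Optional.empty();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      return Optional.empty();
    }
  }

  public static void main(String[] args) {
    WorkerHandoffSketch<String> pool = new WorkerHandoffSketch<>();
    pool.put("worker-1", "client-for-worker-1");
    System.out.println(pool.take("worker-1", Duration.ofMillis(100)).isPresent());
    System.out.println(pool.take("worker-2", Duration.ofMillis(50)).isPresent());
  }
}
```

In the sketch a timeout yields an empty Optional, whereas the quoted code wraps any failure in a RuntimeException; either choice keeps the caller from blocking forever when no worker connects.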
 



Issue Time Tracking
---

Worklog Id: (was: 187004)
Time Spent: 4.5h  (was: 4h 20m)



[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2019-01-18 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=186984=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-186984
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 18/Jan/19 18:11
Start Date: 18/Jan/19 18:11
Worklog Time Spent: 10m 
  Work Description: lhaiesp commented on pull request #7321: [BEAM-6271] 
SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#discussion_r249136978
 
 

 ##
 File path: 
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/FlattenPCollectionsTranslator.java
 ##
 @@ -66,21 +68,48 @@ public void translate(
     }
 
     if (inputStreams.size() == 1) {
-      ctx.registerMessageStream(output, inputStreams.get(0));
+      ctx.registerMessageStream(output, Iterables.getOnlyElement(inputStreams));
       return;
     }
 
+    ctx.registerMessageStream(output, mergeInputStreams(inputStreams));
+  }
+
+  @Override
+  public void translatePortable(
+      PipelineNode.PTransformNode transform,
+      QueryablePipeline pipeline,
+      PortableTranslationContext ctx) {
+    final List<MessageStream<OpMessage<T>>> inputStreams = ctx.getAllInputMessageStreams(transform);
+    final String outputId = ctx.getOutputId(transform);
+
+    if (inputStreams.isEmpty()) {
+      // For portable api there should be at least the impulse as a dummy input
+      // We will know once validateRunner tests are available for portable runners
+      throw new IllegalArgumentException(
+          String.format("no input streams defined for Flatten: %s", transform.getId()));
+    }
+
+    if (inputStreams.size() == 1) {
 
 Review comment:
   good point
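
The hunk above distinguishes three cases for Flatten: no inputs (rejected), exactly one input (passed through unchanged), and several inputs (merged). A minimal, self-contained sketch of that branching follows. It is an assumption-laden illustration: FlattenSketch is a hypothetical class, and a plain List stands in for a Samza message stream.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration; List<T> stands in for a message stream.
public class FlattenSketch {
  /** Returns the single input as-is, or the concatenation of all inputs. */
  public static <T> List<T> flatten(String transformId, List<List<T>> inputs) {
    if (inputs.isEmpty()) {
      // Mirrors the quoted hunk: a Flatten with no inputs is treated as an error.
      throw new IllegalArgumentException(
          String.format("no input streams defined for Flatten: %s", transformId));
    }
    if (inputs.size() == 1) {
      return inputs.get(0); // nothing to merge, pass the input through
    }
    return inputs.stream().flatMap(List::stream).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<List<Integer>> inputs = Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3));
    System.out.println(flatten("flatten-1", inputs));
  }
}
```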
 



Issue Time Tracking
---

Worklog Id: (was: 186984)
Time Spent: 4h 20m  (was: 4h 10m)



[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2019-01-18 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=186983=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-186983
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 18/Jan/19 18:07
Start Date: 18/Jan/19 18:07
Worklog Time Spent: 10m 
  Work Description: lhaiesp commented on pull request #7321: [BEAM-6271] 
SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#discussion_r249135944
 
 

 ##
 File path: 
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
 ##
 @@ -58,6 +86,107 @@ public SamzaRunner(SamzaPipelineOptions options) {
     this.options = options;
   }
 
+  private void closeFnServer(GrpcFnServer fnServer) {
+    try (AutoCloseable closer = fnServer) {
+      // do nothing
+    } catch (Exception e) {
+      LOG.error("Failed to close fn api servers. Ignore since this is shutdown process...", e);
+    }
+  }
+
+  private void setUpContextManager(
+      StreamGraph streamGraph, SamzaExecutionContext executionContext) {
+    streamGraph.withContextManager(
+        new ContextManager() {
+          @Override
+          public void init(Config config, TaskContext context) {
+            if (executionContext.getMetricsContainer() == null) {
+              final MetricsRegistryMap metricsRegistry =
+                  (MetricsRegistryMap) context.getSamzaContainerContext().metricsRegistry;
+              executionContext.setMetricsContainer(new SamzaMetricsContainer(metricsRegistry));
+            }
+
+            if (SamzaRunnerOverrideConfigs.isPortableMode(options)) {
+              if (jobBundleFactory == null) {
+                try {
+                  final long waitTimeoutMs =
+                      SamzaRunnerOverrideConfigs.getControlClientWaitTimeoutMs(options);
+                  final InstructionRequestHandler instructionHandler =
+                      controlClientPool
+                          .getSource()
+                          .take(SAMZA_WORKER_ID, Duration.ofMillis(waitTimeoutMs));
+                  final EnvironmentFactory environmentFactory =
+                      environment -> RemoteEnvironment.forHandler(environment, instructionHandler);
+                  // TODO: use JobBundleFactoryBase.WrappedSdkHarnessClient.wrapping
+                  jobBundleFactory =
+                      SingleEnvironmentInstanceJobBundleFactory.create(
+                          environmentFactory, fnDataServer, fnStateServer);
+                } catch (Exception e) {
+                  throw new RuntimeException(
+                      "Running samza in Beam portable mode but failed to create job bundle factory",
+                      e);
+                }
+                executionContext.setJobBundleFactory(jobBundleFactory);
+              }
+            }
+
+            context.setUserContext(executionContext);
+          }
+
+          @Override
+          public void close() {
+            closeFnServer(fnControlServer);
+            fnControlServer = null;
+            closeFnServer(fnDataServer);
+            fnDataServer = null;
+            closeFnServer(fnStateServer);
+            fnStateServer = null;
+          }
+        });
+  }
+
+  private void setUpFnApiServer() {
+    controlClientPool = MapControlClientPool.create();
+    ExecutorService dataExecutor = Executors.newCachedThreadPool();
+    try {
+      fnControlServer =
+          GrpcFnServer.allocatePortAndCreateFor(
+              FnApiControlClientPoolService.offeringClientsToPool(
+                  controlClientPool.getSink(), () -> SAMZA_WORKER_ID),
+              ServerFactory.createWithPortSupplier(
+                  () -> SamzaRunnerOverrideConfigs.getFnControlPort(options)));
+
+      fnDataServer =
+          GrpcFnServer.allocatePortAndCreateFor(
+              GrpcDataService.create(dataExecutor, OutboundObserverFactory.serverDirect()),
+              ServerFactory.createDefault());
+
+      fnStateServer =
+          GrpcFnServer.allocatePortAndCreateFor(
+              GrpcStateService.create(), ServerFactory.createDefault());
+    } catch (Exception e) {
+      LOG.error("Failed to set up fn api servers", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  SamzaPipelineResult runPortablePipeline(RunnerApi.Pipeline pipeline) {
+    final SamzaExecutionContext executionContext = new SamzaExecutionContext();
+    setUpFnApiServer();
 
 Review comment:
   As discussed, put a TODO here, as it's already done in the 1.0 migration.
 




[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2019-01-17 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=186669=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-186669
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 18/Jan/19 02:14
Start Date: 18/Jan/19 02:14
Worklog Time Spent: 10m 
  Work Description: lhaiesp commented on pull request #7321: [BEAM-6271] 
SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#discussion_r248908054
 
 

 ##
 File path: 
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
 ##
 @@ -58,6 +86,107 @@ public SamzaRunner(SamzaPipelineOptions options) {
     this.options = options;
   }
 
+  private void closeFnServer(GrpcFnServer fnServer) {
+    try (AutoCloseable closer = fnServer) {
+      // do nothing
+    } catch (Exception e) {
+      LOG.error("Failed to close fn api servers. Ignore since this is shutdown process...", e);
+    }
+  }
+
+  private void setUpContextManager(
+      StreamGraph streamGraph, SamzaExecutionContext executionContext) {
+    streamGraph.withContextManager(
+        new ContextManager() {
+          @Override
+          public void init(Config config, TaskContext context) {
+            if (executionContext.getMetricsContainer() == null) {
+              final MetricsRegistryMap metricsRegistry =
+                  (MetricsRegistryMap) context.getSamzaContainerContext().metricsRegistry;
+              executionContext.setMetricsContainer(new SamzaMetricsContainer(metricsRegistry));
+            }
+
+            if (SamzaRunnerOverrideConfigs.isPortableMode(options)) {
+              if (jobBundleFactory == null) {
+                try {
+                  final long waitTimeoutMs =
+                      SamzaRunnerOverrideConfigs.getControlClientWaitTimeoutMs(options);
+                  final InstructionRequestHandler instructionHandler =
+                      controlClientPool
+                          .getSource()
+                          .take(SAMZA_WORKER_ID, Duration.ofMillis(waitTimeoutMs));
+                  final EnvironmentFactory environmentFactory =
+                      environment -> RemoteEnvironment.forHandler(environment, instructionHandler);
+                  // TODO: use JobBundleFactoryBase.WrappedSdkHarnessClient.wrapping
+                  jobBundleFactory =
+                      SingleEnvironmentInstanceJobBundleFactory.create(
+                          environmentFactory, fnDataServer, fnStateServer);
+                } catch (Exception e) {
+                  throw new RuntimeException(
+                      "Running samza in Beam portable mode but failed to create job bundle factory",
+                      e);
+                }
+                executionContext.setJobBundleFactory(jobBundleFactory);
+              }
+            }
+
+            context.setUserContext(executionContext);
+          }
+
+          @Override
+          public void close() {
+            closeFnServer(fnControlServer);
+            fnControlServer = null;
+            closeFnServer(fnDataServer);
+            fnDataServer = null;
+            closeFnServer(fnStateServer);
+            fnStateServer = null;
+          }
+        });
+  }
+
+  private void setUpFnApiServer() {
+    controlClientPool = MapControlClientPool.create();
 
 Review comment:
   The pool doesn't have a way to close, so setting the variable to null is 
the best we can do. We do need to shut down the executorService, though. Good 
point.
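
Shutting down an ExecutorService such as the one mentioned above usually takes two phases: stop accepting work, then interrupt whatever is still running after a grace period. The following is a minimal sketch using only java.util.concurrent. The class name ExecutorShutdownSketch, the helper shutdownQuietly and the timeout value are hypothetical, not part of the pull request.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; not taken from the pull request.
public class ExecutorShutdownSketch {
  /** Stops the executor, waiting up to timeoutMs per phase; returns true if it terminated. */
  public static boolean shutdownQuietly(ExecutorService executor, long timeoutMs) {
    if (executor == null) {
      return true; // nothing to shut down
    }
    executor.shutdown(); // stop accepting new tasks, let running ones finish
    try {
      if (executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
        return true;
      }
      executor.shutdownNow(); // interrupt tasks that are still running
      return executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      executor.shutdownNow();
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      return false;
    }
  }

  public static void main(String[] args) {
    ExecutorService dataExecutor = Executors.newCachedThreadPool();
    dataExecutor.submit(() -> {});
    System.out.println("terminated: " + shutdownQuietly(dataExecutor, 1000));
  }
}
```

A helper like this could be called from the same close() callback that closes the three fn API servers, provided the executor is kept in a field rather than a local variable.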
 



Issue Time Tracking
---

Worklog Id: (was: 186669)
Time Spent: 4h  (was: 3h 50m)



[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2019-01-17 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=186663=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-186663
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 18/Jan/19 02:04
Start Date: 18/Jan/19 02:04
Worklog Time Spent: 10m 
  Work Description: lhaiesp commented on pull request #7321: [BEAM-6271] 
SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#discussion_r248906805
 
 

 ##
 File path: 
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java
 ##
 @@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.ServerFactory;
+import 
org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService;
+import org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
+import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.Struct;
+import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Driver program that starts a job server. */
+public class SamzaJobServerDriver {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SamzaJobServerDriver.class);
+
+  private final ServerConfiguration config;
+
+  /** Configuration for the jobServer. */
+  private static class ServerConfiguration {
+@Option(name = "--job-port", usage = "The job service port. (Default: 
11440)")
+private int jobPort = 11440;
+
+@Option(name = "--control-port", usage = "The FnControl port. (Default: 
11441)")
+private int controlPort = 11441;
+  }
+
+  private SamzaJobServerDriver(ServerConfiguration config) {
+this.config = config;
+  }
+
+  public static void main(String[] args) throws Exception {
+ServerConfiguration configuration = new ServerConfiguration();
 
 Review comment:
   done
 



Issue Time Tracking
---

Worklog Id: (was: 186663)
Time Spent: 3.5h  (was: 3h 20m)






[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2019-01-17 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=186665=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-186665
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 18/Jan/19 02:08
Start Date: 18/Jan/19 02:08
Worklog Time Spent: 10m 
  Work Description: lhaiesp commented on pull request #7321: [BEAM-6271] 
SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#discussion_r248907312
 
 

 ##
 File path: 
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java
 ##
 @@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.ServerFactory;
+import 
org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService;
+import org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
+import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.Struct;
+import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Driver program that starts a job server. */
+public class SamzaJobServerDriver {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SamzaJobServerDriver.class);
+
+  private final ServerConfiguration config;
+
+  /** Configuration for the jobServer. */
+  private static class ServerConfiguration {
+@Option(name = "--job-port", usage = "The job service port. (Default: 
11440)")
+private int jobPort = 11440;
+
+@Option(name = "--control-port", usage = "The FnControl port. (Default: 
11441)")
+private int controlPort = 11441;
+  }
+
+  private SamzaJobServerDriver(ServerConfiguration config) {
+this.config = config;
+  }
+
+  public static void main(String[] args) throws Exception {
+ServerConfiguration configuration = new ServerConfiguration();
+CmdLineParser parser = new CmdLineParser(configuration);
+try {
+  parser.parseArgument(args);
+  fromConfig(configuration).run();
+} catch (CmdLineException e) {
+  LOG.error("Unable to parse command line arguments {}", 
Arrays.asList(args), e);
+  throw new IllegalArgumentException("Unable to parse command line 
arguments.", e);
+} catch (Exception e) {
+  LOG.error("Hit exception with SamzaJobServer. Exiting...", e);
+  throw e;
+}
+  }
+
+  public static SamzaJobServerDriver fromConfig(ServerConfiguration config) {
+return new SamzaJobServerDriver(config);
+  }
+
+  private InMemoryJobService createJobService() throws IOException {
+JobInvoker jobInvoker =
+new JobInvoker() {
+  @Override
+  public JobInvocation invoke(
+  RunnerApi.Pipeline pipeline, Struct options, @Nullable String 
retrievalToken)
+  throws IOException {
+SamzaPipelineOptions samzaPipelineOptions =
+
PipelineOptionsTranslation.fromProto(options).as(SamzaPipelineOptions.class);
+Map overrideConfig =
+samzaPipelineOptions.getConfigOverride() != null
+? samzaPipelineOptions.getConfigOverride()
+: new HashMap<>();
+overrideConfig.put(SamzaRunnerOverrideConfigs.IS_PORTABLE_MODE, 
String.valueOf(true));
+overrideConfig.put(
+SamzaRunnerOverrideConfigs.FN_CONTROL_PORT, 
String.valueOf(config.controlPort));
+samzaPipelineOptions.setConfigOverride(overrideConfig);
+return new 

[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2019-01-17 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=186664=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-186664
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 18/Jan/19 02:08
Start Date: 18/Jan/19 02:08
Worklog Time Spent: 10m 
  Work Description: lhaiesp commented on pull request #7321: [BEAM-6271] 
SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#discussion_r248907238
 
 

 ##
 File path: 
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java
 ##
 @@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.ServerFactory;
+import 
org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService;
+import org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
+import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.Struct;
+import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Driver program that starts a job server. */
+public class SamzaJobServerDriver {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SamzaJobServerDriver.class);
+
+  private final ServerConfiguration config;
+
+  /** Configuration for the jobServer. */
+  private static class ServerConfiguration {
+@Option(name = "--job-port", usage = "The job service port. (Default: 
11440)")
+private int jobPort = 11440;
+
+@Option(name = "--control-port", usage = "The FnControl port. (Default: 
11441)")
+private int controlPort = 11441;
+  }
+
+  private SamzaJobServerDriver(ServerConfiguration config) {
+this.config = config;
+  }
+
+  public static void main(String[] args) throws Exception {
+ServerConfiguration configuration = new ServerConfiguration();
+CmdLineParser parser = new CmdLineParser(configuration);
+try {
+  parser.parseArgument(args);
+  fromConfig(configuration).run();
+} catch (CmdLineException e) {
+  LOG.error("Unable to parse command line arguments {}", 
Arrays.asList(args), e);
+  throw new IllegalArgumentException("Unable to parse command line 
arguments.", e);
+} catch (Exception e) {
+  LOG.error("Hit exception with SamzaJobServer. Exiting...", e);
+  throw e;
+}
+  }
+
+  public static SamzaJobServerDriver fromConfig(ServerConfiguration config) {
+return new SamzaJobServerDriver(config);
+  }
+
+  private InMemoryJobService createJobService() throws IOException {
 
 Review comment:
   Sure. That requires passing the config in as a parameter. Either way, it 
doesn't make much difference.
 



Issue Time Tracking
---

Worklog Id: (was: 186664)
Time Spent: 3h 40m  (was: 3.5h)


[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2019-01-17 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=186662=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-186662
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 18/Jan/19 02:04
Start Date: 18/Jan/19 02:04
Worklog Time Spent: 10m 
  Work Description: lhaiesp commented on pull request #7321: [BEAM-6271] 
SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#discussion_r248906683
 
 

 ##
 File path: 
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobInvocation.java
 ##
 @@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza;
+
+import static 
org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.CANCELLED;
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.DONE;
+import static 
org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.FAILED;
+import static 
org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.RUNNING;
+import static 
org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.STARTING;
+import static 
org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.STOPPED;
+import static 
org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.UNRECOGNIZED;
+import static 
org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.UPDATED;
+
+import java.util.function.Consumer;
+import org.apache.beam.model.jobmanagement.v1.JobApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.samza.util.PortablePipelineDotRenderer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Invocation of a Samza job via {@link SamzaRunner}. */
+public class SamzaJobInvocation implements JobInvocation {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SamzaJobInvocation.class);
+
+  private final SamzaPipelineOptions options;
+  private final RunnerApi.Pipeline originalPipeline;
+  private volatile SamzaPipelineResult pipelineResult;
+
+  public SamzaJobInvocation(RunnerApi.Pipeline pipeline, SamzaPipelineOptions 
options) {
+this.originalPipeline = pipeline;
+this.options = options;
+  }
+
+  private SamzaPipelineResult invokeSamzaJob() {
+// Fused pipeline proto.
+final RunnerApi.Pipeline fusedPipeline =
+GreedyPipelineFuser.fuse(originalPipeline).toPipeline();
+LOG.info("Portable pipeline to run:");
+LOG.info(PortablePipelineDotRenderer.toDotString(fusedPipeline));
+// the pipeline option coming from sdk will set the sdk specific runner 
which will break serialization
+// so we need to reset the runner here to a valid Java runner
+options.setRunner(SamzaRunner.class);
+try {
+  final SamzaRunner runner = new SamzaRunner(options);
+  return runner.runPortablePipeline(fusedPipeline);
+} catch (Exception e) {
+  throw new RuntimeException("Failed to invoke samza job", e);
+}
+  }
+
+  @Override
+  public void start() {
+LOG.info("Starting job invocation {}", getId());
+pipelineResult = invokeSamzaJob();
+  }
+
+  @Override
+  public String getId() {
+return options.getJobName();
+  }
+
+  @Override
+  public void cancel() {
+try {
+  if (pipelineResult != null) {
+pipelineResult.cancel();
+  }
+} catch (Exception e) {
+  throw new RuntimeException("Failed to cancel job.", e);
+}
+  }
+
+  @Override
+  public JobApi.JobState.Enum getState() {
+if (pipelineResult == null) {
+  return STARTING;
+}
+switch (pipelineResult.getState()) {
 
 Review comment:
   because one is the Java PipelineResult.State, the other one is the protobuf 
version of JobApi.JobState. That's very unfortunate. 
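
   The conversion between the two state types discussed in this comment can be 
sketched roughly as follows. This is only an illustration: `SdkState` and 
`ProtoState` are hypothetical stand-ins for the Java `PipelineResult.State` and 
the protobuf `JobApi.JobState.Enum`, and the one-to-one mapping is an 
assumption, not the code from the pull request.

```java
public class StateMappingSketch {
  /** Hypothetical stand-in for the Java-side pipeline state enum. */
  public enum SdkState { UNKNOWN, STOPPED, RUNNING, DONE, FAILED, CANCELLED, UPDATED }

  /** Hypothetical stand-in for the protobuf-generated job state enum. */
  public enum ProtoState {
    UNRECOGNIZED, STARTING, STOPPED, RUNNING, DONE, FAILED, CANCELLED, UPDATED
  }

  /**
   * Maps the Java-side state to the protobuf-side state.
   * A null input models "no pipeline result yet" and is reported as STARTING,
   * mirroring the null check shown in the quoted diff.
   */
  public static ProtoState toProto(SdkState state) {
    if (state == null) {
      return ProtoState.STARTING;
    }
    switch (state) {
      case STOPPED:
        return ProtoState.STOPPED;
      case RUNNING:
        return ProtoState.RUNNING;
      case DONE:
        return ProtoState.DONE;
      case FAILED:
        return ProtoState.FAILED;
      case CANCELLED:
        return ProtoState.CANCELLED;
      case UPDATED:
        return ProtoState.UPDATED;
      default:
        // Anything without an obvious counterpart (here: UNKNOWN).
        return ProtoState.UNRECOGNIZED;
    }
  }
}
```

   Because the two enums are unrelated types, an explicit switch like this is 
needed even where the constant names coincide.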
 


[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2019-01-17 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=186661=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-186661
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 18/Jan/19 02:04
Start Date: 18/Jan/19 02:04
Worklog Time Spent: 10m 
  Work Description: lhaiesp commented on pull request #7321: [BEAM-6271] 
SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#discussion_r248906683
 
 

 ##
 File path: 
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobInvocation.java
 ##
 @@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza;
+
+import static 
org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.CANCELLED;
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.DONE;
+import static 
org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.FAILED;
+import static 
org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.RUNNING;
+import static 
org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.STARTING;
+import static 
org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.STOPPED;
+import static 
org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.UNRECOGNIZED;
+import static 
org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.UPDATED;
+
+import java.util.function.Consumer;
+import org.apache.beam.model.jobmanagement.v1.JobApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.samza.util.PortablePipelineDotRenderer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Invocation of a Samza job via {@link SamzaRunner}. */
+public class SamzaJobInvocation implements JobInvocation {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SamzaJobInvocation.class);
+
+  private final SamzaPipelineOptions options;
+  private final RunnerApi.Pipeline originalPipeline;
+  private volatile SamzaPipelineResult pipelineResult;
+
+  public SamzaJobInvocation(RunnerApi.Pipeline pipeline, SamzaPipelineOptions 
options) {
+this.originalPipeline = pipeline;
+this.options = options;
+  }
+
+  private SamzaPipelineResult invokeSamzaJob() {
+// Fused pipeline proto.
+final RunnerApi.Pipeline fusedPipeline =
+GreedyPipelineFuser.fuse(originalPipeline).toPipeline();
+LOG.info("Portable pipeline to run:");
+LOG.info(PortablePipelineDotRenderer.toDotString(fusedPipeline));
+// the pipeline option coming from sdk will set the sdk specific runner 
which will break serialization
+// so we need to reset the runner here to a valid Java runner
+options.setRunner(SamzaRunner.class);
+try {
+  final SamzaRunner runner = new SamzaRunner(options);
+  return runner.runPortablePipeline(fusedPipeline);
+} catch (Exception e) {
+  throw new RuntimeException("Failed to invoke samza job", e);
+}
+  }
+
+  @Override
+  public void start() {
+LOG.info("Starting job invocation {}", getId());
+pipelineResult = invokeSamzaJob();
+  }
+
+  @Override
+  public String getId() {
+return options.getJobName();
+  }
+
+  @Override
+  public void cancel() {
+try {
+  if (pipelineResult != null) {
+pipelineResult.cancel();
+  }
+} catch (Exception e) {
+  throw new RuntimeException("Failed to cancel job.", e);
+}
+  }
+
+  @Override
+  public JobApi.JobState.Enum getState() {
+if (pipelineResult == null) {
+  return STARTING;
+}
+switch (pipelineResult.getState()) {
 
 Review comment:
   because one is the Java PipelineResult.State, the other one is the protobuf 
version of JobApi.JobState. That's very unfortunate. 
 


[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2019-01-17 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=186659=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-186659
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 18/Jan/19 01:57
Start Date: 18/Jan/19 01:57
Worklog Time Spent: 10m 
  Work Description: lhaiesp commented on pull request #7321: [BEAM-6271] 
SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#discussion_r248905652
 
 

 ##
 File path: 
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobInvocation.java
 ##
 @@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza;
+
+import static 
org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.CANCELLED;
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.DONE;
+import static 
org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.FAILED;
+import static 
org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.RUNNING;
+import static 
org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.STARTING;
+import static 
org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.STOPPED;
+import static 
org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.UNRECOGNIZED;
+import static 
org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.UPDATED;
+
+import java.util.function.Consumer;
+import org.apache.beam.model.jobmanagement.v1.JobApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.samza.util.PortablePipelineDotRenderer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Invocation of a Samza job via {@link SamzaRunner}. */
+public class SamzaJobInvocation implements JobInvocation {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SamzaJobInvocation.class);
+
+  private final SamzaPipelineOptions options;
+  private final RunnerApi.Pipeline originalPipeline;
+  private volatile SamzaPipelineResult pipelineResult;
+
+  public SamzaJobInvocation(RunnerApi.Pipeline pipeline, SamzaPipelineOptions 
options) {
+this.originalPipeline = pipeline;
+this.options = options;
+  }
+
+  private SamzaPipelineResult invokeSamzaJob() {
+// Fused pipeline proto.
+final RunnerApi.Pipeline fusedPipeline =
+GreedyPipelineFuser.fuse(originalPipeline).toPipeline();
+LOG.info("Portable pipeline to run:");
+LOG.info(PortablePipelineDotRenderer.toDotString(fusedPipeline));
+// the pipeline option coming from sdk will set the sdk specific runner 
which will break serialization
+// so we need to reset the runner here to a valid Java runner
+options.setRunner(SamzaRunner.class);
+try {
+  final SamzaRunner runner = new SamzaRunner(options);
+  return runner.runPortablePipeline(fusedPipeline);
+} catch (Exception e) {
+  throw new RuntimeException("Failed to invoke samza job", e);
+}
+  }
+
+  @Override
+  public void start() {
+LOG.info("Starting job invocation {}", getId());
+pipelineResult = invokeSamzaJob();
+  }
+
+  @Override
+  public String getId() {
 
 Review comment:
   Looks like it's only used for logging. But you're right, the semantics do 
require the ID to be unique across invocations. Flink uses job name + 
random UUID. I'm considering using the same pattern here.
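
   A minimal sketch of the "job name + random UUID" pattern mentioned above. 
The class name, the method name, and the underscore separator are assumptions 
made for illustration; the comment does not say how the final change formats 
the ID.

```java
import java.util.UUID;

public class InvocationIdSketch {
  /**
   * Builds an invocation ID that is unique per call by appending a random
   * UUID to the job name. The "_" separator is an arbitrary choice.
   */
  public static String newInvocationId(String jobName) {
    return jobName + "_" + UUID.randomUUID();
  }
}
```

   With this pattern the ID should be computed once, for example in the 
constructor, and stored in a field, so that repeated `getId()` calls on the 
same invocation return the same value.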
 



Issue Time Tracking
---

Worklog Id: (was: 186659)
Time Spent: 3h  (was: 2h 50m)


[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2019-01-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=180454=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-180454
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 02/Jan/19 23:22
Start Date: 02/Jan/19 23:22
Worklog Time Spent: 10m 
  Work Description: xinyuiscool commented on issue #7321: [BEAM-6271] 
SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#issuecomment-451016084
 
 
   Run Java PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 180454)
Time Spent: 2h 50m  (was: 2h 40m)






[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2019-01-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=180453=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-180453
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 02/Jan/19 23:21
Start Date: 02/Jan/19 23:21
Worklog Time Spent: 10m 
  Work Description: xinyuiscool commented on pull request #7321: 
[BEAM-6271] SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#discussion_r244877673
 
 

 ##
 File path: 
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/FlattenPCollectionsTranslator.java
 ##
 @@ -66,21 +68,48 @@ public void translate(
 }
 
 if (inputStreams.size() == 1) {
-  ctx.registerMessageStream(output, inputStreams.get(0));
+  ctx.registerMessageStream(output, 
Iterables.getOnlyElement(inputStreams));
   return;
 }
 
+ctx.registerMessageStream(output, mergeInputStreams(inputStreams));
+  }
+
+  @Override
+  public void translatePortable(
+  PipelineNode.PTransformNode transform,
+  QueryablePipeline pipeline,
+  PortableTranslationContext ctx) {
+final List>> inputStreams = 
ctx.getAllInputMessageStreams(transform);
+final String outputId = ctx.getOutputId(transform);
+
+if (inputStreams.isEmpty()) {
 
 Review comment:
   checkState(!inputStreams.isEmpty(), ".")
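
   The suggestion replaces an `if (inputStreams.isEmpty())` branch with a 
precondition that fails fast. A self-contained sketch of the idea follows. The 
local `checkState` helper only imitates the behavior of Guava's 
`Preconditions.checkState` (throwing `IllegalStateException`), and the method 
name and error message are hypothetical, since the message in the comment 
above is not given.

```java
import java.util.List;

public class FlattenPreconditionSketch {
  /** Local stand-in for a checkState-style precondition helper. */
  static void checkState(boolean expression, String message) {
    if (!expression) {
      throw new IllegalStateException(message);
    }
  }

  /**
   * Fails fast when a flatten transform has no inputs, otherwise returns
   * the number of input streams. The message text is illustrative only.
   */
  public static int requireInputs(List<?> inputStreams, String transformName) {
    checkState(
        !inputStreams.isEmpty(), "No input streams found for transform " + transformName);
    return inputStreams.size();
  }
}
```

   Compared with a silent early return, the precondition turns an unexpected 
empty input list into an immediate, descriptive failure.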
 



Issue Time Tracking
---

Worklog Id: (was: 180453)
Time Spent: 2h 40m  (was: 2.5h)






[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2019-01-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=180452=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-180452
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 02/Jan/19 23:21
Start Date: 02/Jan/19 23:21
Worklog Time Spent: 10m 
  Work Description: xinyuiscool commented on pull request #7321: 
[BEAM-6271] SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#discussion_r244882732
 
 

 ##
 File path: 
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/PortableTranslationContext.java
 ##
 @@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza.translation;
+
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.runners.core.construction.graph.PipelineNode;
+import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
+import org.apache.beam.runners.fnexecution.wire.WireCoders;
+import org.apache.beam.runners.samza.SamzaPipelineOptions;
+import org.apache.beam.runners.samza.runtime.OpMessage;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import 
org.apache.beam.vendor.protobuf.v3.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+
+/**
+ * Helper that keeps the mapping from BEAM PCollection id to Samza {@link 
MessageStream}. It also
+ * provides other context data such as input and output of a {@link
+ * org.apache.beam.model.pipeline.v1.RunnerApi.PTransform}.
+ */
+public class PortableTranslationContext {
+  private final Map<String, MessageStream<?>> messsageStreams = new 
HashMap<>();
+  private final StreamGraph streamGraph;
+  private final SamzaPipelineOptions options;
+  private int topologicalId;
+  private final Set<String> registeredInputStreams = new HashSet<>();
+
+  public PortableTranslationContext(StreamGraph streamGraph, 
SamzaPipelineOptions options) {
+this.streamGraph = streamGraph;
+this.options = options;
+  }
+
+  public SamzaPipelineOptions getSamzaPipelineOptions() {
+return this.options;
+  }
+
+  public void setCurrentTopologicalId(int id) {
+this.topologicalId = id;
+  }
+
+  public int getCurrentTopologicalId() {
+return this.topologicalId;
+  }
+
+  public <T> List<MessageStream<OpMessage<T>>> getAllInputMessageStreams(
+  PipelineNode.PTransformNode transform) {
+final Collection<String> inputStreamIds = 
transform.getTransform().getInputsMap().values();
+return 
inputStreamIds.stream().map(this::getMessageStreamById).collect(Collectors.toList());
+  }
+
+  public <T> MessageStream<OpMessage<T>> getOneInputMessageStream(
+  PipelineNode.PTransformNode transform) {
+String id = 
Iterables.getOnlyElement(transform.getTransform().getInputsMap().values());
+return getMessageStreamById(id);
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T> MessageStream<OpMessage<T>> getMessageStreamById(String id) {
+return (MessageStream<OpMessage<T>>) messsageStreams.get(id);
+  }
+
+  public String getInputId(PipelineNode.PTransformNode transform) {
+return 
Iterables.getOnlyElement(transform.getTransform().getInputsMap().values());
+  }
+
+  public String getOutputId(PipelineNode.PTransformNode transform) {
+return 
Iterables.getOnlyElement(transform.getTransform().getOutputsMap().values());
+  }
+
+  public <T> void registerMessageStream(String id, MessageStream<OpMessage<T>> 
stream) {
+if (messsageStreams.containsKey(id)) {
+  throw new IllegalArgumentException("Stream already 

[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2019-01-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=180451=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-180451
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 02/Jan/19 23:21
Start Date: 02/Jan/19 23:21
Worklog Time Spent: 10m 
  Work Description: xinyuiscool commented on pull request #7321: 
[BEAM-6271] SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#discussion_r244877567
 
 

 ##
 File path: 
runners/samza/src/main/java/org/apache/beam/runners/samza/translation/FlattenPCollectionsTranslator.java
 ##
 @@ -66,21 +68,48 @@ public void translate(
 }
 
 if (inputStreams.size() == 1) {
-  ctx.registerMessageStream(output, inputStreams.get(0));
+  ctx.registerMessageStream(output, 
Iterables.getOnlyElement(inputStreams));
   return;
 }
 
+ctx.registerMessageStream(output, mergeInputStreams(inputStreams));
+  }
+
+  @Override
+  public void translatePortable(
+  PipelineNode.PTransformNode transform,
+  QueryablePipeline pipeline,
+  PortableTranslationContext ctx) {
+final List<MessageStream<OpMessage<T>>> inputStreams = 
ctx.getAllInputMessageStreams(transform);
+final String outputId = ctx.getOutputId(transform);
+
+if (inputStreams.isEmpty()) {
+  // For portable api there should be at least the impulse as a dummy input
+  // We will know once validateRunner tests are available for portable 
runners
+  throw new IllegalArgumentException(
+  String.format("no input streams defined for Flatten: %s", 
transform.getId()));
+}
+
+if (inputStreams.size() == 1) {
 
 Review comment:
   This optimization should be folded inside mergeInputStreams()
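A rough sketch of the suggestion, assuming a merge helper that handles the single-input case itself so that callers need no separate branch. `SketchStream` is a hypothetical stand-in used only to keep the example self-contained; it is not the Samza `MessageStream` API, and the helper body is not taken from the PR.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical stand-in for a mergeable stream; not the Samza MessageStream API. */
class SketchStream<T> {
  final List<T> elements;

  SketchStream(List<T> elements) {
    this.elements = elements;
  }

  // Returns a new stream holding this stream's elements followed by the others'.
  SketchStream<T> merge(Collection<SketchStream<T>> others) {
    List<T> merged = new ArrayList<>(elements);
    for (SketchStream<T> other : others) {
      merged.addAll(other.elements);
    }
    return new SketchStream<>(merged);
  }
}

class FlattenMergeSketch {
  // The single-input shortcut lives here, so both translate paths can call this directly.
  static <T> SketchStream<T> mergeInputStreams(List<SketchStream<T>> inputStreams) {
    if (inputStreams.isEmpty()) {
      throw new IllegalArgumentException("no input streams to merge");
    }
    if (inputStreams.size() == 1) {
      // Nothing to merge: hand back the only stream unchanged.
      return inputStreams.get(0);
    }
    SketchStream<T> first = inputStreams.get(0);
    return first.merge(inputStreams.subList(1, inputStreams.size()));
  }
}
```

With the shortcut inside the helper, each caller reduces to a single `ctx.registerMessageStream(output, mergeInputStreams(inputStreams))` style call.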
 



Issue Time Tracking
---

Worklog Id: (was: 180451)
Time Spent: 2h 20m  (was: 2h 10m)

> initial support for portable api in samza runner
> 
>
> Key: BEAM-6271
> URL: https://issues.apache.org/jira/browse/BEAM-6271
> Project: Beam
>  Issue Type: Task
>  Components: runner-samza
>Reporter: Hai Lu
>Assignee: Hai Lu
>Priority: Major
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> Support portable api in Samza runner.
> This ticket tracks the initial effort to support portable api in Samza 
> runner, including job server inside samza runner, config translation for 
> portable pipeline, transform translation for portable pipeline, refactor of 
> existing codes to merge logic of portable api and java api as much as 
> possible, etc.





[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2019-01-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=180330=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-180330
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 02/Jan/19 17:51
Start Date: 02/Jan/19 17:51
Worklog Time Spent: 10m 
  Work Description: xinyuiscool commented on pull request #7321: 
[BEAM-6271] SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#discussion_r244792611
 
 

 ##
 File path: 
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobInvocation.java
 ##
 @@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza;
+
+import static 
org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.CANCELLED;
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.DONE;
+import static 
org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.FAILED;
+import static 
org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.RUNNING;
+import static 
org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.STARTING;
+import static 
org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.STOPPED;
+import static 
org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.UNRECOGNIZED;
+import static 
org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.UPDATED;
+
+import java.util.function.Consumer;
+import org.apache.beam.model.jobmanagement.v1.JobApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.samza.util.PortablePipelineDotRenderer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Invocation of a Samza job via {@link SamzaRunner}. */
+public class SamzaJobInvocation implements JobInvocation {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SamzaJobInvocation.class);
+
+  private final SamzaPipelineOptions options;
+  private final RunnerApi.Pipeline originalPipeline;
+  private volatile SamzaPipelineResult pipelineResult;
+
+  public SamzaJobInvocation(RunnerApi.Pipeline pipeline, SamzaPipelineOptions 
options) {
+this.originalPipeline = pipeline;
+this.options = options;
+  }
+
+  private SamzaPipelineResult invokeSamzaJob() {
+// Fused pipeline proto.
+final RunnerApi.Pipeline fusedPipeline =
+GreedyPipelineFuser.fuse(originalPipeline).toPipeline();
+LOG.info("Portable pipeline to run:");
+LOG.info(PortablePipelineDotRenderer.toDotString(fusedPipeline));
+// the pipeline option coming from sdk will set the sdk specific runner 
which will break serialization
+// so we need to reset the runner here to a valid Java runner
+options.setRunner(SamzaRunner.class);
+try {
+  final SamzaRunner runner = new SamzaRunner(options);
+  return runner.runPortablePipeline(fusedPipeline);
+} catch (Exception e) {
+  throw new RuntimeException("Failed to invoke samza job", e);
+}
+  }
+
+  @Override
+  public void start() {
+LOG.info("Starting job invocation {}", getId());
+pipelineResult = invokeSamzaJob();
+  }
+
+  @Override
+  public String getId() {
 
 Review comment:
   I think we might need to concatenate jobId with jobName here, since Samza 
jobs can have multiple instances. It would also be good to check where getId() 
is being used. 
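For illustration, one way the concatenation could look. The helper name, the parameter names, and the underscore separator are all assumptions; the PR does not specify how the two values would be combined.

```java
/** Hypothetical helper; the separator and parameter names are assumptions. */
class InvocationIds {
  // Combines job name and job id so two instances of the same job get distinct ids.
  static String invocationId(String jobName, String jobId) {
    return String.format("%s_%s", jobName, jobId);
  }
}
```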
 



Issue Time Tracking
---

Worklog Id: (was: 180330)
Time Spent: 1.5h  (was: 1h 20m)

> initial support for portable api in samza runner
> 
>
> Key: BEAM-6271
> URL: 

[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2019-01-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=180331=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-180331
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 02/Jan/19 17:51
Start Date: 02/Jan/19 17:51
Worklog Time Spent: 10m 
  Work Description: xinyuiscool commented on pull request #7321: 
[BEAM-6271] SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#discussion_r244793072
 
 

 ##
 File path: 
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobInvocation.java
 ##
 @@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza;
+
+import static 
org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.CANCELLED;
+import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.DONE;
+import static 
org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.FAILED;
+import static 
org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.RUNNING;
+import static 
org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.STARTING;
+import static 
org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.STOPPED;
+import static 
org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.UNRECOGNIZED;
+import static 
org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.UPDATED;
+
+import java.util.function.Consumer;
+import org.apache.beam.model.jobmanagement.v1.JobApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.samza.util.PortablePipelineDotRenderer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Invocation of a Samza job via {@link SamzaRunner}. */
+public class SamzaJobInvocation implements JobInvocation {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SamzaJobInvocation.class);
+
+  private final SamzaPipelineOptions options;
+  private final RunnerApi.Pipeline originalPipeline;
+  private volatile SamzaPipelineResult pipelineResult;
+
+  public SamzaJobInvocation(RunnerApi.Pipeline pipeline, SamzaPipelineOptions 
options) {
+this.originalPipeline = pipeline;
+this.options = options;
+  }
+
+  private SamzaPipelineResult invokeSamzaJob() {
+// Fused pipeline proto.
+final RunnerApi.Pipeline fusedPipeline =
+GreedyPipelineFuser.fuse(originalPipeline).toPipeline();
+LOG.info("Portable pipeline to run:");
+LOG.info(PortablePipelineDotRenderer.toDotString(fusedPipeline));
+// the pipeline option coming from sdk will set the sdk specific runner 
which will break serialization
+// so we need to reset the runner here to a valid Java runner
+options.setRunner(SamzaRunner.class);
+try {
+  final SamzaRunner runner = new SamzaRunner(options);
+  return runner.runPortablePipeline(fusedPipeline);
+} catch (Exception e) {
+  throw new RuntimeException("Failed to invoke samza job", e);
+}
+  }
+
+  @Override
+  public void start() {
+LOG.info("Starting job invocation {}", getId());
+pipelineResult = invokeSamzaJob();
+  }
+
+  @Override
+  public String getId() {
+return options.getJobName();
+  }
+
+  @Override
+  public void cancel() {
+try {
+  if (pipelineResult != null) {
+pipelineResult.cancel();
+  }
+} catch (Exception e) {
+  throw new RuntimeException("Failed to cancel job.", e);
+}
+  }
+
+  @Override
+  public JobApi.JobState.Enum getState() {
+if (pipelineResult == null) {
+  return STARTING;
+}
+switch (pipelineResult.getState()) {
 
 Review comment:
   It seems we can return pipelineResult's state directly here. Why is this 
switch needed?
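One possible reason for a switch of this kind, sketched under an assumption: the pipeline result may report its state in a different enum type than the one getState() has to return (the static imports of the JobApi.JobState.Enum constants in the diff hint at this, but it is not confirmed here). The two enums below are abbreviated stand-ins, not the Beam definitions.

```java
/** Stand-in enums with abbreviated, hypothetical value sets; not the Beam definitions. */
class StateMappingSketch {
  enum RunnerState { RUNNING, DONE, FAILED, CANCELLED, UNKNOWN }

  enum JobServiceState { STARTING, RUNNING, DONE, FAILED, CANCELLED, UNSPECIFIED }

  // Translates a runner-side state into the job-service-side state.
  // A null input models "no pipeline result yet".
  static JobServiceState toJobServiceState(RunnerState state) {
    if (state == null) {
      return JobServiceState.STARTING;
    }
    switch (state) {
      case RUNNING:
        return JobServiceState.RUNNING;
      case DONE:
        return JobServiceState.DONE;
      case FAILED:
        return JobServiceState.FAILED;
      case CANCELLED:
        return JobServiceState.CANCELLED;
      default:
        // Anything without a direct counterpart is reported as unspecified.
        return JobServiceState.UNSPECIFIED;
    }
  }
}
```

If both sides used the same enum, returning the value directly would indeed be enough.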
 


[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2019-01-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=180337=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-180337
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 02/Jan/19 17:51
Start Date: 02/Jan/19 17:51
Worklog Time Spent: 10m 
  Work Description: xinyuiscool commented on pull request #7321: 
[BEAM-6271] SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#discussion_r244794298
 
 

 ##
 File path: 
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java
 ##
 @@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.ServerFactory;
+import 
org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService;
+import org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
+import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.Struct;
+import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Driver program that starts a job server. */
+public class SamzaJobServerDriver {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SamzaJobServerDriver.class);
+
+  private final ServerConfiguration config;
+
+  /** Configuration for the jobServer. */
+  private static class ServerConfiguration {
+@Option(name = "--job-port", usage = "The job service port. (Default: 
11440)")
+private int jobPort = 11440;
+
+@Option(name = "--control-port", usage = "The FnControl port. (Default: 
11441)")
+private int controlPort = 11441;
+  }
+
+  private SamzaJobServerDriver(ServerConfiguration config) {
+this.config = config;
+  }
+
+  public static void main(String[] args) throws Exception {
+ServerConfiguration configuration = new ServerConfiguration();
+CmdLineParser parser = new CmdLineParser(configuration);
+try {
+  parser.parseArgument(args);
+  fromConfig(configuration).run();
+} catch (CmdLineException e) {
+  LOG.error("Unable to parse command line arguments {}", 
Arrays.asList(args), e);
+  throw new IllegalArgumentException("Unable to parse command line 
arguments.", e);
+} catch (Exception e) {
+  LOG.error("Hit exception with SamzaJobServer. Exiting...", e);
+  throw e;
+}
+  }
+
+  public static SamzaJobServerDriver fromConfig(ServerConfiguration config) {
+return new SamzaJobServerDriver(config);
+  }
+
+  private InMemoryJobService createJobService() throws IOException {
 
 Review comment:
   static?
 



Issue Time Tracking
---

Worklog Id: (was: 180337)
Time Spent: 2h 10m  (was: 1h 50m)

> initial support for portable api in samza runner
> 
>
> Key: BEAM-6271
> URL: https://issues.apache.org/jira/browse/BEAM-6271
> Project: Beam
>  Issue Type: Task
>  Components: runner-samza
>Reporter: Hai Lu
>Assignee: Hai Lu
>Priority: Major
>  Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> Support portable api in 

[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2019-01-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=180332=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-180332
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 02/Jan/19 17:51
Start Date: 02/Jan/19 17:51
Worklog Time Spent: 10m 
  Work Description: xinyuiscool commented on pull request #7321: 
[BEAM-6271] SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#discussion_r244797460
 
 

 ##
 File path: 
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
 ##
 @@ -58,6 +86,107 @@ public SamzaRunner(SamzaPipelineOptions options) {
 this.options = options;
   }
 
+  private void closeFnServer(GrpcFnServer fnServer) {
+try (AutoCloseable closer = fnServer) {
+  // do nothing
+} catch (Exception e) {
+  LOG.error("Failed to close fn api servers. Ignore since this is shutdown 
process...", e);
+}
+  }
+
+  private void setUpContextManager(
+  StreamGraph streamGraph, SamzaExecutionContext executionContext) {
+streamGraph.withContextManager(
+new ContextManager() {
+  @Override
+  public void init(Config config, TaskContext context) {
+if (executionContext.getMetricsContainer() == null) {
+  final MetricsRegistryMap metricsRegistry =
+  (MetricsRegistryMap) 
context.getSamzaContainerContext().metricsRegistry;
+  executionContext.setMetricsContainer(new 
SamzaMetricsContainer(metricsRegistry));
+}
+
+if (SamzaRunnerOverrideConfigs.isPortableMode(options)) {
+  if (jobBundleFactory == null) {
+try {
+  final long waitTimeoutMs =
+  
SamzaRunnerOverrideConfigs.getControlClientWaitTimeoutMs(options);
+  final InstructionRequestHandler instructionHandler =
+  controlClientPool
+  .getSource()
+  .take(SAMZA_WORKER_ID, 
Duration.ofMillis(waitTimeoutMs));
+  final EnvironmentFactory environmentFactory =
+  environment -> RemoteEnvironment.forHandler(environment, 
instructionHandler);
+  // TODO: use 
JobBundleFactoryBase.WrappedSdkHarnessClient.wrapping
+  jobBundleFactory =
+  SingleEnvironmentInstanceJobBundleFactory.create(
+  environmentFactory, fnDataServer, fnStateServer);
+} catch (Exception e) {
+  throw new RuntimeException(
+  "Running samza in Beam portable mode but failed to 
create job bundle factory",
+  e);
+}
+executionContext.setJobBundleFactory(jobBundleFactory);
+  }
+}
+
+context.setUserContext(executionContext);
+  }
+
+  @Override
+  public void close() {
+closeFnServer(fnControlServer);
+fnControlServer = null;
+closeFnServer(fnDataServer);
+fnDataServer = null;
+closeFnServer(fnStateServer);
+fnStateServer = null;
+  }
+});
+  }
+
+  private void setUpFnApiServer() {
+controlClientPool = MapControlClientPool.create();
 
 Review comment:
   Do we need to close this pool, as well as the executorService, during shutdown?
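A sketch of the executor half of this question, using only standard `java.util.concurrent` calls. The class and method names and the timeout handling are assumptions. Whether the control client pool needs explicit cleanup is not addressed here.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper showing an orderly ExecutorService shutdown. */
class ExecutorShutdownSketch {
  // Stops accepting new tasks, waits up to timeoutMs, then interrupts whatever remains.
  // Returns true only if all tasks finished within the timeout.
  static boolean shutdownQuietly(ExecutorService executor, long timeoutMs) {
    executor.shutdown();
    try {
      if (!executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
        executor.shutdownNow();
        return false;
      }
      return true;
    } catch (InterruptedException e) {
      executor.shutdownNow();
      // Preserve the interrupt so callers further up can react to it.
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

A call such as `shutdownQuietly(dataExecutor, 5000)` could sit next to the `closeFnServer(...)` calls in `close()`, provided the executor is kept in a field rather than a local variable.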
 



Issue Time Tracking
---

Worklog Id: (was: 180332)
Time Spent: 1h 50m  (was: 1h 40m)

> initial support for portable api in samza runner
> 
>
> Key: BEAM-6271
> URL: https://issues.apache.org/jira/browse/BEAM-6271
> Project: Beam
>  Issue Type: Task
>  Components: runner-samza
>Reporter: Hai Lu
>Assignee: Hai Lu
>Priority: Major
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> Support portable api in Samza runner.
> This ticket tracks the initial effort to support portable api in Samza 
> runner, including job server inside samza runner, config translation for 
> portable pipeline, transform translation for portable pipeline, refactor of 
> existing codes to merge logic of portable api and java api as much as 
> possible, etc.





[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2019-01-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=180334=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-180334
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 02/Jan/19 17:51
Start Date: 02/Jan/19 17:51
Worklog Time Spent: 10m 
  Work Description: xinyuiscool commented on pull request #7321: 
[BEAM-6271] SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#discussion_r244803102
 
 

 ##
 File path: 
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
 ##
 @@ -58,6 +86,107 @@ public SamzaRunner(SamzaPipelineOptions options) {
 this.options = options;
   }
 
+  private void closeFnServer(GrpcFnServer fnServer) {
+try (AutoCloseable closer = fnServer) {
+  // do nothing
+} catch (Exception e) {
+  LOG.error("Failed to close fn api servers. Ignore since this is shutdown 
process...", e);
+}
+  }
+
+  private void setUpContextManager(
+  StreamGraph streamGraph, SamzaExecutionContext executionContext) {
+streamGraph.withContextManager(
+new ContextManager() {
+  @Override
+  public void init(Config config, TaskContext context) {
+if (executionContext.getMetricsContainer() == null) {
+  final MetricsRegistryMap metricsRegistry =
+  (MetricsRegistryMap) 
context.getSamzaContainerContext().metricsRegistry;
+  executionContext.setMetricsContainer(new 
SamzaMetricsContainer(metricsRegistry));
+}
+
+if (SamzaRunnerOverrideConfigs.isPortableMode(options)) {
+  if (jobBundleFactory == null) {
+try {
+  final long waitTimeoutMs =
+  
SamzaRunnerOverrideConfigs.getControlClientWaitTimeoutMs(options);
+  final InstructionRequestHandler instructionHandler =
+  controlClientPool
+  .getSource()
+  .take(SAMZA_WORKER_ID, 
Duration.ofMillis(waitTimeoutMs));
+  final EnvironmentFactory environmentFactory =
+  environment -> RemoteEnvironment.forHandler(environment, 
instructionHandler);
+  // TODO: use 
JobBundleFactoryBase.WrappedSdkHarnessClient.wrapping
+  jobBundleFactory =
+  SingleEnvironmentInstanceJobBundleFactory.create(
+  environmentFactory, fnDataServer, fnStateServer);
+} catch (Exception e) {
+  throw new RuntimeException(
+  "Running samza in Beam portable mode but failed to 
create job bundle factory",
+  e);
+}
+executionContext.setJobBundleFactory(jobBundleFactory);
+  }
+}
+
+context.setUserContext(executionContext);
+  }
+
+  @Override
+  public void close() {
+closeFnServer(fnControlServer);
+fnControlServer = null;
+closeFnServer(fnDataServer);
+fnDataServer = null;
+closeFnServer(fnStateServer);
+fnStateServer = null;
+  }
+});
+  }
+
+  private void setUpFnApiServer() {
+controlClientPool = MapControlClientPool.create();
+ExecutorService dataExecutor = Executors.newCachedThreadPool();
+try {
+  fnControlServer =
+  GrpcFnServer.allocatePortAndCreateFor(
+  FnApiControlClientPoolService.offeringClientsToPool(
+  controlClientPool.getSink(), () -> SAMZA_WORKER_ID),
+  ServerFactory.createWithPortSupplier(
+  () -> SamzaRunnerOverrideConfigs.getFnControlPort(options)));
+
+  fnDataServer =
+  GrpcFnServer.allocatePortAndCreateFor(
+  GrpcDataService.create(dataExecutor, 
OutboundObserverFactory.serverDirect()),
+  ServerFactory.createDefault());
+
+  fnStateServer =
+  GrpcFnServer.allocatePortAndCreateFor(
+  GrpcStateService.create(), ServerFactory.createDefault());
+} catch (Exception e) {
+  LOG.error("Failed to set up fn api servers", e);
+  throw new RuntimeException(e);
+}
+  }
+
+  SamzaPipelineResult runPortablePipeline(RunnerApi.Pipeline pipeline) {
+final SamzaExecutionContext executionContext = new SamzaExecutionContext();
+setUpFnApiServer();
 
 Review comment:
   setUpFnApiServer() needs to be folded inside setUpContextManager(), since 
the servers should be set up only when the stream graph is created at runtime. 
Currently this is done at submission time.
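The general idea of deferring setup until first use at runtime can be sketched as follows. `LazyResource` is a hypothetical holder introduced only for illustration; the PR itself may restructure the setup differently.

```java
import java.util.function.Supplier;

/** Hypothetical holder that creates a resource on first use instead of at construction. */
class LazyResource<T> {
  private final Supplier<T> factory;
  private T instance;

  LazyResource(Supplier<T> factory) {
    this.factory = factory;
  }

  // Creates the resource on the first call and returns the same instance afterwards.
  synchronized T get() {
    if (instance == null) {
      instance = factory.get();
    }
    return instance;
  }

  synchronized boolean isInitialized() {
    return instance != null;
  }
}
```

Constructing the holder at submission time costs nothing; the factory would run only when `get()` is first called, for example from the context manager's `init`.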
 


[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2019-01-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=180336=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-180336
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 02/Jan/19 17:51
Start Date: 02/Jan/19 17:51
Worklog Time Spent: 10m 
  Work Description: xinyuiscool commented on pull request #7321: 
[BEAM-6271] SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#discussion_r244795786
 
 

 ##
 File path: 
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java
 ##
 @@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.ServerFactory;
+import 
org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService;
+import org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
+import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.Struct;
+import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Driver program that starts a job server. */
+public class SamzaJobServerDriver {
+  private static final Logger LOG = LoggerFactory.getLogger(SamzaJobServerDriver.class);
+
+  private final ServerConfiguration config;
+
+  /** Configuration for the jobServer. */
+  private static class ServerConfiguration {
+    @Option(name = "--job-port", usage = "The job service port. (Default: 11440)")
+    private int jobPort = 11440;
+
+    @Option(name = "--control-port", usage = "The FnControl port. (Default: 11441)")
+    private int controlPort = 11441;
+  }
+
+  private SamzaJobServerDriver(ServerConfiguration config) {
+    this.config = config;
+  }
+
+  public static void main(String[] args) throws Exception {
+    ServerConfiguration configuration = new ServerConfiguration();
+    CmdLineParser parser = new CmdLineParser(configuration);
+    try {
+      parser.parseArgument(args);
+      fromConfig(configuration).run();
+    } catch (CmdLineException e) {
+      LOG.error("Unable to parse command line arguments {}", Arrays.asList(args), e);
+      throw new IllegalArgumentException("Unable to parse command line arguments.", e);
+    } catch (Exception e) {
+      LOG.error("Hit exception with SamzaJobServer. Exiting...", e);
+      throw e;
+    }
+  }
+
+  public static SamzaJobServerDriver fromConfig(ServerConfiguration config) {
+    return new SamzaJobServerDriver(config);
+  }
+
+  private InMemoryJobService createJobService() throws IOException {
+    JobInvoker jobInvoker =
+        new JobInvoker() {
+          @Override
+          public JobInvocation invoke(
+              RunnerApi.Pipeline pipeline, Struct options, @Nullable String retrievalToken)
+              throws IOException {
+            SamzaPipelineOptions samzaPipelineOptions =
+                PipelineOptionsTranslation.fromProto(options).as(SamzaPipelineOptions.class);
+            Map overrideConfig =
+                samzaPipelineOptions.getConfigOverride() != null
+                    ? samzaPipelineOptions.getConfigOverride()
+                    : new HashMap<>();
+            overrideConfig.put(SamzaRunnerOverrideConfigs.IS_PORTABLE_MODE, String.valueOf(true));
+            overrideConfig.put(
+                SamzaRunnerOverrideConfigs.FN_CONTROL_PORT, String.valueOf(config.controlPort));
+            samzaPipelineOptions.setConfigOverride(overrideConfig);
+            return new

[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2019-01-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=180333=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-180333
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 02/Jan/19 17:51
Start Date: 02/Jan/19 17:51
Worklog Time Spent: 10m 
  Work Description: xinyuiscool commented on pull request #7321: 
[BEAM-6271] SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#discussion_r244791772
 
 

 ##
 File path: 
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
 ##
 @@ -58,6 +86,107 @@ public SamzaRunner(SamzaPipelineOptions options) {
 this.options = options;
   }
 
+  private void closeFnServer(GrpcFnServer fnServer) {
+    try (AutoCloseable closer = fnServer) {
+      // do nothing
+    } catch (Exception e) {
+      LOG.error("Failed to close fn api servers. Ignore since this is shutdown process...", e);
+    }
+  }
+
+  private void setUpContextManager(
+      StreamGraph streamGraph, SamzaExecutionContext executionContext) {
+    streamGraph.withContextManager(
+        new ContextManager() {
+          @Override
+          public void init(Config config, TaskContext context) {
+            if (executionContext.getMetricsContainer() == null) {
+              final MetricsRegistryMap metricsRegistry =
+                  (MetricsRegistryMap) context.getSamzaContainerContext().metricsRegistry;
+              executionContext.setMetricsContainer(new SamzaMetricsContainer(metricsRegistry));
+            }
+
+            if (SamzaRunnerOverrideConfigs.isPortableMode(options)) {
+              if (jobBundleFactory == null) {
 
 Review comment:
   Here we set jobBundleFactory once for all Samza tasks, so they will share a 
single connection to the SDK worker process. Should we also consider having a 
separate jobBundleFactory for each task?
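
   A minimal sketch of the two scopes raised in this comment, assuming a hypothetical `FakeBundleFactory` stand-in and hypothetical helper names (`sharedFactory`, `factoryForTask`). It does not use Beam's or Samza's real classes; it only contrasts one lazily created instance per process with one instance per task name.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public final class BundleFactoryScopeSketch {
  /** Hypothetical stand-in for a bundle factory; counts how many instances exist. */
  static final class FakeBundleFactory {
    static final AtomicInteger CREATED = new AtomicInteger();
    final int id = CREATED.incrementAndGet();
  }

  // Option A: a single factory shared by every task in the process.
  private static FakeBundleFactory shared;

  static synchronized FakeBundleFactory sharedFactory() {
    if (shared == null) {
      shared = new FakeBundleFactory(); // created once, on first use
    }
    return shared;
  }

  // Option B: one factory per task, keyed by a (hypothetical) task name.
  private static final Map<String, FakeBundleFactory> PER_TASK = new ConcurrentHashMap<>();

  static FakeBundleFactory factoryForTask(String taskName) {
    return PER_TASK.computeIfAbsent(taskName, name -> new FakeBundleFactory());
  }

  public static void main(String[] args) {
    // Same instance on every call: all tasks would share one connection.
    System.out.println(sharedFactory() == sharedFactory()); // true
    // Distinct instances for distinct tasks: each task would own its connection.
    System.out.println(factoryForTask("task-0") == factoryForTask("task-1")); // false
  }
}
```

   The shared variant keeps the number of worker connections at one but makes every task contend for it; the per-task variant trades more connections for isolation. Which is preferable here depends on details not shown in this diff.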
 



Issue Time Tracking
---

Worklog Id: (was: 180333)
Time Spent: 1h 50m  (was: 1h 40m)



[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2019-01-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=180335=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-180335
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 02/Jan/19 17:51
Start Date: 02/Jan/19 17:51
Worklog Time Spent: 10m 
  Work Description: xinyuiscool commented on pull request #7321: 
[BEAM-6271] SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#discussion_r244793361
 
 

 ##
 File path: 
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java
 ##
 @@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.ServerFactory;
+import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService;
+import org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
+import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.Struct;
+import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Driver program that starts a job server. */
+public class SamzaJobServerDriver {
+  private static final Logger LOG = LoggerFactory.getLogger(SamzaJobServerDriver.class);
+
+  private final ServerConfiguration config;
+
+  /** Configuration for the jobServer. */
+  private static class ServerConfiguration {
+    @Option(name = "--job-port", usage = "The job service port. (Default: 11440)")
+    private int jobPort = 11440;
+
+    @Option(name = "--control-port", usage = "The FnControl port. (Default: 11441)")
+    private int controlPort = 11441;
+  }
+
+  private SamzaJobServerDriver(ServerConfiguration config) {
+    this.config = config;
+  }
+
+  public static void main(String[] args) throws Exception {
+    ServerConfiguration configuration = new ServerConfiguration();
 
 Review comment:
   I think this deserves a "final" here, and the line below too.
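
   A small sketch of what `final` on these locals would and would not guarantee. The `Options` and `applyArgs` names are hypothetical stand-ins for the option holder and the parser in the diff, not the real args4j API: the reference cannot be rebound, but the object's fields can still be populated afterwards.

```java
public final class FinalLocalsSketch {
  /** Hypothetical stand-in for the driver's option holder. */
  static final class Options {
    int jobPort = 11440; // default value used for illustration
  }

  /** Hypothetical stand-in for a parser that fills in fields on an existing object. */
  static void applyArgs(Options target, String[] args) {
    if (args.length > 0) {
      target.jobPort = Integer.parseInt(args[0]);
    }
  }

  static int resolvePort(String[] args) {
    final Options options = new Options(); // the reference is fixed from here on
    applyArgs(options, args);              // mutating the object is still allowed
    // options = new Options();            // would not compile: cannot assign a final local
    return options.jobPort;
  }

  public static void main(String[] args) {
    System.out.println(resolvePort(new String[] {"9000"}));
  }
}
```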
 



Issue Time Tracking
---

Worklog Id: (was: 180335)
Time Spent: 2h  (was: 1h 50m)



[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2019-01-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=180329=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-180329
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 02/Jan/19 17:51
Start Date: 02/Jan/19 17:51
Worklog Time Spent: 10m 
  Work Description: xinyuiscool commented on pull request #7321: 
[BEAM-6271] SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#discussion_r244796257
 
 

 ##
 File path: 
runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
 ##
 @@ -58,6 +86,107 @@ public SamzaRunner(SamzaPipelineOptions options) {
 this.options = options;
   }
 
+  private void closeFnServer(GrpcFnServer fnServer) {
 
 Review comment:
   static?
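
   The lines of `closeFnServer` quoted in this thread use only the method's parameter and a logger, which is presumably why `static` is suggested. A minimal sketch of a static helper built on the same empty try-with-resources idiom follows; `FakeServer` and `closeQuietly` are hypothetical names for illustration, not Beam classes.

```java
public final class CloseQuietlySketch {
  /** Hypothetical stand-in for a server whose close() may fail. */
  static final class FakeServer implements AutoCloseable {
    final boolean failOnClose;
    boolean closed;

    FakeServer(boolean failOnClose) {
      this.failOnClose = failOnClose;
    }

    @Override
    public void close() throws Exception {
      closed = true;
      if (failOnClose) {
        throw new IllegalStateException("close failed");
      }
    }
  }

  /** Closes the resource and reports success; needs no instance state, so it can be static. */
  static boolean closeQuietly(AutoCloseable resource) {
    try (AutoCloseable closer = resource) {
      // Intentionally empty: close() runs when this block exits.
    } catch (Exception e) {
      // Swallow the failure, mirroring the "ignore during shutdown" intent.
      System.err.println("Failed to close resource: " + e.getMessage());
      return false;
    }
    return true;
  }

  public static void main(String[] args) {
    System.out.println(closeQuietly(new FakeServer(false)));
    System.out.println(closeQuietly(new FakeServer(true)));
  }
}
```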
 



Issue Time Tracking
---

Worklog Id: (was: 180329)
Time Spent: 1h 20m  (was: 1h 10m)



[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2018-12-20 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=177530=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-177530
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 20/Dec/18 14:44
Start Date: 20/Dec/18 14:44
Worklog Time Spent: 10m 
  Work Description: mxm commented on issue #7321: [BEAM-6271] SamzaRunner: 
initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#issuecomment-449021622
 
 
   Thanks for the PR. Great to see another Runner becoming portable. I'll 
definitely check this out.




Issue Time Tracking
---

Worklog Id: (was: 177530)
Time Spent: 1h  (was: 50m)



[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2018-12-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=177314=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-177314
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 20/Dec/18 03:27
Start Date: 20/Dec/18 03:27
Worklog Time Spent: 10m 
  Work Description: xinyuiscool commented on issue #7321: [BEAM-6271] 
SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#issuecomment-448847386
 
 
   Run Java PreCommit




Issue Time Tracking
---

Worklog Id: (was: 177314)
Time Spent: 40m  (was: 0.5h)



[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2018-12-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=177315=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-177315
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 20/Dec/18 03:27
Start Date: 20/Dec/18 03:27
Worklog Time Spent: 10m 
  Work Description: xinyuiscool commented on issue #7321: [BEAM-6271] 
SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#issuecomment-448847448
 
 
   Run Portable_Python PreCommit




Issue Time Tracking
---

Worklog Id: (was: 177315)
Time Spent: 50m  (was: 40m)



[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2018-12-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=177313=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-177313
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 20/Dec/18 03:25
Start Date: 20/Dec/18 03:25
Worklog Time Spent: 10m 
  Work Description: xinyuiscool commented on issue #7321: [BEAM-6271] 
SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#issuecomment-448846782
 
 
   Run Java PreCommit
   Run Portable_Python PreCommit




Issue Time Tracking
---

Worklog Id: (was: 177313)
Time Spent: 0.5h  (was: 20m)



[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2018-12-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=177130=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-177130
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 19/Dec/18 18:46
Start Date: 19/Dec/18 18:46
Worklog Time Spent: 10m 
  Work Description: kennknowles commented on issue #7321: [BEAM-6271] 
SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321#issuecomment-448702966
 
 
   Nice! I tagged a couple others who are good choices to take a look at this.




Issue Time Tracking
---

Worklog Id: (was: 177130)
Time Spent: 20m  (was: 10m)



[jira] [Work logged] (BEAM-6271) initial support for portable api in samza runner

2018-12-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-6271?focusedWorklogId=177085=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-177085
 ]

ASF GitHub Bot logged work on BEAM-6271:


Author: ASF GitHub Bot
Created on: 19/Dec/18 17:15
Start Date: 19/Dec/18 17:15
Worklog Time Spent: 10m 
  Work Description: lhaiesp opened a new pull request #7321: [BEAM-6271] 
SamzaRunner: initial support for portable api in samza runner
URL: https://github.com/apache/beam/pull/7321
 
 
   Initial support for the portable API in the Samza runner, including a job 
server inside the Samza runner, config translation for portable pipelines, 
transform translation for portable pipelines, refactoring of existing code to 
merge the logic of the portable API and the Java API as much as possible, etc.
   
   The change doesn't yet include support for:
   - batching the events transferred between the Java runner and SDK workers 
(bundling)
   - multiple SDK workers connecting to one Java runner
   
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact