[jira] [Work logged] (BEAM-3906) Get Python Wheel Validation Automated

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3906?focusedWorklogId=92431&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92431
 ]

ASF GitHub Bot logged work on BEAM-3906:


Author: ASF GitHub Bot
Created on: 19/Apr/18 06:58
Start Date: 19/Apr/18 06:58
Worklog Time Spent: 10m 
  Work Description: yifanzou commented on issue #4943: [BEAM-3906] Automate Validation Against Python Wheel
URL: https://github.com/apache/beam/pull/4943#issuecomment-382610788
 
 
   Run Seed Job


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 92431)
Time Spent: 7h 40m  (was: 7.5h)

> Get Python Wheel Validation Automated
> -
>
> Key: BEAM-3906
> URL: https://issues.apache.org/jira/browse/BEAM-3906
> Project: Beam
>  Issue Type: Sub-task
>  Components: examples-python, testing
>Reporter: yifan zou
>Assignee: yifan zou
>Priority: Major
>  Time Spent: 7h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4062) Performance regression in FileBasedSink

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4062?focusedWorklogId=92427&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92427
 ]

ASF GitHub Bot logged work on BEAM-4062:


Author: ASF GitHub Bot
Created on: 19/Apr/18 06:36
Start Date: 19/Apr/18 06:36
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on a change in pull request 
#5158: [BEAM-4062] Fix performance regression in FileBasedSink.
URL: https://github.com/apache/beam/pull/5158#discussion_r182645267
 
 

 ##
 File path: sdks/python/apache_beam/io/filebasedsink.py
 ##
 @@ -188,39 +190,51 @@ def _get_final_name(self, shard_num, num_shards):
         self.file_name_suffix.get()
     ])
 
+  @check_accessible(['file_path_prefix', 'file_name_suffix'])
+  def _get_final_name_glob(self, num_shards):
+    return ''.join([
+        self.file_path_prefix.get(),
+        self.shard_name_glob_format % dict(num_shards=num_shards),
+        self.file_name_suffix.get()
+    ])
+
   def pre_finalize(self, init_result, writer_results):
-    writer_results = sorted(writer_results)
-    num_shards = len(writer_results)
-    existing_files = []
-    for shard_num in range(len(writer_results)):
-      final_name = self._get_final_name(shard_num, num_shards)
-      if FileSystems.exists(final_name):
-        existing_files.append(final_name)
-    if existing_files:
-      logging.info('Deleting existing files in target path: %d',
-                   len(existing_files))
-      FileSystems.delete(existing_files)
+    num_shards = len(list(writer_results))
+    dst_glob = self._get_final_name_glob(num_shards)
+    dst_glob_files = [file_metadata.path
+                      for mr in FileSystems.match([dst_glob])
+                      for file_metadata in mr.metadata_list]
 
-  @check_accessible(['file_path_prefix'])
-  def finalize_write(self, init_result, writer_results,
-                     unused_pre_finalize_results):
-    writer_results = sorted(writer_results)
-    num_shards = len(writer_results)
+    if dst_glob_files:
+      logging.info('Deleting existing files in target path: %d',
+                   len(dst_glob_files))
+      FileSystems.delete(dst_glob_files)
+
+  def _check_state_for_finalize_write(self, writer_results, num_shards):
 
 Review comment:
   Please add comments describing this and the other utility methods added.
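As an illustration of the requested comments, here is a hedged sketch of the glob helper from the diff rewritten as a standalone, documented function. `final_name_glob` is a hypothetical name (not Beam API), and the example glob format `-*-of-%(num_shards)05d` is an assumption about what `shard_name_glob_format` would hold for a `-SSSSS-of-NNNNN` shard template.

```python
def final_name_glob(file_path_prefix, shard_name_glob_format, file_name_suffix,
                    num_shards):
    """Returns a glob that matches every final output file of one write.

    Hypothetical standalone version of FileBasedSink._get_final_name_glob.

    Args:
      file_path_prefix: path prefix shared by all outputs, e.g. '/out/part'.
      shard_name_glob_format: shard-name format whose shard-number part is
        already '*' and whose shard count is a '%(num_shards)0Nd' field
        (assumed layout).
      file_name_suffix: suffix appended to every output, e.g. '.txt'.
      num_shards: total number of shards in this write.
    """
    return ''.join([
        file_path_prefix,
        shard_name_glob_format % dict(num_shards=num_shards),
        file_name_suffix,
    ])


print(final_name_glob('/out/part', '-*-of-%(num_shards)05d', '.txt', 3))
# prints /out/part-*-of-00003.txt
```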




Issue Time Tracking
---

Worklog Id: (was: 92427)
Time Spent: 1h 10m  (was: 1h)

> Performance regression in FileBasedSink
> ---
>
> Key: BEAM-4062
> URL: https://issues.apache.org/jira/browse/BEAM-4062
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Udi Meiri
>Assignee: Udi Meiri
>Priority: Blocker
> Fix For: 2.5.0
>
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> [https://github.com/apache/beam/pull/4648] has added:
>  * 3 or more stat() calls per output file (in pre_finalize and finalize_write)
>  * serial unbatched delete()s (in pre_finalize)
> The solution will be to list files in a batch operation (match()) and to
> delete() in batch mode, or to use multiple threads if that's not possible.
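A minimal local-filesystem sketch of the proposed approach: one listing call instead of a stat() per expected shard name, then deletes issued concurrently. It assumes `glob.glob` can stand in for a single batched `FileSystems.match()` call; because `os.remove` has no batch form, it takes the "multiple threads" route. `delete_existing_outputs` and `max_workers` are hypothetical names.

```python
import glob
import logging
import os
from concurrent.futures import ThreadPoolExecutor


def delete_existing_outputs(dst_glob, max_workers=8):
    """Lists files matching dst_glob once and deletes them concurrently.

    glob.glob() plays the role of one batched match() call; the thread pool
    replaces serial, unbatched delete() calls. Returns the deleted paths.
    """
    existing_files = sorted(glob.glob(dst_glob))  # one listing, no per-file stat()
    if existing_files:
        logging.warning('Deleting %d existing files in target path',
                        len(existing_files))
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            # list() drains the results so an OSError from a worker is re-raised here.
            list(pool.map(os.remove, existing_files))
    return existing_files
```

On a real object store the same shape applies, but a native batch-delete call (where the filesystem offers one) would replace the thread pool.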





[jira] [Work logged] (BEAM-4062) Performance regression in FileBasedSink

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4062?focusedWorklogId=92425&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92425
 ]

ASF GitHub Bot logged work on BEAM-4062:


Author: ASF GitHub Bot
Created on: 19/Apr/18 06:36
Start Date: 19/Apr/18 06:36
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on a change in pull request 
#5158: [BEAM-4062] Fix performance regression in FileBasedSink.
URL: https://github.com/apache/beam/pull/5158#discussion_r182645509
 
 

 ##
 File path: sdks/python/apache_beam/io/filebasedsink.py
 ##
 @@ -309,11 +341,18 @@ def _template_to_format(shard_name_template):
                        shard_name_template)
     shard_name_format = shard_name_template.replace(
         m.group(0), '%%(shard_num)0%dd' % len(m.group(0)))
-    m = re.search('N+', shard_name_format)
-    if m:
-      shard_name_format = shard_name_format.replace(
-          m.group(0), '%%(num_shards)0%dd' % len(m.group(0)))
-    return shard_name_format
+    return FileBasedSink._template_replace_num_shards(shard_name_format)
+
+  @staticmethod
+  def _template_to_glob_format(shard_name_template):
+    if not shard_name_template:
+      return ''
+    m = re.search('S+', shard_name_template)
+    if m is None:
+      raise ValueError("Shard number pattern S+ not found in template '%s'" %
 
 Review comment:
   Not sure if this error message is useful for an end user, since it might not
be immediately clear what S+ is. Can we elaborate?
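One possible way to make the error self-explanatory, sketched as a standalone function. `template_to_glob_format` is a hypothetical stand-in for `FileBasedSink._template_to_glob_format`, and its conversion rules (the run of S becomes `*`, the run of N becomes a zero-padded `%(num_shards)0Nd` field) are assumptions inferred from the diff, not the merged implementation.

```python
import re


def template_to_glob_format(shard_name_template):
    """Converts a template such as '-SSSSS-of-NNNNN' into a glob format.

    The run of 'S' characters (shard number) becomes '*'; the run of 'N'
    characters (shard count) becomes a zero-padded '%(num_shards)0Nd' field.
    """
    if not shard_name_template:
        return ''
    shard_num_match = re.search('S+', shard_name_template)
    if shard_num_match is None:
        # Spell out what 'S+' means instead of quoting the regex.
        raise ValueError(
            "Invalid shard_name_template '%s': it must contain a run of one or "
            "more 'S' characters (for example '-SSSSS-of-NNNNN') marking where "
            "the zero-padded shard number goes." % shard_name_template)
    glob_format = shard_name_template.replace(shard_num_match.group(0), '*')
    num_shards_match = re.search('N+', glob_format)
    if num_shards_match:
        glob_format = glob_format.replace(
            num_shards_match.group(0),
            '%%(num_shards)0%dd' % len(num_shards_match.group(0)))
    return glob_format
```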




Issue Time Tracking
---

Worklog Id: (was: 92425)
Time Spent: 1h  (was: 50m)

> Performance regression in FileBasedSink
> ---
>
> Key: BEAM-4062
> URL: https://issues.apache.org/jira/browse/BEAM-4062
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Udi Meiri
>Assignee: Udi Meiri
>Priority: Blocker
> Fix For: 2.5.0
>
>  Time Spent: 1h
>  Remaining Estimate: 0h
>





[jira] [Work logged] (BEAM-4062) Performance regression in FileBasedSink

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4062?focusedWorklogId=92426&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92426
 ]

ASF GitHub Bot logged work on BEAM-4062:


Author: ASF GitHub Bot
Created on: 19/Apr/18 06:36
Start Date: 19/Apr/18 06:36
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on a change in pull request 
#5158: [BEAM-4062] Fix performance regression in FileBasedSink.
URL: https://github.com/apache/beam/pull/5158#discussion_r182643938
 
 

 ##
 File path: sdks/python/apache_beam/io/filebasedsink.py
 ##
 @@ -309,11 +341,18 @@ def _template_to_format(shard_name_template):
                        shard_name_template)
     shard_name_format = shard_name_template.replace(
         m.group(0), '%%(shard_num)0%dd' % len(m.group(0)))
-    m = re.search('N+', shard_name_format)
-    if m:
-      shard_name_format = shard_name_format.replace(
-          m.group(0), '%%(num_shards)0%dd' % len(m.group(0)))
-    return shard_name_format
+    return FileBasedSink._template_replace_num_shards(shard_name_format)
+
+  @staticmethod
+  def _template_to_glob_format(shard_name_template):
+    if not shard_name_template:
+      return ''
+    m = re.search('S+', shard_name_template)
 
 Review comment:
   This will restrict us to shard name patterns of the form S+. Can we log a
warning and skip the file deletion instead of failing?
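A sketch of the alternative the reviewer suggests: warn and skip the cleanup rather than raise. Both function names are hypothetical, `delete_fn` stands in for whatever performs the batched delete, and the N-run substitution is omitted for brevity.

```python
import logging
import re


def template_to_glob_format_or_none(shard_name_template):
    """Returns a glob format for the template, or None if it is unsupported.

    Instead of raising ValueError, an unsupported template (one with no run
    of 'S' characters) is only reported with a warning.
    """
    if not shard_name_template:
        return ''
    shard_num_match = re.search('S+', shard_name_template)
    if shard_num_match is None:
        logging.warning(
            "shard_name_template '%s' has no 'S' shard-number placeholder; "
            "existing output files will not be deleted before writing.",
            shard_name_template)
        return None
    return shard_name_template.replace(shard_num_match.group(0), '*')


def maybe_delete_existing(shard_name_template, delete_fn):
    """Calls delete_fn(glob_format) unless the template is unsupported."""
    glob_format = template_to_glob_format_or_none(shard_name_template)
    if glob_format is None:
        return False  # skip the cleanup instead of failing the pipeline
    delete_fn(glob_format)
    return True
```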




Issue Time Tracking
---

Worklog Id: (was: 92426)
Time Spent: 1h 10m  (was: 1h)

> Performance regression in FileBasedSink
> ---
>
> Key: BEAM-4062
> URL: https://issues.apache.org/jira/browse/BEAM-4062
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Udi Meiri
>Assignee: Udi Meiri
>Priority: Blocker
> Fix For: 2.5.0
>
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>





[jira] [Work logged] (BEAM-4062) Performance regression in FileBasedSink

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4062?focusedWorklogId=92428&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92428
 ]

ASF GitHub Bot logged work on BEAM-4062:


Author: ASF GitHub Bot
Created on: 19/Apr/18 06:36
Start Date: 19/Apr/18 06:36
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on a change in pull request 
#5158: [BEAM-4062] Fix performance regression in FileBasedSink.
URL: https://github.com/apache/beam/pull/5158#discussion_r182644400
 
 

 ##
 File path: sdks/python/apache_beam/io/filebasedsink.py
 ##
 @@ -299,6 +323,14 @@ def _rename_batch(batch):
   # May have already been removed.
   pass
 
+  @staticmethod
+  def _template_replace_num_shards(shard_name_template):
+    m = re.search('N+', shard_name_template)
+    if m:
+      shard_name_template = shard_name_template.replace(
+          m.group(0), '%%(num_shards)0%dd' % len(m.group(0)))
 
 Review comment:
   Shouldn't num_shards be the second parameter?




Issue Time Tracking
---

Worklog Id: (was: 92428)
Time Spent: 1h 20m  (was: 1h 10m)

> Performance regression in FileBasedSink
> ---
>
> Key: BEAM-4062
> URL: https://issues.apache.org/jira/browse/BEAM-4062
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Udi Meiri
>Assignee: Udi Meiri
>Priority: Blocker
> Fix For: 2.5.0
>
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>





[jira] [Work logged] (BEAM-4062) Performance regression in FileBasedSink

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4062?focusedWorklogId=92422&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92422
 ]

ASF GitHub Bot logged work on BEAM-4062:


Author: ASF GitHub Bot
Created on: 19/Apr/18 06:36
Start Date: 19/Apr/18 06:36
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on a change in pull request 
#5158: [BEAM-4062] Fix performance regression in FileBasedSink.
URL: https://github.com/apache/beam/pull/5158#discussion_r182644216
 
 

 ##
 File path: sdks/python/apache_beam/io/filebasedsink.py
 ##
 @@ -299,6 +323,14 @@ def _rename_batch(batch):
   # May have already been removed.
   pass
 
+  @staticmethod
+  def _template_replace_num_shards(shard_name_template):
+    m = re.search('N+', shard_name_template)
 
 Review comment:
   Please replace 'm' with a more descriptive variable name.
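A sketch of the same helper with descriptive names (`num_shards_match`, `placeholder`). `template_replace_num_shards` is a hypothetical standalone copy of the method in the diff; the substitution logic is unchanged.

```python
import re


def template_replace_num_shards(shard_name_template):
    """Replaces the run of 'N' characters with a zero-padded num_shards field.

    For example '-SSSSS-of-NNNNN' becomes '-SSSSS-of-%(num_shards)05d'.
    """
    num_shards_match = re.search('N+', shard_name_template)
    if num_shards_match:
        placeholder = num_shards_match.group(0)
        # Field width equals the number of 'N' characters in the template.
        shard_name_template = shard_name_template.replace(
            placeholder, '%%(num_shards)0%dd' % len(placeholder))
    return shard_name_template
```

The result is filled in later with a mapping, e.g. `template_replace_num_shards('-SSSSS-of-NNNNN') % dict(num_shards=3)` gives `'-SSSSS-of-00003'`.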




Issue Time Tracking
---

Worklog Id: (was: 92422)
Time Spent: 50m  (was: 40m)

> Performance regression in FileBasedSink
> ---
>
> Key: BEAM-4062
> URL: https://issues.apache.org/jira/browse/BEAM-4062
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Udi Meiri
>Assignee: Udi Meiri
>Priority: Blocker
> Fix For: 2.5.0
>
>  Time Spent: 50m
>  Remaining Estimate: 0h
>





[jira] [Work logged] (BEAM-4062) Performance regression in FileBasedSink

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4062?focusedWorklogId=92424&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92424
 ]

ASF GitHub Bot logged work on BEAM-4062:


Author: ASF GitHub Bot
Created on: 19/Apr/18 06:36
Start Date: 19/Apr/18 06:36
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on a change in pull request 
#5158: [BEAM-4062] Fix performance regression in FileBasedSink.
URL: https://github.com/apache/beam/pull/5158#discussion_r182643773
 
 

 ##
 File path: sdks/python/apache_beam/io/filebasedsink.py
 ##
 @@ -309,11 +341,18 @@ def _template_to_format(shard_name_template):
                        shard_name_template)
     shard_name_format = shard_name_template.replace(
         m.group(0), '%%(shard_num)0%dd' % len(m.group(0)))
-    m = re.search('N+', shard_name_format)
-    if m:
-      shard_name_format = shard_name_format.replace(
-          m.group(0), '%%(num_shards)0%dd' % len(m.group(0)))
-    return shard_name_format
+    return FileBasedSink._template_replace_num_shards(shard_name_format)
+
+  @staticmethod
+  def _template_to_glob_format(shard_name_template):
+    if not shard_name_template:
+      return ''
+    m = re.search('S+', shard_name_template)
 
 Review comment:
   Please replace 'm' with a more descriptive variable name.




Issue Time Tracking
---

Worklog Id: (was: 92424)

> Performance regression in FileBasedSink
> ---
>
> Key: BEAM-4062
> URL: https://issues.apache.org/jira/browse/BEAM-4062
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Udi Meiri
>Assignee: Udi Meiri
>Priority: Blocker
> Fix For: 2.5.0
>
>  Time Spent: 50m
>  Remaining Estimate: 0h
>





[jira] [Work logged] (BEAM-4062) Performance regression in FileBasedSink

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4062?focusedWorklogId=92423&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92423
 ]

ASF GitHub Bot logged work on BEAM-4062:


Author: ASF GitHub Bot
Created on: 19/Apr/18 06:36
Start Date: 19/Apr/18 06:36
Worklog Time Spent: 10m 
  Work Description: chamikaramj commented on a change in pull request 
#5158: [BEAM-4062] Fix performance regression in FileBasedSink.
URL: https://github.com/apache/beam/pull/5158#discussion_r182644967
 
 

 ##
 File path: sdks/python/apache_beam/io/filebasedsink.py
 ##
 @@ -188,39 +190,51 @@ def _get_final_name(self, shard_num, num_shards):
         self.file_name_suffix.get()
     ])
 
+  @check_accessible(['file_path_prefix', 'file_name_suffix'])
+  def _get_final_name_glob(self, num_shards):
+    return ''.join([
+        self.file_path_prefix.get(),
+        self.shard_name_glob_format % dict(num_shards=num_shards),
+        self.file_name_suffix.get()
+    ])
+
   def pre_finalize(self, init_result, writer_results):
-    writer_results = sorted(writer_results)
-    num_shards = len(writer_results)
-    existing_files = []
-    for shard_num in range(len(writer_results)):
-      final_name = self._get_final_name(shard_num, num_shards)
-      if FileSystems.exists(final_name):
-        existing_files.append(final_name)
-    if existing_files:
-      logging.info('Deleting existing files in target path: %d',
-                   len(existing_files))
-      FileSystems.delete(existing_files)
+    num_shards = len(list(writer_results))
+    dst_glob = self._get_final_name_glob(num_shards)
+    dst_glob_files = [file_metadata.path
+                      for mr in FileSystems.match([dst_glob])
+                      for file_metadata in mr.metadata_list]
 
-  @check_accessible(['file_path_prefix'])
-  def finalize_write(self, init_result, writer_results,
-                     unused_pre_finalize_results):
-    writer_results = sorted(writer_results)
-    num_shards = len(writer_results)
+    if dst_glob_files:
+      logging.info('Deleting existing files in target path: %d',
 
 Review comment:
   This should probably be a logging.warn.
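A small sketch of the suggested change. In Python's standard `logging` module, `warn` is a deprecated alias of `warning`, so the sketch uses `warning`; the function name `log_existing_file_deletion` is hypothetical.

```python
import logging

logger = logging.getLogger(__name__)


def log_existing_file_deletion(dst_glob_files):
    """Emits a WARNING (rather than INFO) before pre-existing outputs are deleted."""
    if dst_glob_files:
        # Deleting user data is worth surfacing above INFO level.
        logger.warning('Deleting %d existing files in target path',
                       len(dst_glob_files))
```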




Issue Time Tracking
---

Worklog Id: (was: 92423)
Time Spent: 50m  (was: 40m)

> Performance regression in FileBasedSink
> ---
>
> Key: BEAM-4062
> URL: https://issues.apache.org/jira/browse/BEAM-4062
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Udi Meiri
>Assignee: Udi Meiri
>Priority: Blocker
> Fix For: 2.5.0
>
>  Time Spent: 50m
>  Remaining Estimate: 0h
>





Build failed in Jenkins: beam_PerformanceTests_Spark #1608

2018-04-18 Thread Apache Jenkins Server
See 


--
[...truncated 91.47 KB...]
'apache-beam-testing:bqjob_r5030e5aa2d91ed7b_0162dc8ef897_1': Invalid schema
update. Field timestamp has changed type from TIMESTAMP to FLOAT

STDERR: 
/usr/lib/google-cloud-sdk/platform/bq/third_party/oauth2client/contrib/gce.py:73:
 UserWarning: You have requested explicit scopes to be used with a GCE service 
account.
Using this argument will have no effect on the actual scopes for tokens
requested. These scopes are set at VM instance creation time and
can't be overridden in the request.

  warnings.warn(_SCOPES_WARNING)

2018-04-19 06:19:41,856 fbd11edc MainThread INFO Retrying exception running 
IssueRetryableCommand: Command returned a non-zero exit code.

2018-04-19 06:20:02,297 fbd11edc MainThread INFO Running: bq load 
--autodetect --source_format=NEWLINE_DELIMITED_JSON 
beam_performance.pkb_results 

2018-04-19 06:20:04,522 fbd11edc MainThread INFO Ran: {bq load --autodetect 
--source_format=NEWLINE_DELIMITED_JSON beam_performance.pkb_results 

  ReturnCode:1
STDOUT: Upload complete.
Waiting on bqjob_r35b90be273cb3f2a_0162dc8f511a_1 ... (0s) Current status: 
RUNNING 
 Waiting on bqjob_r35b90be273cb3f2a_0162dc8f511a_1 ... (0s) 
Current status: DONE   
BigQuery error in load operation: Error processing job
'apache-beam-testing:bqjob_r35b90be273cb3f2a_0162dc8f511a_1': Invalid schema
update. Field timestamp has changed type from TIMESTAMP to FLOAT

STDERR: 
/usr/lib/google-cloud-sdk/platform/bq/third_party/oauth2client/contrib/gce.py:73:
 UserWarning: You have requested explicit scopes to be used with a GCE service 
account.
Using this argument will have no effect on the actual scopes for tokens
requested. These scopes are set at VM instance creation time and
can't be overridden in the request.

  warnings.warn(_SCOPES_WARNING)

2018-04-19 06:20:04,522 fbd11edc MainThread INFO Retrying exception running 
IssueRetryableCommand: Command returned a non-zero exit code.

2018-04-19 06:20:31,739 fbd11edc MainThread INFO Running: bq load 
--autodetect --source_format=NEWLINE_DELIMITED_JSON 
beam_performance.pkb_results 

2018-04-19 06:20:33,971 fbd11edc MainThread INFO Ran: {bq load --autodetect 
--source_format=NEWLINE_DELIMITED_JSON beam_performance.pkb_results 

  ReturnCode:1
STDOUT: Upload complete.
Waiting on bqjob_r390c9fa6d4297fd5_0162dc8fc474_1 ... (0s) Current status: 
RUNNING 
 Waiting on bqjob_r390c9fa6d4297fd5_0162dc8fc474_1 ... (0s) 
Current status: DONE   
BigQuery error in load operation: Error processing job
'apache-beam-testing:bqjob_r390c9fa6d4297fd5_0162dc8fc474_1': Invalid schema
update. Field timestamp has changed type from TIMESTAMP to FLOAT

STDERR: 
/usr/lib/google-cloud-sdk/platform/bq/third_party/oauth2client/contrib/gce.py:73:
 UserWarning: You have requested explicit scopes to be used with a GCE service 
account.
Using this argument will have no effect on the actual scopes for tokens
requested. These scopes are set at VM instance creation time and
can't be overridden in the request.

  warnings.warn(_SCOPES_WARNING)

2018-04-19 06:20:33,972 fbd11edc MainThread INFO Retrying exception running 
IssueRetryableCommand: Command returned a non-zero exit code.

2018-04-19 06:21:02,549 fbd11edc MainThread INFO Running: bq load 
--autodetect --source_format=NEWLINE_DELIMITED_JSON 
beam_performance.pkb_results 

2018-04-19 06:21:05,303 fbd11edc MainThread INFO Ran: {bq load --autodetect 
--source_format=NEWLINE_DELIMITED_JSON beam_performance.pkb_results 

  ReturnCode:1
STDOUT: Upload complete.
Waiting on bqjob_r75e4bf112a7160f4_0162dc903d0e_1 ... (0s) Current status: 
RUNNING 
 Waiting on bqjob_r75e4bf112a7160f4_0162dc903d0e_1 ... (0s) 
Current status: DONE   
BigQuery error in load operation: Error processing job
'apache-beam-testing:bqjob_r75e4bf112a7160f4_0162dc903d0e_1': Invalid schema
update. Field timestamp has changed type from TIMESTAMP to FLOAT

STDERR: 
/usr/lib/google-cloud-sdk/platform/bq/third_
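Each `bq load --autodetect` retry above fails the same way: autodetection infers FLOAT for `timestamp`, while the existing `beam_performance.pkb_results` table declares it as TIMESTAMP. The log does not show the uploaded rows, so the following is only a hypothetical sketch, assuming they carry epoch seconds as a JSON number: formatting that value as a timestamp string before writing the newline-delimited JSON would likely let autodetection infer TIMESTAMP again. The helper names are made up for illustration; supplying an explicit schema instead of relying on `--autodetect` would be another option.

```python
import json
from datetime import datetime, timezone


def to_bq_timestamp(epoch_seconds):
    """Formats epoch seconds as 'YYYY-MM-DD HH:MM:SS.ffffff+00:00' (UTC)."""
    dt = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
    return dt.isoformat(sep=' ', timespec='microseconds')


def to_ndjson(rows, field='timestamp'):
    """Serializes rows as newline-delimited JSON, converting a numeric `field`."""
    lines = []
    for row in rows:
        row = dict(row)  # copy so the caller's dict is not mutated
        value = row.get(field)
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            row[field] = to_bq_timestamp(value)
        lines.append(json.dumps(row, sort_keys=True))
    return '\n'.join(lines) + '\n'
```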

Build failed in Jenkins: beam_PostCommit_Java_ValidatesRunner_Spark_Gradle #141

2018-04-18 Thread Apache Jenkins Server
See 


--
Started by timer
[EnvInject] - Loading node environment variables.
Building remotely on beam5 (beam) in workspace /home/jenkins/jenkins-slave/workspace/beam_PostCommit_Java_ValidatesRunner_Spark_Gradle
FATAL: java.io.IOException: Unexpected termination of the channel
java.io.EOFException
at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2671)
at java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:3146)
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:858)
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:354)
at hudson.remoting.ObjectInputStreamEx.<init>(ObjectInputStreamEx.java:48)
at hudson.remoting.AbstractSynchronousByteArrayCommandTransport.read(AbstractSynchronousByteArrayCommandTransport.java:35)
at hudson.remoting.SynchronousCommandTransport$ReaderThread.run(SynchronousCommandTransport.java:63)
Caused: java.io.IOException: Unexpected termination of the channel
at hudson.remoting.SynchronousCommandTransport$ReaderThread.run(SynchronousCommandTransport.java:77)
Also:   hudson.remoting.Channel$CallSiteStackTrace: Remote call to beam5
at hudson.remoting.Channel.attachCallSiteStackTrace(Channel.java:1693)
at hudson.remoting.Request.call(Request.java:192)
at hudson.remoting.Channel.call(Channel.java:907)
at hudson.FilePath.act(FilePath.java:986)
at hudson.FilePath.act(FilePath.java:975)
at org.jenkinsci.plugins.gitclient.Git.getClient(Git.java:137)
at hudson.plugins.git.GitSCM.createClient(GitSCM.java:795)
at hudson.plugins.git.GitSCM.createClient(GitSCM.java:786)
at hudson.plugins.git.GitSCM.checkout(GitSCM.java:1154)
at hudson.scm.SCM.checkout(SCM.java:495)
at hudson.model.AbstractProject.checkout(AbstractProject.java:1202)
at hudson.model.AbstractBuild$AbstractBuildExecution.defaultCheckout(AbstractBuild.java:574)
at jenkins.scm.SCMCheckoutStrategy.checkout(SCMCheckoutStrategy.java:86)
at hudson.model.AbstractBuild$AbstractBuildExecution.run(AbstractBuild.java:499)
at hudson.model.Run.execute(Run.java:1724)
at hudson.model.FreeStyleBuild.run(FreeStyleBuild.java:43)
at hudson.model.ResourceController.execute(ResourceController.java:97)
at hudson.model.Executor.run(Executor.java:429)
Caused: hudson.remoting.RequestAbortedException
at hudson.remoting.Request.abort(Request.java:329)
at hudson.remoting.Channel.terminate(Channel.java:992)
at hudson.remoting.SynchronousCommandTransport$ReaderThread.run(SynchronousCommandTransport.java:96)
Recording test results
ERROR: Step ‘Publish JUnit test result report’ aborted due to exception: java.io.EOFException
at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2671)
at java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:3146)
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:858)
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:354)
at hudson.remoting.ObjectInputStreamEx.<init>(ObjectInputStreamEx.java:48)
at hudson.remoting.AbstractSynchronousByteArrayCommandTransport.read(AbstractSynchronousByteArrayCommandTransport.java:35)
at hudson.remoting.SynchronousCommandTransport$ReaderThread.run(SynchronousCommandTransport.java:63)
Caused: java.io.IOException: Unexpected termination of the channel
at hudson.remoting.SynchronousCommandTransport$ReaderThread.run(SynchronousCommandTransport.java:77)
Caused: hudson.remoting.ChannelClosedException: Remote call on beam5 failed. The channel is closing down or has closed down
at hudson.remoting.Channel.call(Channel.java:901)
at hudson.FilePath.act(FilePath.java:986)
Caused: java.io.IOException: remote file operation failed: /home/jenkins/jenkins-slave/workspace/beam_PostCommit_Java_ValidatesRunner_Spark_Gradle at hudson.remoting.Channel@2d197894:beam5
at hudson.FilePath.act(FilePath.java:993)
at hudson.FilePath.act(FilePath.java:975)
at hudson.tasks.junit.JUnitParser.parseResult(JUnitParser.java:114)
at hudson.tasks.junit.JUnitResultArchiver.parse(JUnitResultArchiver.java:136)
at hudson.tasks.junit.JUnitResultArchiver.parseAndAttach(JUnitResultArchiver.java:166)
at hudson.tasks.junit.JUnitResultArchiver.perform(JUnitResultArchiver.java:153)
at hudson.tasks.BuildStepCompatibilityLayer.perform(BuildStepCompatibilityLayer.java:81)
at hudson.tasks.BuildStepMonitor$1.perform(BuildStepMonitor.java:20)
  

Jenkins build is back to normal : beam_PerformanceTests_HadoopInputFormat #159

2018-04-18 Thread Apache Jenkins Server
See 




Jenkins build is back to normal : beam_PerformanceTests_AvroIOIT_HDFS #67

2018-04-18 Thread Apache Jenkins Server
See 




Build failed in Jenkins: beam_PerformanceTests_Python #1166

2018-04-18 Thread Apache Jenkins Server
See 


--
[...truncated 61.93 KB...]
[INFO] --- maven-assembly-plugin:3.1.0:single (export-go-pkg-sources) @ 
beam-sdks-go ---
[INFO] Reading assembly descriptor: descriptor.xml
[INFO] Building zip: 

[INFO] 
[INFO] --- maven-remote-resources-plugin:1.5:process (process-resource-bundles) 
@ beam-sdks-go ---
[INFO] 
[INFO] --- mvn-golang-wrapper:2.1.7:get (go-get-imports) @ beam-sdks-go ---
[INFO] Prepared command line : bin/go get -u google.golang.org/grpc 
golang.org/x/oauth2/google google.golang.org/api/storage/v1 
github.com/spf13/cobra cloud.google.com/go/bigquery 
google.golang.org/api/googleapi google.golang.org/api/dataflow/v1b3
[INFO] 
[INFO] --- mvn-golang-wrapper:2.1.7:build (go-build) @ beam-sdks-go ---
[INFO] Prepared command line : bin/go build -buildmode=default -o 

 github.com/apache/beam/sdks/go/cmd/beamctl
[INFO] The Result file has been successfuly created : 

[INFO] 
[INFO] --- mvn-golang-wrapper:2.1.7:build (go-build-linux-amd64) @ beam-sdks-go 
---
[INFO] Prepared command line : bin/go build -buildmode=default -o 

 github.com/apache/beam/sdks/go/cmd/beamctl
[INFO] The Result file has been successfuly created : 

[INFO] 
[INFO] --- maven-checkstyle-plugin:3.0.0:check (default) @ beam-sdks-go ---
[INFO] 
[INFO] --- mvn-golang-wrapper:2.1.7:test (go-test) @ beam-sdks-go ---
[INFO] Prepared command line : bin/go test ./...
[ERROR] 
[ERROR] -Exec.Err-
[ERROR] 
/home/jenkins/.mvnGoLang/.go_path/src/cloud.google.com/go/pubsub/subscription.go:30:2:
 cannot find package "golang.org/x/sync/errgroup" in any of:
[ERROR] 
/home/jenkins/.mvnGoLang/go1.9.linux-amd64/src/golang.org/x/sync/errgroup (from 
$GOROOT)
[ERROR] 
/home/jenkins/.mvnGoLang/.go_path/src/golang.org/x/sync/errgroup (from $GOPATH)
[ERROR] 

[ERROR] 
/home/jenkins/.mvnGoLang/.go_path/src/cloud.google.com/go/pubsub/flow_controller.go:19:2:
 cannot find package "golang.org/x/sync/semaphore" in any of:
[ERROR] 
/home/jenkins/.mvnGoLang/go1.9.linux-amd64/src/golang.org/x/sync/semaphore 
(from $GOROOT)
[ERROR] 
/home/jenkins/.mvnGoLang/.go_path/src/golang.org/x/sync/semaphore (from $GOPATH)
[ERROR] 

[ERROR] 
[INFO] 
[INFO] Reactor Summary:
[INFO] 
[INFO] Apache Beam :: Parent .. SUCCESS [  9.847 s]
[INFO] Apache Beam :: SDKs :: Java :: Build Tools . SUCCESS [  7.156 s]
[INFO] Apache Beam :: Model ... SUCCESS [  0.183 s]
[INFO] Apache Beam :: Model :: Pipeline ... SUCCESS [ 23.898 s]
[INFO] Apache Beam :: Model :: Job Management . SUCCESS [ 10.437 s]
[INFO] Apache Beam :: Model :: Fn Execution ... SUCCESS [ 11.047 s]
[INFO] Apache Beam :: SDKs  SUCCESS [  0.455 s]
[INFO] Apache Beam :: SDKs :: Go .. FAILURE [ 55.575 s]
[INFO] Apache Beam :: SDKs :: Go :: Container . SKIPPED
[INFO] Apache Beam :: SDKs :: Java  SKIPPED
[INFO] Apache Beam :: SDKs :: Java :: Core  SKIPPED
[INFO] Apache Beam :: SDKs :: Java :: Fn Execution  SKIPPED
[INFO] Apache Beam :: SDKs :: Java :: Extensions .. SKIPPED
[INFO] Apache Beam :: SDKs :: Java :: Extensions :: Google Cloud Platform Core SKIPPED
[INFO] Apache Beam :: Runners . SKIPPED
[INFO] Apache Beam :: Runners :: Core Construction Java ... SKIPPED
[INFO] Apache Beam :: Runners :: Core Java  SKIPPED
[INFO] Apache Beam :: SDKs :: Java :: Harness . SKIPPED
[INFO] Apache Beam :: SDKs :: Java :: Container ... SKIPPED
[INFO] Apache Beam :: SDKs :: Java :: IO .. SKIPPED
[INFO] Apache Beam :: SDKs :: Java :: IO :: Amazon Web Services SKIPPED
[INFO] Apache Beam :: Runners :: Local Java Core .. SKIPPED
[INFO] Apache Beam :: Runners :: Direct Java .. SKIPPED
[INFO] Apache Beam :: SDKs :: Java :: IO :: AMQP .

Build failed in Jenkins: beam_PerformanceTests_MongoDBIO_IT #68

2018-04-18 Thread Apache Jenkins Server
See 


--
Started by timer
[EnvInject] - Loading node environment variables.
Building remotely on beam7 (beam) in workspace 

Cloning the remote Git repository
Cloning repository https://github.com/apache/beam.git
 > git init  # timeout=10
Fetching upstream changes from https://github.com/apache/beam.git
 > git --version # timeout=10
 > git fetch --tags --progress https://github.com/apache/beam.git +refs/heads/*:refs/remotes/origin/*
 > git config remote.origin.url https://github.com/apache/beam.git # timeout=10
 > git config --add remote.origin.fetch +refs/heads/*:refs/remotes/origin/* # timeout=10
 > git config remote.origin.url https://github.com/apache/beam.git # timeout=10
Fetching upstream changes from https://github.com/apache/beam.git
 > git fetch --tags --progress https://github.com/apache/beam.git +refs/heads/*:refs/remotes/origin/* +refs/pull/${ghprbPullId}/*:refs/remotes/origin/pr/${ghprbPullId}/*
 > git rev-parse origin/master^{commit} # timeout=10
Checking out Revision 7300eb2bfdd412767e60173c0a0e0d76ba7d067d (origin/master)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f 7300eb2bfdd412767e60173c0a0e0d76ba7d067d
Commit message: "Merge pull request #4788: Add Mobile gaming automation for Java nightly snapshot on core runners"
 > git rev-list --no-walk 7300eb2bfdd412767e60173c0a0e0d76ba7d067d # timeout=10
Cleaning workspace
 > git rev-parse --verify HEAD # timeout=10
Resetting working tree
 > git reset --hard # timeout=10
 > git clean -fdx # timeout=10
[EnvInject] - Executing scripts and injecting environment variables after the SCM step.
[EnvInject] - Injecting as environment variables the properties content 
SPARK_LOCAL_IP=127.0.0.1

[EnvInject] - Variables injected successfully.
[beam_PerformanceTests_MongoDBIO_IT] $ /bin/bash -xe /tmp/jenkins8737774227375687674.sh
+ gcloud container clusters get-credentials io-datastores --zone=us-central1-a --verbosity=debug
DEBUG: Running [gcloud.container.clusters.get-credentials] with arguments: [--verbosity: "debug", --zone: "us-central1-a", NAME: "io-datastores"]
Fetching cluster endpoint and auth data.
DEBUG: Saved kubeconfig to /home/jenkins/.kube/config
kubeconfig entry generated for io-datastores.
INFO: Display format "default".
DEBUG: SDK update checks are disabled.
[beam_PerformanceTests_MongoDBIO_IT] $ /bin/bash -xe /tmp/jenkins1551898550526262504.sh
+ cp /home/jenkins/.kube/config 

[beam_PerformanceTests_MongoDBIO_IT] $ /bin/bash -xe /tmp/jenkins2512691532918460686.sh
+ kubectl --kubeconfig= create namespace mongodbioit-1524114275935
namespace "mongodbioit-1524114275935" created
[beam_PerformanceTests_MongoDBIO_IT] $ /bin/bash -xe /tmp/jenkins1558833152364846796.sh
++ kubectl config current-context
+ kubectl --kubeconfig= config set-context gke_apache-beam-testing_us-central1-a_io-datastores --namespace=mongodbioit-1524114275935
Context "gke_apache-beam-testing_us-central1-a_io-datastores" modified.
[beam_PerformanceTests_MongoDBIO_IT] $ /bin/bash -xe /tmp/jenkins6397172424709.sh
+ rm -rf PerfKitBenchmarker
[beam_PerformanceTests_MongoDBIO_IT] $ /bin/bash -xe /tmp/jenkins5368355181622858260.sh
+ rm -rf .env
[beam_PerformanceTests_MongoDBIO_IT] $ /bin/bash -xe /tmp/jenkins4123621701357233129.sh
+ virtualenv .env --system-site-packages
New python executable in .env/bin/python
Installing setuptools, pip...done.
[beam_PerformanceTests_MongoDBIO_IT] $ /bin/bash -xe /tmp/jenkins6589739105186395688.sh
+ .env/bin/pip install --upgrade setuptools pip
Downloading/unpacking setuptools from 
https://files.pythonhosted.org/packages/20/d7/04a0b689d3035143e2ff288f4b9ee4bf6ed80585cc121c90bfd85a1a8c2e/setuptools-39.0.1-py2.py3-none-any.whl#sha256=8010754433e3211b9cdbbf784b50f30e80bf40fc6b05eb5f865fab83300599b8
Downloading/unpacking pip from 
https://files.pythonhosted.org/packages/62/a1/0d452b6901b0157a0134fd27ba89bf95a857fbda64ba52e1ca2cf61d8412/pip-10.0.0-py2.py3-none-any.whl#sha256=86a60a96d85e329962a9e6f6af612cbc11106293dbc83f119802b5bee9874cf3
Installing collected packages: setuptools, pip
  Found existing installation: setuptools 2.2
Uninstalling setuptools:
  Successfully uninstalled setuptools
  Found existing installation: pip 1.5.4
Uninstalling pip:
  Successfully uninstalled pip
Successfully installed setuptools pip
Cleaning up...
[beam_PerformanceTests_MongoDBIO_IT] $ /bin/bash -xe /tmp/jenkins82496876832

[jira] [Work logged] (BEAM-3906) Get Python Wheel Validation Automated

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3906?focusedWorklogId=92401&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92401
 ]

ASF GitHub Bot logged work on BEAM-3906:


Author: ASF GitHub Bot
Created on: 19/Apr/18 04:59
Start Date: 19/Apr/18 04:59
Worklog Time Spent: 10m 
  Work Description: yifanzou commented on issue #4943: [BEAM-3906] Automate 
Validation Against Python Wheel
URL: https://github.com/apache/beam/pull/4943#issuecomment-382610788
 
 
   Run Seed Job


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 92401)
Time Spent: 7h 20m  (was: 7h 10m)

> Get Python Wheel Validation Automated
> -
>
> Key: BEAM-3906
> URL: https://issues.apache.org/jira/browse/BEAM-3906
> Project: Beam
>  Issue Type: Sub-task
>  Components: examples-python, testing
>Reporter: yifan zou
>Assignee: yifan zou
>Priority: Major
>  Time Spent: 7h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (BEAM-3933) Beam 2.4 python wheel is not working on dataflow runners

2018-04-18 Thread yifan zou (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443571#comment-16443571
 ] 

yifan zou edited comment on BEAM-3933 at 4/19/18 4:54 AM:
--

This bug is fixed by [https://github.com/apache/beam/pull/5110]. Ticket resolved.

See https://issues.apache.org/jira/browse/BEAM-3950 for more info.


was (Author: yifanzou):
This bug is fixed by [https://github.com/apache/beam/pull/5110]. Ticket resolved.

See https://issues.apache.org/jira/browse/BEAM-3950 for more info.

> Beam 2.4 python wheel is not working on dataflow runners
> 
>
> Key: BEAM-3933
> URL: https://issues.apache.org/jira/browse/BEAM-3933
> Project: Beam
>  Issue Type: Bug
>  Components: examples-python, sdk-py-harness
>Affects Versions: 2.4.0
>Reporter: yifan zou
>Assignee: Ahmet Altay
>Priority: Major
> Fix For: 2.5.0
>
>
> Running Beam examples on DataflowRunner against the Python wheel fails due to:
> Error syncing pod aaa0e0f09729d923b17726c465ccfcaf 
> ("dataflow-beamapp-jenkins-032603332-03252033-3d3b-harness-hgb3_default(aaa0e0f09729d923b17726c465ccfcaf)"),
>  skipping: failed to "StartContainer" for "python" with CrashLoopBackOff: 
> "Back-off 5m0s restarting failed container=python 
> pod=dataflow-beamapp-jenkins-032603332-03252033-3d3b-harness-hgb3_default(aaa0e0f09729d923b17726c465ccfcaf)
> Failed to install packages: failed to install SDK: exit status 1
>  
> Dataflow job: 
> [https://pantheon.corp.google.com/dataflow/jobsDetail/locations/us-central1/jobs/2018-03-25_20_33_34-14871401506794256600?project=apache-beam-testing]
> Jenkins job: 
> https://builds.apache.org/job/beam_PostRelease_Python_Candidate/79/console





[jira] [Comment Edited] (BEAM-3933) Beam 2.4 python wheel is not working on dataflow runners

2018-04-18 Thread yifan zou (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443571#comment-16443571
 ] 

yifan zou edited comment on BEAM-3933 at 4/19/18 4:53 AM:
--

This bug is fixed by [https://github.com/apache/beam/pull/5110]. Ticket resolved.

See https://issues.apache.org/jira/browse/BEAM-3950 for more info.


was (Author: yifanzou):
This bug is fixed by [https://github.com/apache/beam/pull/5110]. Ticket resolved.

> Beam 2.4 python wheel is not working on dataflow runners
> 
>
> Key: BEAM-3933
> URL: https://issues.apache.org/jira/browse/BEAM-3933
> Project: Beam
>  Issue Type: Bug
>  Components: examples-python, sdk-py-harness
>Affects Versions: 2.4.0
>Reporter: yifan zou
>Assignee: Ahmet Altay
>Priority: Major
> Fix For: 2.5.0
>
>
> Running Beam examples on DataflowRunner against the Python wheel fails due to:
> Error syncing pod aaa0e0f09729d923b17726c465ccfcaf 
> ("dataflow-beamapp-jenkins-032603332-03252033-3d3b-harness-hgb3_default(aaa0e0f09729d923b17726c465ccfcaf)"),
>  skipping: failed to "StartContainer" for "python" with CrashLoopBackOff: 
> "Back-off 5m0s restarting failed container=python 
> pod=dataflow-beamapp-jenkins-032603332-03252033-3d3b-harness-hgb3_default(aaa0e0f09729d923b17726c465ccfcaf)
> Failed to install packages: failed to install SDK: exit status 1
>  
> Dataflow job: 
> [https://pantheon.corp.google.com/dataflow/jobsDetail/locations/us-central1/jobs/2018-03-25_20_33_34-14871401506794256600?project=apache-beam-testing]
> Jenkins job: 
> https://builds.apache.org/job/beam_PostRelease_Python_Candidate/79/console





[jira] [Resolved] (BEAM-3933) Beam 2.4 python wheel is not working on dataflow runners

2018-04-18 Thread yifan zou (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yifan zou resolved BEAM-3933.
-
   Resolution: Fixed
Fix Version/s: 2.5.0

> Beam 2.4 python wheel is not working on dataflow runners
> 
>
> Key: BEAM-3933
> URL: https://issues.apache.org/jira/browse/BEAM-3933
> Project: Beam
>  Issue Type: Bug
>  Components: examples-python, sdk-py-harness
>Affects Versions: 2.4.0
>Reporter: yifan zou
>Assignee: Ahmet Altay
>Priority: Major
> Fix For: 2.5.0
>
>
> Running Beam examples on DataflowRunner against the Python wheel fails due to:
> Error syncing pod aaa0e0f09729d923b17726c465ccfcaf 
> ("dataflow-beamapp-jenkins-032603332-03252033-3d3b-harness-hgb3_default(aaa0e0f09729d923b17726c465ccfcaf)"),
>  skipping: failed to "StartContainer" for "python" with CrashLoopBackOff: 
> "Back-off 5m0s restarting failed container=python 
> pod=dataflow-beamapp-jenkins-032603332-03252033-3d3b-harness-hgb3_default(aaa0e0f09729d923b17726c465ccfcaf)
> Failed to install packages: failed to install SDK: exit status 1
>  
> Dataflow job: 
> [https://pantheon.corp.google.com/dataflow/jobsDetail/locations/us-central1/jobs/2018-03-25_20_33_34-14871401506794256600?project=apache-beam-testing]
> Jenkins job: 
> https://builds.apache.org/job/beam_PostRelease_Python_Candidate/79/console





[jira] [Comment Edited] (BEAM-3933) Beam 2.4 python wheel is not working on dataflow runners

2018-04-18 Thread yifan zou (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443571#comment-16443571
 ] 

yifan zou edited comment on BEAM-3933 at 4/19/18 4:52 AM:
--

This bug is fixed by [https://github.com/apache/beam/pull/5110]. Ticket resolved.


was (Author: yifanzou):
This bug is fixed by [https://github.com/apache/beam/pull/5110].

> Beam 2.4 python wheel is not working on dataflow runners
> 
>
> Key: BEAM-3933
> URL: https://issues.apache.org/jira/browse/BEAM-3933
> Project: Beam
>  Issue Type: Bug
>  Components: examples-python, sdk-py-harness
>Affects Versions: 2.4.0
>Reporter: yifan zou
>Assignee: Ahmet Altay
>Priority: Major
> Fix For: 2.5.0
>
>
> Running Beam examples on DataflowRunner against the Python wheel fails due to:
> Error syncing pod aaa0e0f09729d923b17726c465ccfcaf 
> ("dataflow-beamapp-jenkins-032603332-03252033-3d3b-harness-hgb3_default(aaa0e0f09729d923b17726c465ccfcaf)"),
>  skipping: failed to "StartContainer" for "python" with CrashLoopBackOff: 
> "Back-off 5m0s restarting failed container=python 
> pod=dataflow-beamapp-jenkins-032603332-03252033-3d3b-harness-hgb3_default(aaa0e0f09729d923b17726c465ccfcaf)
> Failed to install packages: failed to install SDK: exit status 1
>  
> Dataflow job: 
> [https://pantheon.corp.google.com/dataflow/jobsDetail/locations/us-central1/jobs/2018-03-25_20_33_34-14871401506794256600?project=apache-beam-testing]
> Jenkins job: 
> https://builds.apache.org/job/beam_PostRelease_Python_Candidate/79/console





[jira] [Commented] (BEAM-3933) Beam 2.4 python wheel is not working on dataflow runners

2018-04-18 Thread yifan zou (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-3933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443571#comment-16443571
 ] 

yifan zou commented on BEAM-3933:
-

This bug is fixed by [https://github.com/apache/beam/pull/5110].

> Beam 2.4 python wheel is not working on dataflow runners
> 
>
> Key: BEAM-3933
> URL: https://issues.apache.org/jira/browse/BEAM-3933
> Project: Beam
>  Issue Type: Bug
>  Components: examples-python, sdk-py-harness
>Affects Versions: 2.4.0
>Reporter: yifan zou
>Assignee: Ahmet Altay
>Priority: Major
>
> Running Beam examples on DataflowRunner against the Python wheel fails due to:
> Error syncing pod aaa0e0f09729d923b17726c465ccfcaf 
> ("dataflow-beamapp-jenkins-032603332-03252033-3d3b-harness-hgb3_default(aaa0e0f09729d923b17726c465ccfcaf)"),
>  skipping: failed to "StartContainer" for "python" with CrashLoopBackOff: 
> "Back-off 5m0s restarting failed container=python 
> pod=dataflow-beamapp-jenkins-032603332-03252033-3d3b-harness-hgb3_default(aaa0e0f09729d923b17726c465ccfcaf)
> Failed to install packages: failed to install SDK: exit status 1
>  
> Dataflow job: 
> [https://pantheon.corp.google.com/dataflow/jobsDetail/locations/us-central1/jobs/2018-03-25_20_33_34-14871401506794256600?project=apache-beam-testing]
> Jenkins job: 
> https://builds.apache.org/job/beam_PostRelease_Python_Candidate/79/console





[jira] [Work logged] (BEAM-4071) Portable Runner Job API shim

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4071?focusedWorklogId=92397&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92397
 ]

ASF GitHub Bot logged work on BEAM-4071:


Author: ASF GitHub Bot
Created on: 19/Apr/18 04:18
Start Date: 19/Apr/18 04:18
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5150:  
[BEAM-4071] Add Portable Runner Job API shim
URL: https://github.com/apache/beam/pull/5150#discussion_r182630023
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ArtifactServiceStager.java
 ##
 @@ -87,26 +87,32 @@ private ArtifactServiceStager(Channel channel, int 
bufferSize) {
 this.bufferSize = bufferSize;
   }
 
-  public void stage(Iterable files) throws IOException, 
InterruptedException {
-final Map> futures = new 
HashMap<>();
-for (File file : files) {
+  /**
+   * Stages the given artifact files to the staging service.
+   *
+   * @return The artifact staging token returned by the service
+   */
+  public String stage(Iterable files) throws IOException, 
InterruptedException {
+final Map> futures = new 
HashMap<>();
+for (StagedFile file : files) {
   futures.put(file, MoreFutures.supplyAsync(new StagingCallable(file), 
executorService));
 }
 CompletionStage stagingResult =
 MoreFutures.allAsList(futures.values())
 .thenApply(ignored -> new 
ExtractStagingResultsCallable(futures).call());
-stageManifest(stagingResult);
+return stageManifest(stagingResult);
   }
 
-  private void stageManifest(CompletionStage stagingFuture)
+  private String stageManifest(CompletionStage stagingFuture)
   throws InterruptedException {
 try {
   StagingResult stagingResult = MoreFutures.get(stagingFuture);
   if (stagingResult.isSuccess()) {
 Manifest manifest =
 
Manifest.newBuilder().addAllArtifact(stagingResult.getMetadata()).build();
-blockingStub.commitManifest(
-CommitManifestRequest.newBuilder().setManifest(manifest).build());
+return blockingStub
+
.commitManifest(CommitManifestRequest.newBuilder().setManifest(manifest).build())
+.getStagingToken();
 
 Review comment:
   
   
   > **tgroh** wrote:
   > Separate these two lines? Just a preference thing, but I think that the 
fluent style hints at something more immediate than the actual RPC we're making 
here.
   
   
   Done.
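A minimal, self-contained sketch of the flow in this diff: stage every file concurrently, wait for all uploads, then commit the manifest, keeping the RPC and the token extraction as separate statements as the review suggests. The `StagingStub` and `CommitResponse` types are hypothetical stand-ins for the generated gRPC stub and commit-manifest response, and "staging" is simulated by returning a name rather than uploading bytes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StageThenCommitSketch {
  /** Hypothetical stand-in for the generated commit-manifest response type. */
  static final class CommitResponse {
    private final String stagingToken;

    CommitResponse(String stagingToken) {
      this.stagingToken = stagingToken;
    }

    String getStagingToken() {
      return stagingToken;
    }
  }

  /** Hypothetical blocking stub: a single RPC that commits the manifest of staged names. */
  interface StagingStub {
    CommitResponse commitManifest(List<String> stagedNames);
  }

  /** Stages all files concurrently, then commits the manifest and returns the staging token. */
  static String stage(List<String> files, StagingStub stub, ExecutorService executor) {
    // Start one asynchronous "upload" per file; here an upload just yields a staged name.
    List<CompletableFuture<String>> uploads = new ArrayList<>();
    for (String file : files) {
      uploads.add(CompletableFuture.supplyAsync(() -> "staged/" + file, executor));
    }
    // Block until every upload has finished (join rethrows any upload failure unchecked).
    CompletableFuture.allOf(uploads.toArray(new CompletableFuture<?>[0])).join();
    List<String> stagedNames = new ArrayList<>();
    for (CompletableFuture<String> upload : uploads) {
      stagedNames.add(upload.join());
    }
    // Issue the RPC as its own statement, then read the token from the response.
    CommitResponse response = stub.commitManifest(stagedNames);
    return response.getStagingToken();
  }

  public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      StagingStub fakeStub = names -> new CommitResponse("token-" + names.size());
      System.out.println(stage(Arrays.asList("a.jar", "b.jar"), fakeStub, executor));
    } finally {
      executor.shutdown();
    }
  }
}
```

With the fake stub in `main`, this prints `token-2`.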




Issue Time Tracking
---

Worklog Id: (was: 92397)
Time Spent: 14.5h  (was: 14h 20m)

> Portable Runner Job API shim
> 
>
> Key: BEAM-4071
> URL: https://issues.apache.org/jira/browse/BEAM-4071
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Minor
>  Time Spent: 14.5h
>  Remaining Estimate: 0h
>
> There needs to be a way to execute Java-SDK pipelines against a portable job 
> server. The job server itself is expected to be started up out-of-band. The 
> "PortableRunner" should take an option indicating the Job API endpoint and 
> defer other runner configurations to the backend itself.





[jira] [Work logged] (BEAM-4071) Portable Runner Job API shim

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4071?focusedWorklogId=92396&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92396
 ]

ASF GitHub Bot logged work on BEAM-4071:


Author: ASF GitHub Bot
Created on: 19/Apr/18 04:18
Start Date: 19/Apr/18 04:18
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5150:  
[BEAM-4071] Add Portable Runner Job API shim
URL: https://github.com/apache/beam/pull/5150#discussion_r182630024
 
 

 ##
 File path: 
runners/reference/java/src/main/java/org/apache/beam/runners/reference/JobServicePipelineResult.java
 ##
 @@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.reference;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.google.protobuf.ByteString;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.beam.model.jobmanagement.v1.JobApi;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
+import 
org.apache.beam.model.jobmanagement.v1.JobServiceGrpc.JobServiceBlockingStub;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class JobServicePipelineResult implements PipelineResult {
+
+  private static final long POLL_INTERVAL_SEC = 10;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(JobServicePipelineResult.class);
+
+  private final ByteString jobId;
+  private final CloseableResource jobService;
+
+  JobServicePipelineResult(ByteString jobId, 
CloseableResource jobService) {
+this.jobId = jobId;
+this.jobService = jobService;
+  }
+
+  @Override
+  public State getState() {
+JobServiceBlockingStub stub = jobService.get();
+GetJobStateResponse response =
+
stub.getState(GetJobStateRequest.newBuilder().setJobIdBytes(jobId).build());
+return getJavaState(response.getState());
+  }
+
+  @Override
+  public State cancel() {
+JobServiceBlockingStub stub = jobService.get();
+CancelJobResponse response =
+
stub.cancel(CancelJobRequest.newBuilder().setJobIdBytes(jobId).build());
+return getJavaState(response.getState());
+  }
+
+  @Override
+  public State waitUntilFinish(Duration duration) {
+if (duration.compareTo(Duration.millis(1)) < 1) {
+  // Equivalent to infinite timeout.
+  return waitUntilFinish();
+} else {
+  CompletableFuture result = 
CompletableFuture.supplyAsync(this::waitUntilFinish);
+  try {
+return Uninterruptibles.getUninterruptibly(
 
 Review comment:
   
   
   > **tgroh** wrote:
   > Why is this uninterruptible?
   
   
   To ensure that we sleep fully for the amount of time requested. Given that 
we don't use interrupts anywhere in Beam code, we don't expect this to happen 
anyway. Alternatively we could just crash on interrupts (i.e., throw a 
RuntimeException wrapping the interruption).
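The alternative mentioned in this reply (fail on interrupt instead of waiting uninterruptibly) can be sketched with only `java.util.concurrent`. This is an illustration, not the code in the PR: `waitWithTimeout` is a hypothetical helper, timeouts under 1 ms are treated as an indefinite wait to mirror the diff's check, and returning `null` on timeout is assumed here to be the desired contract for `waitUntilFinish(Duration)`. The helper restores the thread's interrupt flag before rethrowing so callers can still observe the interruption.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedWait {
  /**
   * Waits for {@code future}; returns its value, or null if it does not complete in time.
   * Timeouts below 1 ms mean "wait indefinitely".
   */
  static <T> T waitWithTimeout(CompletableFuture<T> future, long timeoutMillis) {
    try {
      if (timeoutMillis < 1) {
        return future.get();
      }
      return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      // The future is still running; report "no result yet" instead of failing.
      return null;
    } catch (InterruptedException e) {
      // Re-set the interrupt flag, then fail loudly rather than swallowing the interrupt.
      Thread.currentThread().interrupt();
      throw new RuntimeException("Interrupted while waiting for the result", e);
    } catch (ExecutionException e) {
      // The computation itself failed; surface its cause.
      throw new RuntimeException(e.getCause());
    }
  }

  public static void main(String[] args) {
    System.out.println(waitWithTimeout(CompletableFuture.completedFuture("DONE"), 100));
    System.out.println(waitWithTimeout(new CompletableFuture<String>(), 50));
  }
}
```

The first call prints `DONE`; the second waits about 50 ms on a future that never completes and prints `null`.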




Issue Time Tracking
---

Worklog Id: (was: 92396)
Time Spent: 14h 20m  (was: 14h 10m)

> Portable Runner Job API shim
> 
>
> Key: BEAM-4071
> URL: https://issues.apache.org/jira/browse/BEAM-4071
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Ben Sidhom
>Assignee: Ben

[jira] [Work logged] (BEAM-4071) Portable Runner Job API shim

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4071?focusedWorklogId=92393&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92393
 ]

ASF GitHub Bot logged work on BEAM-4071:


Author: ASF GitHub Bot
Created on: 19/Apr/18 04:18
Start Date: 19/Apr/18 04:18
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5150:  
[BEAM-4071] Add Portable Runner Job API shim
URL: https://github.com/apache/beam/pull/5150#discussion_r182630025
 
 

 ##
 File path: 
runners/reference/java/src/main/java/org/apache/beam/runners/reference/PortableRunner.java
 ##
 @@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.reference;
+
+import static com.google.common.base.Preconditions.checkState;
+import static 
org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+import com.google.protobuf.ByteString;
+import io.grpc.ManagedChannel;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Set;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import 
org.apache.beam.model.jobmanagement.v1.JobServiceGrpc.JobServiceBlockingStub;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.runners.core.construction.ArtifactServiceStager;
+import 
org.apache.beam.runners.core.construction.ArtifactServiceStager.StagedFile;
+import org.apache.beam.runners.core.construction.JavaReadViaImpulse;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.runners.reference.CloseableResource.CloseException;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.util.ZipFiles;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@link PipelineRunner} a {@link Pipeline} against a {@code JobService}. 
*/
+public class PortableRunner extends PipelineRunner {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(PortableRunner.class);
+
+  /** Provided pipeline options. */
+  private final PipelineOptions options;
+  /** Job API endpoint. */
+  private final String endpoint;
+  /** Files to stage to artifact staging service. They will ultimately be 
added to the classpath. */
+  private final Collection filesToStage;
+  /** Channel factory used to create communication channel with job and 
staging services. */
+  private final ManagedChannelFactory channelFactory;
+
+  /**
+   * Constructs a runner from the provided options.
+   *
+   * @param options Properties which configure the runner.
+   * @return The newly created runner.
+   */
+  public static PortableRunner fromOptions(PipelineOptions options) {
+return createInternal(options, ManagedChannelFactory.createDefault());
 
 Review comment:
   
   
   > **tgroh** wrote:
   > why not just `create`?
   
   
   Mostly to draw attention to the fact that this is an internal call and to 
distinguish it from `fromOptions`. If you have a strong preference, I'll change 
it.
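The pattern under discussion is a public `fromOptions`-style factory that delegates to a separately named internal factory, which accepts an injectable dependency (here, a channel factory) so tests can substitute a fake. The sketch below is self-contained and hypothetical: `Runner`, `ChannelFactory`, and the endpoint strings are illustrative stand-ins, not Beam's `PortableRunner` or `ManagedChannelFactory`.

```java
public class FactoryNamingExample {
  /** Hypothetical stand-in for a channel factory dependency. */
  interface ChannelFactory {
    String describe();
  }

  static final class Runner {
    private final String endpoint;
    private final ChannelFactory channelFactory;

    private Runner(String endpoint, ChannelFactory channelFactory) {
      this.endpoint = endpoint;
      this.channelFactory = channelFactory;
    }

    /** Public entry point: callers supply configuration only and get the default dependency. */
    static Runner fromOptions(String endpoint) {
      return createInternal(endpoint, () -> "default channel");
    }

    /** Distinctly named internal seam; tests can inject a fake dependency here. */
    static Runner createInternal(String endpoint, ChannelFactory channelFactory) {
      return new Runner(endpoint, channelFactory);
    }

    String describe() {
      return endpoint + " via " + channelFactory.describe();
    }
  }

  public static void main(String[] args) {
    System.out.println(Runner.fromOptions("localhost:1234").describe());
    System.out.println(Runner.createInternal("localhost:1234", () -> "fake channel").describe());
  }
}
```

The distinct name makes it obvious at call sites which path bypasses the default wiring; this prints `localhost:1234 via default channel` followed by `localhost:1234 via fake channel`.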



[jira] [Work logged] (BEAM-4071) Portable Runner Job API shim

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4071?focusedWorklogId=92392&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92392
 ]

ASF GitHub Bot logged work on BEAM-4071:


Author: ASF GitHub Bot
Created on: 19/Apr/18 04:18
Start Date: 19/Apr/18 04:18
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5150:  
[BEAM-4071] Add Portable Runner Job API shim
URL: https://github.com/apache/beam/pull/5150#discussion_r182630021
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ArtifactServiceStager.java
 ##
 @@ -222,13 +230,26 @@ public StagingResult call() {
 }
   }
 
+  /** A file along with a staging name. */
+  @AutoValue
+  public abstract static class StagedFile {
 
 Review comment:
   
   
   > **tgroh** wrote:
   > We create this before staging, right? The name implies after
   
   
   See https://github.com/apache/beam/pull/5150#discussion_r182507804. I'm open 
to suggestions, but otherwise I'll leave it as is.
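`@AutoValue` generates an immutable value class with `equals`, `hashCode`, and `toString`. That matters here because the staging code in this PR uses each staged file as a `HashMap` key. A hand-written equivalent of "a file along with a staging name" might look like the sketch below; the class name, accessor names, and `of` factory are assumptions for illustration, not taken from the PR.

```java
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hand-written sketch of an immutable "file plus staging name" value type. */
public final class StagedFileSketch {
  private final File file;
  private final String stagingName;

  private StagedFileSketch(File file, String stagingName) {
    this.file = Objects.requireNonNull(file, "file");
    this.stagingName = Objects.requireNonNull(stagingName, "stagingName");
  }

  public static StagedFileSketch of(File file, String stagingName) {
    return new StagedFileSketch(file, stagingName);
  }

  public File getFile() {
    return file;
  }

  public String getStagingName() {
    return stagingName;
  }

  // Value semantics: two instances with equal fields are interchangeable map keys.
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof StagedFileSketch)) {
      return false;
    }
    StagedFileSketch other = (StagedFileSketch) o;
    return file.equals(other.file) && stagingName.equals(other.stagingName);
  }

  @Override
  public int hashCode() {
    return Objects.hash(file, stagingName);
  }

  @Override
  public String toString() {
    return "StagedFileSketch{file=" + file + ", stagingName=" + stagingName + "}";
  }

  public static void main(String[] args) {
    Map<StagedFileSketch, String> status = new HashMap<>();
    status.put(StagedFileSketch.of(new File("/tmp/a.jar"), "a.jar"), "pending");
    // A separately constructed but equal key finds the same entry.
    System.out.println(status.get(StagedFileSketch.of(new File("/tmp/a.jar"), "a.jar")));
  }
}
```

Running `main` prints `pending`, because the second, equal key hashes to the same entry.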




Issue Time Tracking
---

Worklog Id: (was: 92392)
Time Spent: 14h  (was: 13h 50m)

> Portable Runner Job API shim
> 
>
> Key: BEAM-4071
> URL: https://issues.apache.org/jira/browse/BEAM-4071
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Minor
>  Time Spent: 14h
>  Remaining Estimate: 0h
>
> There needs to be a way to execute Java-SDK pipelines against a portable job 
> server. The job server itself is expected to be started up out-of-band. The 
> "PortableRunner" should take an option indicating the Job API endpoint and 
> defer other runner configurations to the backend itself.





[jira] [Work logged] (BEAM-4071) Portable Runner Job API shim

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4071?focusedWorklogId=92391&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92391
 ]

ASF GitHub Bot logged work on BEAM-4071:


Author: ASF GitHub Bot
Created on: 19/Apr/18 04:18
Start Date: 19/Apr/18 04:18
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5150:  
[BEAM-4071] Add Portable Runner Job API shim
URL: https://github.com/apache/beam/pull/5150#discussion_r182630022
 
 

 ##
 File path: 
runners/reference/java/src/main/java/org/apache/beam/runners/reference/PortableRunner.java
 ##
 @@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.reference;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+import com.google.protobuf.ByteString;
+import io.grpc.ManagedChannel;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Set;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc.JobServiceBlockingStub;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.runners.core.construction.ArtifactServiceStager;
+import org.apache.beam.runners.core.construction.ArtifactServiceStager.StagedFile;
+import org.apache.beam.runners.core.construction.JavaReadViaImpulse;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.runners.reference.CloseableResource.CloseException;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.util.ZipFiles;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@link PipelineRunner} that runs a {@link Pipeline} against a {@code JobService}. */
+public class PortableRunner extends PipelineRunner {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PortableRunner.class);
+
+  /** Provided pipeline options. */
+  private final PipelineOptions options;
+  /** Job API endpoint. */
+  private final String endpoint;
+  /** Files to stage to artifact staging service. They will ultimately be added to the classpath. */
+  private final Collection filesToStage;
+  /** Channel factory used to create communication channel with job and staging services. */
+  private final ManagedChannelFactory channelFactory;
+
+  /**
+   * Constructs a runner from the provided options.
+   *
+   * @param options Properties which configure the runner.
+   * @return The newly created runner.
+   */
+  public static PortableRunner fromOptions(PipelineOptions options) {
+    return createInternal(options, ManagedChannelFactory.createDefault());
+  }
+
+  @VisibleForTesting
+  static PortableRunner createInternal(
+      PipelineOptions options, ManagedChannelFactory channelFactory) {
+    PortablePipelineOptions portableOptions =
+        PipelineOptionsValidator.validate(PortablePipelineOptions.class, options);
+
+    String endpoint = portableOptions.getJobEndpoint();
+
+    // Deduplicate artifacts.
+    Set pathsToStage = Sets.newHashSet();
+    if (portableOptions.getFile

[jira] [Work logged] (BEAM-4071) Portable Runner Job API shim

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4071?focusedWorklogId=92395&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92395
 ]

ASF GitHub Bot logged work on BEAM-4071:


Author: ASF GitHub Bot
Created on: 19/Apr/18 04:18
Start Date: 19/Apr/18 04:18
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5150:  
[BEAM-4071] Add Portable Runner Job API shim
URL: https://github.com/apache/beam/pull/5150#discussion_r182630026
 
 

 ##
 File path: 
runners/reference/java/src/main/java/org/apache/beam/runners/reference/CloseableResource.java
 ##
 @@ -0,0 +1,120 @@
+package org.apache.beam.runners.reference;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import javax.annotation.Nullable;
+
+/**
+ * An {@link AutoCloseable} that wraps a resource that needs to be cleaned up but does not implement
+ * {@link AutoCloseable} itself.
+ *
+ * Recipients of a {@link CloseableResource} are in general responsible for cleanup. Ownership
+ * can be transferred from one context to another via {@link #transfer()}. Transferring relinquishes
+ * ownership from the original resource. This allows resources to be safely constructed and
+ * transferred within a try-with-resources block. For example:
+ *
+ * {@code try (CloseableResource resource = CloseableResource.of(...)) {
+ *   // Do something with resource.
+ *   ...
+ *   // Then transfer ownership to some consumer.
+ *   resourceConsumer(resource.transfer());
+ * }
+ * }
+ *
+ * Not thread-safe.
+ */
+public class CloseableResource<T> implements AutoCloseable {
+
+  private final T resource;
+
+  /**
+   * {@link Closer} for the underlying resource. Closers are nullable to allow transfer of
+   * ownership. However, newly-constructed {@link CloseableResource CloseableResources} must always
+   * have non-null closers.
+   */
+  @Nullable private Closer<T> closer;
+
+  private boolean isClosed = false;
+
+  private CloseableResource(T resource, Closer<T> closer) {
+    this.resource = resource;
+    this.closer = closer;
+  }
+
+  /** Creates a {@link CloseableResource} with the given resource and closer. */
+  public static <T> CloseableResource<T> of(T resource, Closer<T> closer) {
+    checkArgument(resource != null, "Resource must be non-null");
+    checkArgument(closer != null, "%s must be non-null", Closer.class.getName());
+    return new CloseableResource<>(resource, closer);
+  }
+
+  /** Gets the underlying resource. */
+  public T get() {
+    checkState(closer != null, "%s has transferred ownership", CloseableResource.class.getName());
+    checkState(!isClosed, "%s is closed", CloseableResource.class.getName());
+    return resource;
+  }
+
+  /**
+   * Returns a new {@link CloseableResource} that owns the underlying resource and relinquishes
+   * ownership from this {@link CloseableResource}. {@link #close()} on the original instance
+   * becomes a no-op.
+   */
+  public CloseableResource<T> transfer() {
+    checkState(closer != null, "%s has transferred ownership", CloseableResource.class.getName());
+    checkState(!isClosed, "%s is closed", CloseableResource.class.getName());
+    CloseableResource<T> other = CloseableResource.of(resource, closer);
+    this.closer = null;
+    return other;
+  }
+
+  /**
+   * Closes the underlying resource. The closer will only be executed on the first call.
+   *
+   * @throws CloseException wrapping any exceptions thrown while closing
+   */
+  @Override
+  public void close() throws CloseException {
+    if (closer != null && !isClosed) {
+      try {
+        closer.close(resource);
+        isClosed = true;
+      } catch (Exception e) {
+        // Mark resource as closed even if we catch an exception.
+        isClosed = true;
 
 Review comment:
   
   
   > **tgroh** wrote:
   > `isClosed` can be in the finally block
   
   
   Done.
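The suggestion above is to set `isClosed` in a finally block. The following is a minimal sketch of that pattern, written against a simplified stand-in rather than the PR's final code. The closer runs at most once, whether or not it throws. `SimpleCloseableResource` and its nested `Closer` and `CloseException` are hypothetical names with assumed shapes. The transfer() rule follows the Javadoc quoted in the diff.

```java
/**
 * Hypothetical, simplified stand-in for the reviewed CloseableResource. It mirrors the
 * ownership rules from the Javadoc and sets isClosed in a finally block.
 */
final class SimpleCloseableResource<T> implements AutoCloseable {

  /** Cleanup callback for a resource that is not itself AutoCloseable (assumed shape). */
  @FunctionalInterface
  interface Closer<R> {
    void close(R resource) throws Exception;
  }

  /** Assumed wrapper for failures thrown by a Closer. */
  static final class CloseException extends Exception {
    CloseException(Throwable cause) {
      super(cause);
    }
  }

  private final T resource;
  private Closer<T> closer; // null once ownership has been transferred away
  private boolean isClosed = false;

  private SimpleCloseableResource(T resource, Closer<T> closer) {
    this.resource = resource;
    this.closer = closer;
  }

  static <R> SimpleCloseableResource<R> of(R resource, Closer<R> closer) {
    if (resource == null || closer == null) {
      throw new IllegalArgumentException("resource and closer must be non-null");
    }
    return new SimpleCloseableResource<>(resource, closer);
  }

  T get() {
    if (closer == null) {
      throw new IllegalStateException("ownership has been transferred");
    }
    if (isClosed) {
      throw new IllegalStateException("resource is closed");
    }
    return resource;
  }

  /** Moves ownership to a new wrapper; close() on this instance becomes a no-op. */
  SimpleCloseableResource<T> transfer() {
    SimpleCloseableResource<T> other = of(get(), closer);
    this.closer = null;
    return other;
  }

  /** Runs the closer at most once, even if it throws. */
  @Override
  public void close() throws CloseException {
    if (closer != null && !isClosed) {
      try {
        closer.close(resource);
      } catch (Exception e) {
        throw new CloseException(e);
      } finally {
        // Covers both the success and the failure path in one place.
        isClosed = true;
      }
    }
  }
}
```

With this shape, a try-with-resources block can hand the resource to a consumer via transfer(). The original wrapper's close() then does nothing, and only the new owner runs the closer.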


This is an automated message from the Apache Git Servi

[jira] [Work logged] (BEAM-4071) Portable Runner Job API shim

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4071?focusedWorklogId=92394&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92394
 ]

ASF GitHub Bot logged work on BEAM-4071:


Author: ASF GitHub Bot
Created on: 19/Apr/18 04:18
Start Date: 19/Apr/18 04:18
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5150:  
[BEAM-4071] Add Portable Runner Job API shim
URL: https://github.com/apache/beam/pull/5150#discussion_r182630027
 
 

 ##
 File path: 
runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/InMemoryArtifactService.java
 ##
 @@ -0,0 +1,159 @@
+package org.apache.beam.runners.reference.testing;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Sets;
+import com.google.common.io.BaseEncoding;
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
+import java.security.MessageDigest;
+import java.util.Set;
+import java.util.function.Consumer;
+import javax.annotation.concurrent.GuardedBy;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ArtifactMetadata;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.CommitManifestRequest;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.CommitManifestResponse;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.PutArtifactRequest;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.PutArtifactResponse;
+import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A StagingService for tests. Only stores artifact metadata. */
 
 Review comment:
   
   
   > **tgroh** wrote:
   > We already have one of these, don't we?
   
   
   I was unable to find one. Please link it if it does exist.
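The quoted file stops at its class comment, so the implementation itself is not shown. The following hypothetical, gRPC-free sketch illustrates what "only stores artifact metadata" can mean: bytes are folded into a digest and discarded, and only a name-to-checksum map survives. Several details are assumptions: the class name, the choice of MD5, and java.util.Base64 standing in for Guava's BaseEncoding. The quoted imports only suggest that MessageDigest and BaseEncoding are involved.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical metadata-only staging store for tests: artifact bytes are hashed and
 * discarded, and only name -> Base64(MD5) is retained.
 */
final class MetadataOnlyStagingStore {
  private final Map<String, String> md5ByName = new LinkedHashMap<>();

  /** Hashes every chunk of one artifact and records its checksum; rejects duplicate names. */
  synchronized void stage(String name, List<byte[]> chunks) {
    if (md5ByName.containsKey(name)) {
      throw new IllegalStateException("Artifact already staged: " + name);
    }
    MessageDigest digest;
    try {
      digest = MessageDigest.getInstance("MD5");
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("MD5 unavailable", e);
    }
    for (byte[] chunk : chunks) {
      digest.update(chunk); // only the running digest is kept, not the bytes
    }
    md5ByName.put(name, Base64.getEncoder().encodeToString(digest.digest()));
  }

  /** Returns a snapshot of the recorded metadata in staging order. */
  synchronized Map<String, String> stagedMetadata() {
    return Collections.unmodifiableMap(new LinkedHashMap<>(md5ByName));
  }
}
```

Because MessageDigest hashes incrementally, hashing the chunks one at a time gives the same checksum as hashing the whole artifact at once.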


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 92394)

> Portable Runner Job API shim
> 
>
> Key: BEAM-4071
> URL: https://issues.apache.org/jira/browse/BEAM-4071
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Minor
>  Time Spent: 14h 10m
>  Remaining Estimate: 0h
>
> There needs to be a way to execute Java-SDK pipelines against a portable job 
> server. The job server itself is expected to be started up out-of-band. The 
> "PortableRunner" should take an option indicating the Job API endpoint and 
> defer other runner configurations to the backend itself.
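The following is a minimal sketch of the split the description asks for. It assumes options arrive as a plain string map; the real runner uses PipelineOptions and PortablePipelineOptions.getJobEndpoint(). The client extracts only the endpoint and forwards every option, uninterpreted, for the job server to handle. `JobServerSubmission` and the `jobEndpoint` key are hypothetical.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical sketch: the client reads only the job endpoint and passes every option
 * through unchanged for the job server to interpret.
 */
final class JobServerSubmission {
  static final String JOB_ENDPOINT = "jobEndpoint"; // assumed option key

  private final String jobEndpoint;
  private final Map<String, String> forwardedOptions;

  private JobServerSubmission(String jobEndpoint, Map<String, String> forwardedOptions) {
    this.jobEndpoint = jobEndpoint;
    this.forwardedOptions = forwardedOptions;
  }

  static JobServerSubmission fromOptions(Map<String, String> options) {
    String endpoint = options.get(JOB_ENDPOINT);
    if (endpoint == null || endpoint.isEmpty()) {
      throw new IllegalArgumentException(JOB_ENDPOINT + " must be set");
    }
    // No other option is validated here; the backend decides what the rest means.
    Map<String, String> copy = new LinkedHashMap<>(options);
    return new JobServerSubmission(endpoint, Collections.unmodifiableMap(copy));
  }

  String jobEndpoint() {
    return jobEndpoint;
  }

  Map<String, String> forwardedOptions() {
    return forwardedOptions;
  }
}
```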



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4071) Portable Runner Job API shim

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4071?focusedWorklogId=92377&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92377
 ]

ASF GitHub Bot logged work on BEAM-4071:


Author: ASF GitHub Bot
Created on: 19/Apr/18 04:17
Start Date: 19/Apr/18 04:17
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5150:  
[BEAM-4071] Add Portable Runner Job API shim
URL: https://github.com/apache/beam/pull/5150#discussion_r182629957
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ArtifactServiceStager.java
 ##
 @@ -222,13 +230,26 @@ public StagingResult call() {
 }
   }
 
+  /** A file along with a staging name. */
+  @AutoValue
+  public abstract static class StagedFile {
 
 Review comment:
   
   
   > **tgroh** wrote:
   > We create this before staging, right? The name implies after
   
   
   See https://github.com/apache/beam/pull/5150#discussion_r182507804. I'm open 
to suggestions, but otherwise I'll leave it as is.
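Some context on the naming question: `StagedFile` is declared with @AutoValue and described as "a file along with a staging name", so it exists before any upload happens. A hand-written equivalent of the value semantics AutoValue would generate might look like the following. The class and accessor names are hypothetical, chosen only to sidestep the before/after-staging ambiguity.

```java
import java.io.File;
import java.util.Objects;

/**
 * Hypothetical hand-written equivalent of an AutoValue class pairing a local file with the
 * name it will be staged under. Equality is by value, so duplicates collapse in sets.
 */
final class FileWithStagingName {
  private final File file;
  private final String stagingName;

  private FileWithStagingName(File file, String stagingName) {
    this.file = Objects.requireNonNull(file, "file");
    this.stagingName = Objects.requireNonNull(stagingName, "stagingName");
  }

  static FileWithStagingName of(File file, String stagingName) {
    return new FileWithStagingName(file, stagingName);
  }

  File getFile() {
    return file;
  }

  String getStagingName() {
    return stagingName;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof FileWithStagingName)) {
      return false;
    }
    FileWithStagingName other = (FileWithStagingName) o;
    return file.equals(other.file) && stagingName.equals(other.stagingName);
  }

  @Override
  public int hashCode() {
    return Objects.hash(file, stagingName);
  }

  @Override
  public String toString() {
    return "FileWithStagingName{file=" + file + ", stagingName=" + stagingName + "}";
  }
}
```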




Issue Time Tracking
---

Worklog Id: (was: 92377)
Time Spent: 12h  (was: 11h 50m)

> Portable Runner Job API shim
> 
>
> Key: BEAM-4071
> URL: https://issues.apache.org/jira/browse/BEAM-4071
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Minor
>  Time Spent: 12h
>  Remaining Estimate: 0h
>
> There needs to be a way to execute Java-SDK pipelines against a portable job 
> server. The job server itself is expected to be started up out-of-band. The 
> "PortableRunner" should take an option indicating the Job API endpoint and 
> defer other runner configurations to the backend itself.





[jira] [Work logged] (BEAM-4071) Portable Runner Job API shim

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4071?focusedWorklogId=92384&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92384
 ]

ASF GitHub Bot logged work on BEAM-4071:


Author: ASF GitHub Bot
Created on: 19/Apr/18 04:17
Start Date: 19/Apr/18 04:17
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5150:  
[BEAM-4071] Add Portable Runner Job API shim
URL: https://github.com/apache/beam/pull/5150#discussion_r182629974
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ArtifactServiceStager.java
 ##
 @@ -222,13 +230,26 @@ public StagingResult call() {
 }
   }
 
+  /** A file along with a staging name. */
+  @AutoValue
+  public abstract static class StagedFile {
 
 Review comment:
   
   
   > **tgroh** wrote:
   > We create this before staging, right? The name implies after
   
   
   See https://github.com/apache/beam/pull/5150#discussion_r182507804. I'm open 
to suggestions, but otherwise I'll leave it as is.




Issue Time Tracking
---

Worklog Id: (was: 92384)
Time Spent: 12h 50m  (was: 12h 40m)

> Portable Runner Job API shim
> 
>
> Key: BEAM-4071
> URL: https://issues.apache.org/jira/browse/BEAM-4071
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Minor
>  Time Spent: 12h 50m
>  Remaining Estimate: 0h
>
> There needs to be a way to execute Java-SDK pipelines against a portable job 
> server. The job server itself is expected to be started up out-of-band. The 
> "PortableRunner" should take an option indicating the Job API endpoint and 
> defer other runner configurations to the backend itself.





[jira] [Work logged] (BEAM-4071) Portable Runner Job API shim

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4071?focusedWorklogId=92382&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92382
 ]

ASF GitHub Bot logged work on BEAM-4071:


Author: ASF GitHub Bot
Created on: 19/Apr/18 04:17
Start Date: 19/Apr/18 04:17
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5150:  
[BEAM-4071] Add Portable Runner Job API shim
URL: https://github.com/apache/beam/pull/5150#discussion_r182629965
 
 

 ##
 File path: 
runners/reference/java/src/main/java/org/apache/beam/runners/reference/PortableRunner.java
 ##
 @@ -0,0 +1,245 @@
+package org.apache.beam.runners.reference;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+import com.google.protobuf.ByteString;
+import io.grpc.ManagedChannel;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Set;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc.JobServiceBlockingStub;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.runners.core.construction.ArtifactServiceStager;
+import org.apache.beam.runners.core.construction.ArtifactServiceStager.StagedFile;
+import org.apache.beam.runners.core.construction.JavaReadViaImpulse;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.runners.reference.CloseableResource.CloseException;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.util.ZipFiles;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@link PipelineRunner} that runs a {@link Pipeline} against a {@code JobService}. */
+public class PortableRunner extends PipelineRunner {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PortableRunner.class);
+
+  /** Provided pipeline options. */
+  private final PipelineOptions options;
+  /** Job API endpoint. */
+  private final String endpoint;
+  /** Files to stage to artifact staging service. They will ultimately be added to the classpath. */
+  private final Collection filesToStage;
+  /** Channel factory used to create communication channel with job and staging services. */
+  private final ManagedChannelFactory channelFactory;
+
+  /**
+   * Constructs a runner from the provided options.
+   *
+   * @param options Properties which configure the runner.
+   * @return The newly created runner.
+   */
+  public static PortableRunner fromOptions(PipelineOptions options) {
+    return createInternal(options, ManagedChannelFactory.createDefault());
 
 Review comment:
   
   
   > **tgroh** wrote:
   > why not just `create`?
   
   
   Mostly to draw attention to the fact that this is an internal call and to 
distinguish it from `fromOptions`. If you have a strong preference, I'll change 
it.
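The pattern under discussion is a public `fromOptions` that pins the production dependency and delegates to a package-private factory that tests can call with a fake. The following is a minimal sketch with hypothetical names: `SketchRunner`, a one-method `ChannelFactory`, and a bare endpoint string standing in for PipelineOptions. It is not the PR's code.

```java
/** Hypothetical stand-in for the channel factory the real runner receives. */
interface ChannelFactory {
  String describe();
}

/**
 * Sketch of the fromOptions/createInternal split: the public factory fixes the production
 * dependency, while the package-private factory lets tests inject a fake one.
 */
final class SketchRunner {
  private final String endpoint;
  private final ChannelFactory channelFactory;

  private SketchRunner(String endpoint, ChannelFactory channelFactory) {
    this.endpoint = endpoint;
    this.channelFactory = channelFactory;
  }

  /** Public entry point; callers cannot choose the factory. */
  public static SketchRunner fromOptions(String jobEndpoint) {
    return createInternal(jobEndpoint, () -> "default");
  }

  /** Package-private so tests can inject a fake (the PR also marks it @VisibleForTesting). */
  static SketchRunner createInternal(String jobEndpoint, ChannelFactory channelFactory) {
    if (jobEndpoint == null || jobEndpoint.isEmpty()) {
      throw new IllegalArgumentException("job endpoint must be set");
    }
    return new SketchRunner(jobEndpoint, channelFactory);
  }

  String connectionDescription() {
    return channelFactory.describe() + "@" + endpoint;
  }
}
```

Either method name works for this shape. What matters is that only the test-visible factory exposes the injectable parameter.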



[jira] [Work logged] (BEAM-4071) Portable Runner Job API shim

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4071?focusedWorklogId=92389&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92389
 ]

ASF GitHub Bot logged work on BEAM-4071:


Author: ASF GitHub Bot
Created on: 19/Apr/18 04:17
Start Date: 19/Apr/18 04:17
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5150:  
[BEAM-4071] Add Portable Runner Job API shim
URL: https://github.com/apache/beam/pull/5150#discussion_r182629980
 
 

 ##
 File path: 
runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/InMemoryArtifactService.java
 ##
 @@ -0,0 +1,159 @@
+package org.apache.beam.runners.reference.testing;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Sets;
+import com.google.common.io.BaseEncoding;
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
+import java.security.MessageDigest;
+import java.util.Set;
+import java.util.function.Consumer;
+import javax.annotation.concurrent.GuardedBy;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ArtifactMetadata;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.CommitManifestRequest;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.CommitManifestResponse;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.PutArtifactRequest;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.PutArtifactResponse;
+import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A StagingService for tests. Only stores artifact metadata. */
 
 Review comment:
   
   
   > **tgroh** wrote:
   > We already have one of these, don't we?
   
   
   I was unable to find one. Please link it if it does exist.




Issue Time Tracking
---

Worklog Id: (was: 92389)
Time Spent: 13.5h  (was: 13h 20m)

> Portable Runner Job API shim
> 
>
> Key: BEAM-4071
> URL: https://issues.apache.org/jira/browse/BEAM-4071
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Minor
>  Time Spent: 13.5h
>  Remaining Estimate: 0h
>
> There needs to be a way to execute Java-SDK pipelines against a portable job 
> server. The job server itself is expected to be started up out-of-band. The 
> "PortableRunner" should take an option indicating the Job API endpoint and 
> defer other runner configurations to the backend itself.





[jira] [Work logged] (BEAM-4071) Portable Runner Job API shim

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4071?focusedWorklogId=92381&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92381
 ]

ASF GitHub Bot logged work on BEAM-4071:


Author: ASF GitHub Bot
Created on: 19/Apr/18 04:17
Start Date: 19/Apr/18 04:17
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5150:  
[BEAM-4071] Add Portable Runner Job API shim
URL: https://github.com/apache/beam/pull/5150#discussion_r182629961
 
 

 ##
 File path: 
runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/InMemoryArtifactService.java
 ##
 @@ -0,0 +1,159 @@
+package org.apache.beam.runners.reference.testing;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Sets;
+import com.google.common.io.BaseEncoding;
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
+import java.security.MessageDigest;
+import java.util.Set;
+import java.util.function.Consumer;
+import javax.annotation.concurrent.GuardedBy;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ArtifactMetadata;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.CommitManifestRequest;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.CommitManifestResponse;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.PutArtifactRequest;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.PutArtifactResponse;
+import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A StagingService for tests. Only stores artifact metadata. */
 
 Review comment:
   
   
   > **tgroh** wrote:
   > We already have one of these, don't we?
   
   
   I was unable to find one. Please link it if it does exist.




Issue Time Tracking
---

Worklog Id: (was: 92381)
Time Spent: 12h 20m  (was: 12h 10m)

> Portable Runner Job API shim
> 
>
> Key: BEAM-4071
> URL: https://issues.apache.org/jira/browse/BEAM-4071
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Minor
>  Time Spent: 12h 20m
>  Remaining Estimate: 0h
>
> There needs to be a way to execute Java-SDK pipelines against a portable job 
> server. The job server itself is expected to be started up out-of-band. The 
> "PortableRunner" should take an option indicating the Job API endpoint and 
> defer other runner configurations to the backend itself.





[jira] [Work logged] (BEAM-4071) Portable Runner Job API shim

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4071?focusedWorklogId=92378&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92378
 ]

ASF GitHub Bot logged work on BEAM-4071:


Author: ASF GitHub Bot
Created on: 19/Apr/18 04:17
Start Date: 19/Apr/18 04:17
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5150:  
[BEAM-4071] Add Portable Runner Job API shim
URL: https://github.com/apache/beam/pull/5150#discussion_r182629958
 
 

 ##
 File path: 
runners/reference/java/src/main/java/org/apache/beam/runners/reference/PortableRunner.java
 ##
 @@ -0,0 +1,245 @@
+package org.apache.beam.runners.reference;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+import com.google.protobuf.ByteString;
+import io.grpc.ManagedChannel;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Set;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc.JobServiceBlockingStub;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.runners.core.construction.ArtifactServiceStager;
+import org.apache.beam.runners.core.construction.ArtifactServiceStager.StagedFile;
+import org.apache.beam.runners.core.construction.JavaReadViaImpulse;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.runners.reference.CloseableResource.CloseException;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.util.ZipFiles;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@link PipelineRunner} a {@link Pipeline} against a {@code JobService}. 
*/
+public class PortableRunner extends PipelineRunner {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(PortableRunner.class);
+
+  /** Provided pipeline options. */
+  private final PipelineOptions options;
+  /** Job API endpoint. */
+  private final String endpoint;
+  /** Files to stage to artifact staging service. They will ultimately be added to the classpath. */
+  private final Collection<StagedFile> filesToStage;
+  /** Channel factory used to create communication channel with job and staging services. */
+  private final ManagedChannelFactory channelFactory;
+
+  /**
+   * Constructs a runner from the provided options.
+   *
+   * @param options Properties which configure the runner.
+   * @return The newly created runner.
+   */
+  public static PortableRunner fromOptions(PipelineOptions options) {
+return createInternal(options, ManagedChannelFactory.createDefault());
+  }
+
+  @VisibleForTesting
+  static PortableRunner createInternal(
+  PipelineOptions options, ManagedChannelFactory channelFactory) {
+PortablePipelineOptions portableOptions =
+PipelineOptionsValidator.validate(PortablePipelineOptions.class, options);
+
+String endpoint = portableOptions.getJobEndpoint();
+
+// Deduplicate artifacts.
+Set<String> pathsToStage = Sets.newHashSet();
+if (portableOptions.getFile

[jira] [Work logged] (BEAM-4071) Portable Runner Job API shim

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4071?focusedWorklogId=92386&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92386
 ]

ASF GitHub Bot logged work on BEAM-4071:


Author: ASF GitHub Bot
Created on: 19/Apr/18 04:17
Start Date: 19/Apr/18 04:17
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5150:  
[BEAM-4071] Add Portable Runner Job API shim
URL: https://github.com/apache/beam/pull/5150#discussion_r182629979
 
 

 ##
 File path: 
runners/reference/java/src/main/java/org/apache/beam/runners/reference/CloseableResource.java
 ##
 @@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.reference;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import javax.annotation.Nullable;
+
+/**
+ * An {@link AutoCloseable} that wraps a resource that needs to be cleaned up but does not implement
+ * {@link AutoCloseable} itself.
+ *
+ * Recipients of a {@link CloseableResource} are in general responsible for cleanup. Ownership
+ * can be transferred from one context to another via {@link #transfer()}. Transferring relinquishes
+ * ownership from the original resource. This allows resources to be safely constructed and
+ * transferred within a try-with-resources block. For example:
+ *
+ * {@code try (CloseableResource resource = CloseableResource.of(...)) {
+ *   // Do something with resource.
+ *   ...
+ *   // Then transfer ownership to some consumer.
+ *   resourceConsumer(resource.transfer());
+ * }
+ * }
+ *
+ * Not thread-safe.
+ */
+public class CloseableResource<T> implements AutoCloseable {
+
+  private final T resource;
+
+  /**
+   * {@link Closer} for the underlying resource. Closers are nullable to allow transfer of
+   * ownership. However, newly-constructed {@link CloseableResource CloseableResources} must always
+   * have non-null closers.
+   */
+  @Nullable private Closer<T> closer;
+
+  private boolean isClosed = false;
+
+  private CloseableResource(T resource, Closer<T> closer) {
+this.resource = resource;
+this.closer = closer;
+  }
+
+  /** Creates a {@link CloseableResource} with the given resource and closer. */
+  public static <T> CloseableResource<T> of(T resource, Closer<T> closer) {
+checkArgument(resource != null, "Resource must be non-null");
+checkArgument(closer != null, "%s must be non-null", Closer.class.getName());
+return new CloseableResource<>(resource, closer);
+  }
+
+  /** Gets the underlying resource. */
+  public T get() {
+checkState(closer != null, "%s has transferred ownership", CloseableResource.class.getName());
+checkState(!isClosed, "%s is closed", CloseableResource.class.getName());
+return resource;
+  }
+
+  /**
+   * Returns a new {@link CloseableResource} that owns the underlying resource and relinquishes
+   * ownership from this {@link CloseableResource}. {@link #close()} on the original instance
+   * becomes a no-op.
+   */
+  public CloseableResource<T> transfer() {
+checkState(closer != null, "%s has transferred ownership", CloseableResource.class.getName());
+checkState(!isClosed, "%s is closed", CloseableResource.class.getName());
+CloseableResource<T> other = CloseableResource.of(resource, closer);
+this.closer = null;
+return other;
+  }
+
+  /**
+   * Closes the underlying resource. The closer will only be executed on the first call.
+   *
+   * @throws CloseException wrapping any exceptions thrown while closing
+   */
+  @Override
+  public void close() throws CloseException {
+if (closer != null && !isClosed) {
+  try {
+closer.close(resource);
+isClosed = true;
+  } catch (Exception e) {
+// Mark resource as closed even if we catch an exception.
+isClosed = true;
 
 Review comment:
   
   
   > **tgroh** wrote:
   > `isClosed` can be in the finally block
   
   
   Done.
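The reviewer's suggestion to set `isClosed` in a `finally` block, combined with the ownership-transfer contract described in the Javadoc, can be sketched in a self-contained form. This is a hypothetical reduction, not the Beam class. `OwnedResource`, `Closer`, `CloseException`, and `OwnedResourceDemo` are illustrative names, and the sketch uses plain exceptions instead of Guava's `Preconditions`.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical cleanup callback for a resource that is not itself AutoCloseable. */
@FunctionalInterface
interface Closer<T> {
  void close(T resource) throws Exception;
}

/** Wraps any exception thrown by a Closer. */
final class CloseException extends Exception {
  CloseException(Throwable cause) {
    super(cause);
  }
}

/** Minimal ownership-transferring wrapper (illustrative only); not thread-safe. */
final class OwnedResource<T> implements AutoCloseable {
  private final T resource;
  private Closer<T> closer; // null once ownership has been transferred
  private boolean isClosed = false;

  private OwnedResource(T resource, Closer<T> closer) {
    this.resource = resource;
    this.closer = closer;
  }

  static <T> OwnedResource<T> of(T resource, Closer<T> closer) {
    if (resource == null || closer == null) {
      throw new IllegalArgumentException("resource and closer must be non-null");
    }
    return new OwnedResource<>(resource, closer);
  }

  T get() {
    checkOwnedAndOpen();
    return resource;
  }

  /** Moves ownership to a new wrapper; close() on this instance becomes a no-op. */
  OwnedResource<T> transfer() {
    checkOwnedAndOpen();
    OwnedResource<T> other = new OwnedResource<>(resource, closer);
    closer = null;
    return other;
  }

  private void checkOwnedAndOpen() {
    if (closer == null) throw new IllegalStateException("ownership has been transferred");
    if (isClosed) throw new IllegalStateException("resource is closed");
  }

  @Override
  public void close() throws CloseException {
    if (closer != null && !isClosed) {
      try {
        closer.close(resource);
      } catch (Exception e) {
        throw new CloseException(e);
      } finally {
        // Runs on success and on failure, so the closer is attempted at most once.
        isClosed = true;
      }
    }
  }
}

final class OwnedResourceDemo {
  /** Transfers ownership out of a try-with-resources block; returns how often the closer ran. */
  static int closerCallsAfterTransfer() throws CloseException {
    AtomicInteger closes = new AtomicInteger();
    OwnedResource<String> moved;
    try (OwnedResource<String> original = OwnedResource.of("conn", s -> closes.incrementAndGet())) {
      moved = original.transfer();
    } // original.close() is a no-op here because ownership has moved
    moved.close();
    moved.close(); // ignored: the resource is already closed
    return closes.get();
  }

  /** Calls close() twice with a failing closer; returns how often the closer ran. */
  static int closerCallsWhenCloserFails() {
    AtomicInteger attempts = new AtomicInteger();
    OwnedResource<String> r = OwnedResource.of("conn", s -> {
      attempts.incrementAndGet();
      throw new IllegalStateException("boom");
    });
    for (int i = 0; i < 2; i++) {
      try {
        r.close();
      } catch (CloseException expected) {
        // Only the first call reaches the closer and throws.
      }
    }
    return attempts.get();
  }
}
```

Placing the assignment in `finally` means that a closer which throws is still never retried. This matches the intent of the original comment, "Mark resource as closed even if we catch an exception."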


This is an automated message from the Apache Git Servi

[jira] [Work logged] (BEAM-4071) Portable Runner Job API shim

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4071?focusedWorklogId=92379&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92379
 ]

ASF GitHub Bot logged work on BEAM-4071:


Author: ASF GitHub Bot
Created on: 19/Apr/18 04:17
Start Date: 19/Apr/18 04:17
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5150:  
[BEAM-4071] Add Portable Runner Job API shim
URL: https://github.com/apache/beam/pull/5150#discussion_r182629959
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ArtifactServiceStager.java
 ##
 @@ -87,26 +87,32 @@ private ArtifactServiceStager(Channel channel, int bufferSize) {
 this.bufferSize = bufferSize;
   }
 
-  public void stage(Iterable<File> files) throws IOException, InterruptedException {
-final Map<File, CompletionStage<ArtifactMetadata>> futures = new HashMap<>();
-for (File file : files) {
+  /**
+   * Stages the given artifact files to the staging service.
+   *
+   * @return The artifact staging token returned by the service
+   */
+  public String stage(Iterable<StagedFile> files) throws IOException, InterruptedException {
+final Map<StagedFile, CompletionStage<ArtifactMetadata>> futures = new HashMap<>();
+for (StagedFile file : files) {
   futures.put(file, MoreFutures.supplyAsync(new StagingCallable(file), executorService));
 }
 CompletionStage<StagingResult> stagingResult =
 MoreFutures.allAsList(futures.values())
 .thenApply(ignored -> new ExtractStagingResultsCallable(futures).call());
-stageManifest(stagingResult);
+return stageManifest(stagingResult);
   }
 
-  private void stageManifest(CompletionStage<StagingResult> stagingFuture)
+  private String stageManifest(CompletionStage<StagingResult> stagingFuture)
   throws InterruptedException {
 try {
   StagingResult stagingResult = MoreFutures.get(stagingFuture);
   if (stagingResult.isSuccess()) {
 Manifest manifest =
 Manifest.newBuilder().addAllArtifact(stagingResult.getMetadata()).build();
-blockingStub.commitManifest(
-CommitManifestRequest.newBuilder().setManifest(manifest).build());
+return blockingStub
+.commitManifest(CommitManifestRequest.newBuilder().setManifest(manifest).build())
+.getStagingToken();
 
 Review comment:
   
   
   > **tgroh** wrote:
   > Separate these two lines? Just a preference thing, but I think that the fluent style hints at something more immediate than the actual RPC we're making here.
   
   
   Done.
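The following minimal sketch shows the style the reviewer asked for. The blocking RPC and the accessor on its result sit on separate lines, so the network call is not hidden inside a fluent chain. `StagingStub`, `CommitManifestResponse`, and `ManifestCommitter` are hypothetical stand-ins for the generated gRPC types, and the manifest is simplified to a `String`.

```java
/** Hypothetical stand-in for the generated CommitManifestResponse message. */
final class CommitManifestResponse {
  private final String stagingToken;

  CommitManifestResponse(String stagingToken) {
    this.stagingToken = stagingToken;
  }

  String getStagingToken() {
    return stagingToken;
  }
}

/** Hypothetical stand-in for a blocking gRPC stub; the manifest is simplified to a String. */
interface StagingStub {
  CommitManifestResponse commitManifest(String manifest);
}

final class ManifestCommitter {
  private final StagingStub blockingStub;

  ManifestCommitter(StagingStub blockingStub) {
    this.blockingStub = blockingStub;
  }

  /** Commits the manifest and returns the staging token from the response. */
  String commit(String manifest) {
    // The blocking RPC gets its own statement, so the network round trip is easy to spot.
    CommitManifestResponse response = blockingStub.commitManifest(manifest);
    // Reading a field of the already-received response is a cheap local call.
    return response.getStagingToken();
  }
}
```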


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 92379)
Time Spent: 12h 10m  (was: 12h)

> Portable Runner Job API shim
> 
>
> Key: BEAM-4071
> URL: https://issues.apache.org/jira/browse/BEAM-4071
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Minor
>  Time Spent: 12h 10m
>  Remaining Estimate: 0h
>
> There needs to be a way to execute Java-SDK pipelines against a portable job 
> server. The job server itself is expected to be started up out-of-band. The 
> "PortableRunner" should take an option indicating the Job API endpoint and 
> defer other runner configurations to the backend itself.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4071) Portable Runner Job API shim

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4071?focusedWorklogId=92387&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92387
 ]

ASF GitHub Bot logged work on BEAM-4071:


Author: ASF GitHub Bot
Created on: 19/Apr/18 04:17
Start Date: 19/Apr/18 04:17
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5150:  
[BEAM-4071] Add Portable Runner Job API shim
URL: https://github.com/apache/beam/pull/5150#discussion_r182629977
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ArtifactServiceStager.java
 ##
 @@ -87,26 +87,32 @@ private ArtifactServiceStager(Channel channel, int bufferSize) {
 this.bufferSize = bufferSize;
   }
 
-  public void stage(Iterable<File> files) throws IOException, InterruptedException {
-final Map<File, CompletionStage<ArtifactMetadata>> futures = new HashMap<>();
-for (File file : files) {
+  /**
+   * Stages the given artifact files to the staging service.
+   *
+   * @return The artifact staging token returned by the service
+   */
+  public String stage(Iterable<StagedFile> files) throws IOException, InterruptedException {
+final Map<StagedFile, CompletionStage<ArtifactMetadata>> futures = new HashMap<>();
+for (StagedFile file : files) {
   futures.put(file, MoreFutures.supplyAsync(new StagingCallable(file), executorService));
 }
 CompletionStage<StagingResult> stagingResult =
 MoreFutures.allAsList(futures.values())
 .thenApply(ignored -> new ExtractStagingResultsCallable(futures).call());
-stageManifest(stagingResult);
+return stageManifest(stagingResult);
   }
 
-  private void stageManifest(CompletionStage<StagingResult> stagingFuture)
+  private String stageManifest(CompletionStage<StagingResult> stagingFuture)
   throws InterruptedException {
 try {
   StagingResult stagingResult = MoreFutures.get(stagingFuture);
   if (stagingResult.isSuccess()) {
 Manifest manifest =
 Manifest.newBuilder().addAllArtifact(stagingResult.getMetadata()).build();
-blockingStub.commitManifest(
-CommitManifestRequest.newBuilder().setManifest(manifest).build());
+return blockingStub
+.commitManifest(CommitManifestRequest.newBuilder().setManifest(manifest).build())
+.getStagingToken();
 
 Review comment:
   
   
   > **tgroh** wrote:
   > Separate these two lines? Just a preference thing, but I think that the fluent style hints at something more immediate than the actual RPC we're making here.
   
   
   Done.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 92387)
Time Spent: 13h 10m  (was: 13h)

> Portable Runner Job API shim
> 
>
> Key: BEAM-4071
> URL: https://issues.apache.org/jira/browse/BEAM-4071
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Minor
>  Time Spent: 13h 10m
>  Remaining Estimate: 0h
>
> There needs to be a way to execute Java-SDK pipelines against a portable job 
> server. The job server itself is expected to be started up out-of-band. The 
> "PortableRunner" should take an option indicating the Job API endpoint and 
> defer other runner configurations to the backend itself.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4071) Portable Runner Job API shim

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4071?focusedWorklogId=92383&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92383
 ]

ASF GitHub Bot logged work on BEAM-4071:


Author: ASF GitHub Bot
Created on: 19/Apr/18 04:17
Start Date: 19/Apr/18 04:17
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5150:  
[BEAM-4071] Add Portable Runner Job API shim
URL: https://github.com/apache/beam/pull/5150#discussion_r182629963
 
 

 ##
 File path: 
runners/reference/java/src/main/java/org/apache/beam/runners/reference/CloseableResource.java
 ##
 @@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.reference;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import javax.annotation.Nullable;
+
+/**
+ * An {@link AutoCloseable} that wraps a resource that needs to be cleaned up but does not implement
+ * {@link AutoCloseable} itself.
+ *
+ * Recipients of a {@link CloseableResource} are in general responsible for cleanup. Ownership
+ * can be transferred from one context to another via {@link #transfer()}. Transferring relinquishes
+ * ownership from the original resource. This allows resources to be safely constructed and
+ * transferred within a try-with-resources block. For example:
+ *
+ * {@code try (CloseableResource resource = CloseableResource.of(...)) {
+ *   // Do something with resource.
+ *   ...
+ *   // Then transfer ownership to some consumer.
+ *   resourceConsumer(resource.transfer());
+ * }
+ * }
+ *
+ * Not thread-safe.
+ */
+public class CloseableResource<T> implements AutoCloseable {
+
+  private final T resource;
+
+  /**
+   * {@link Closer} for the underlying resource. Closers are nullable to allow transfer of
+   * ownership. However, newly-constructed {@link CloseableResource CloseableResources} must always
+   * have non-null closers.
+   */
+  @Nullable private Closer<T> closer;
+
+  private boolean isClosed = false;
+
+  private CloseableResource(T resource, Closer<T> closer) {
+this.resource = resource;
+this.closer = closer;
+  }
+
+  /** Creates a {@link CloseableResource} with the given resource and closer. */
+  public static <T> CloseableResource<T> of(T resource, Closer<T> closer) {
+checkArgument(resource != null, "Resource must be non-null");
+checkArgument(closer != null, "%s must be non-null", Closer.class.getName());
+return new CloseableResource<>(resource, closer);
+  }
+
+  /** Gets the underlying resource. */
+  public T get() {
+checkState(closer != null, "%s has transferred ownership", CloseableResource.class.getName());
+checkState(!isClosed, "%s is closed", CloseableResource.class.getName());
+return resource;
+  }
+
+  /**
+   * Returns a new {@link CloseableResource} that owns the underlying resource and relinquishes
+   * ownership from this {@link CloseableResource}. {@link #close()} on the original instance
+   * becomes a no-op.
+   */
+  public CloseableResource<T> transfer() {
+checkState(closer != null, "%s has transferred ownership", CloseableResource.class.getName());
+checkState(!isClosed, "%s is closed", CloseableResource.class.getName());
+CloseableResource<T> other = CloseableResource.of(resource, closer);
+this.closer = null;
+return other;
+  }
+
+  /**
+   * Closes the underlying resource. The closer will only be executed on the first call.
+   *
+   * @throws CloseException wrapping any exceptions thrown while closing
+   */
+  @Override
+  public void close() throws CloseException {
+if (closer != null && !isClosed) {
+  try {
+closer.close(resource);
+isClosed = true;
+  } catch (Exception e) {
+// Mark resource as closed even if we catch an exception.
+isClosed = true;
 
 Review comment:
   
   
   > **tgroh** wrote:
   > `isClosed` can be in the finally block
   
   
   Done.


This is an automated message from the Apache Git Servi

[jira] [Work logged] (BEAM-4071) Portable Runner Job API shim

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4071?focusedWorklogId=92380&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92380
 ]

ASF GitHub Bot logged work on BEAM-4071:


Author: ASF GitHub Bot
Created on: 19/Apr/18 04:17
Start Date: 19/Apr/18 04:17
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5150:  
[BEAM-4071] Add Portable Runner Job API shim
URL: https://github.com/apache/beam/pull/5150#discussion_r182629960
 
 

 ##
 File path: 
runners/reference/java/src/main/java/org/apache/beam/runners/reference/JobServicePipelineResult.java
 ##
 @@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.reference;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.google.protobuf.ByteString;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.beam.model.jobmanagement.v1.JobApi;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc.JobServiceBlockingStub;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class JobServicePipelineResult implements PipelineResult {
+
+  private static final long POLL_INTERVAL_SEC = 10;
+
+  private static final Logger LOG = LoggerFactory.getLogger(JobServicePipelineResult.class);
+
+  private final ByteString jobId;
+  private final CloseableResource<JobServiceBlockingStub> jobService;
+
+  JobServicePipelineResult(ByteString jobId, CloseableResource<JobServiceBlockingStub> jobService) {
+this.jobId = jobId;
+this.jobService = jobService;
+  }
+
+  @Override
+  public State getState() {
+JobServiceBlockingStub stub = jobService.get();
+GetJobStateResponse response =
+stub.getState(GetJobStateRequest.newBuilder().setJobIdBytes(jobId).build());
+return getJavaState(response.getState());
+  }
+
+  @Override
+  public State cancel() {
+JobServiceBlockingStub stub = jobService.get();
+CancelJobResponse response =
+stub.cancel(CancelJobRequest.newBuilder().setJobIdBytes(jobId).build());
+return getJavaState(response.getState());
+  }
+
+  @Override
+  public State waitUntilFinish(Duration duration) {
+if (duration.compareTo(Duration.millis(1)) < 1) {
+  // Equivalent to infinite timeout.
+  return waitUntilFinish();
+} else {
+  CompletableFuture<State> result = CompletableFuture.supplyAsync(this::waitUntilFinish);
+  try {
+return Uninterruptibles.getUninterruptibly(
 
 Review comment:
   
   
   > **tgroh** wrote:
   > Why is this uninterruptible?
   
   
   To ensure that we sleep fully for the amount of time requested. Given that we don't use interrupts anywhere in Beam code, we don't expect this to happen anyway. Alternatively, we could just crash on interrupts (i.e., throw a RuntimeException wrapping the interruption).
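The alternative mentioned in the reply is to fail fast on interruption instead of waiting uninterruptibly. It might look roughly like the sketch below, which rests on several assumptions. `TimedWait` and its `State` enum are hypothetical, and the blocking wait is passed in as a `Supplier`. Returning `null` on timeout is assumed to mirror the documented `PipelineResult.waitUntilFinish(Duration)` contract.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

final class TimedWait {

  /** Hypothetical subset of terminal pipeline states. */
  enum State { DONE, FAILED }

  private TimedWait() {}

  /**
   * Runs {@code blockingWait} with a timeout. Returns null if the timeout expires and throws a
   * RuntimeException if the calling thread is interrupted, instead of waiting uninterruptibly.
   */
  static State waitUntilFinish(Supplier<State> blockingWait, long timeoutMillis) {
    if (timeoutMillis < 1) {
      // Treat a non-positive timeout as "wait indefinitely".
      return blockingWait.get();
    }
    CompletableFuture<State> result = CompletableFuture.supplyAsync(blockingWait);
    try {
      return result.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      // The background wait keeps running; CompletableFuture.cancel would not interrupt it.
      return null;
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers can still observe it, then fail fast.
      Thread.currentThread().interrupt();
      throw new RuntimeException("Interrupted while waiting for the job to finish", e);
    } catch (ExecutionException e) {
      throw new RuntimeException(e.getCause());
    }
  }
}
```

Restoring the interrupt flag before rethrowing is the usual courtesy when converting an `InterruptedException` into an unchecked exception. Code further up the stack can still see that an interruption happened.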


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 92380)
Time Spent: 12h 20m  (was: 12h 10m)

> Portable Runner Job API shim
> 
>
> Key: BEAM-4071
> URL: https://issues.apache.org/jira/browse/BEAM-4071
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Ben Sidhom
>Assignee: Ben

[jira] [Work logged] (BEAM-4071) Portable Runner Job API shim

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4071?focusedWorklogId=92390&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92390
 ]

ASF GitHub Bot logged work on BEAM-4071:


Author: ASF GitHub Bot
Created on: 19/Apr/18 04:17
Start Date: 19/Apr/18 04:17
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5150:  
[BEAM-4071] Add Portable Runner Job API shim
URL: https://github.com/apache/beam/pull/5150#discussion_r182629976
 
 

 ##
 File path: 
runners/reference/java/src/main/java/org/apache/beam/runners/reference/PortableRunner.java
 ##
 @@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.reference;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+import com.google.protobuf.ByteString;
+import io.grpc.ManagedChannel;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Set;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc.JobServiceBlockingStub;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.runners.core.construction.ArtifactServiceStager;
+import org.apache.beam.runners.core.construction.ArtifactServiceStager.StagedFile;
+import org.apache.beam.runners.core.construction.JavaReadViaImpulse;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.runners.reference.CloseableResource.CloseException;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.util.ZipFiles;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@link PipelineRunner} that runs a {@link Pipeline} against a {@code JobService}. */
+public class PortableRunner extends PipelineRunner<PipelineResult> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PortableRunner.class);
+
+  /** Provided pipeline options. */
+  private final PipelineOptions options;
+  /** Job API endpoint. */
+  private final String endpoint;
+  /** Files to stage to artifact staging service. They will ultimately be added to the classpath. */
+  private final Collection<StagedFile> filesToStage;
+  /** Channel factory used to create communication channel with job and staging services. */
+  private final ManagedChannelFactory channelFactory;
+
+  /**
+   * Constructs a runner from the provided options.
+   *
+   * @param options Properties which configure the runner.
+   * @return The newly created runner.
+   */
+  public static PortableRunner fromOptions(PipelineOptions options) {
+return createInternal(options, ManagedChannelFactory.createDefault());
+  }
+
+  @VisibleForTesting
+  static PortableRunner createInternal(
+  PipelineOptions options, ManagedChannelFactory channelFactory) {
+PortablePipelineOptions portableOptions =
+PipelineOptionsValidator.validate(PortablePipelineOptions.class, options);
+
+String endpoint = portableOptions.getJobEndpoint();
+
+// Deduplicate artifacts.
+Set<String> pathsToStage = Sets.newHashSet();
+if (portableOptions.getFile

[jira] [Work logged] (BEAM-4071) Portable Runner Job API shim

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4071?focusedWorklogId=92388&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92388
 ]

ASF GitHub Bot logged work on BEAM-4071:


Author: ASF GitHub Bot
Created on: 19/Apr/18 04:17
Start Date: 19/Apr/18 04:17
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5150:  
[BEAM-4071] Add Portable Runner Job API shim
URL: https://github.com/apache/beam/pull/5150#discussion_r182629978
 
 

 ##
 File path: 
runners/reference/java/src/main/java/org/apache/beam/runners/reference/JobServicePipelineResult.java
 ##
 @@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.reference;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.google.protobuf.ByteString;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.beam.model.jobmanagement.v1.JobApi;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc.JobServiceBlockingStub;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class JobServicePipelineResult implements PipelineResult {
+
+  private static final long POLL_INTERVAL_SEC = 10;
+
+  private static final Logger LOG = LoggerFactory.getLogger(JobServicePipelineResult.class);
+
+  private final ByteString jobId;
+  private final CloseableResource<JobServiceBlockingStub> jobService;
+
+  JobServicePipelineResult(ByteString jobId, CloseableResource<JobServiceBlockingStub> jobService) {
+this.jobId = jobId;
+this.jobService = jobService;
+  }
+
+  @Override
+  public State getState() {
+JobServiceBlockingStub stub = jobService.get();
+GetJobStateResponse response =
+stub.getState(GetJobStateRequest.newBuilder().setJobIdBytes(jobId).build());
+return getJavaState(response.getState());
+  }
+
+  @Override
+  public State cancel() {
+JobServiceBlockingStub stub = jobService.get();
+CancelJobResponse response =
+stub.cancel(CancelJobRequest.newBuilder().setJobIdBytes(jobId).build());
+return getJavaState(response.getState());
+  }
+
+  @Override
+  public State waitUntilFinish(Duration duration) {
+if (duration.compareTo(Duration.millis(1)) < 1) {
+  // Equivalent to infinite timeout.
+  return waitUntilFinish();
+} else {
+  CompletableFuture<State> result = CompletableFuture.supplyAsync(this::waitUntilFinish);
+  try {
+return Uninterruptibles.getUninterruptibly(
 
 Review comment:
   
   
   > **tgroh** wrote:
   > Why is this uninterruptible?
   
   
   To ensure that we sleep fully for the amount of time requested. Given that we don't use interrupts anywhere in Beam code, we don't expect this to happen anyway. Alternatively, we could just crash on interrupts (i.e., throw a RuntimeException wrapping the interruption).


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 92388)
Time Spent: 13h 20m  (was: 13h 10m)

> Portable Runner Job API shim
> 
>
> Key: BEAM-4071
> URL: https://issues.apache.org/jira/browse/BEAM-4071
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Ben Sidhom
>Assignee: Ben

[jira] [Work logged] (BEAM-4071) Portable Runner Job API shim

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4071?focusedWorklogId=92385&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92385
 ]

ASF GitHub Bot logged work on BEAM-4071:


Author: ASF GitHub Bot
Created on: 19/Apr/18 04:17
Start Date: 19/Apr/18 04:17
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5150:  
[BEAM-4071] Add Portable Runner Job API shim
URL: https://github.com/apache/beam/pull/5150#discussion_r182629975
 
 

 ##
 File path: 
runners/reference/java/src/main/java/org/apache/beam/runners/reference/PortableRunner.java
 ##
 @@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.reference;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+import com.google.protobuf.ByteString;
+import io.grpc.ManagedChannel;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Set;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc.JobServiceBlockingStub;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.runners.core.construction.ArtifactServiceStager;
+import org.apache.beam.runners.core.construction.ArtifactServiceStager.StagedFile;
+import org.apache.beam.runners.core.construction.JavaReadViaImpulse;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.runners.reference.CloseableResource.CloseException;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.util.ZipFiles;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@link PipelineRunner} that runs a {@link Pipeline} against a {@code JobService}. */
+public class PortableRunner extends PipelineRunner {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PortableRunner.class);
+
+  /** Provided pipeline options. */
+  private final PipelineOptions options;
+  /** Job API endpoint. */
+  private final String endpoint;
+  /** Files to stage to artifact staging service. They will ultimately be added to the classpath. */
+  private final Collection filesToStage;
+  /** Channel factory used to create communication channel with job and staging services. */
+  private final ManagedChannelFactory channelFactory;
+
+  /**
+   * Constructs a runner from the provided options.
+   *
+   * @param options Properties which configure the runner.
+   * @return The newly created runner.
+   */
+  public static PortableRunner fromOptions(PipelineOptions options) {
+    return createInternal(options, ManagedChannelFactory.createDefault());
 
 Review comment:
   
   
   > **tgroh** wrote:
   > why not just `create`?
   
   
   Mostly to draw attention to the fact that this is an internal call and to 
distinguish it from `fromOptions`. If you have a strong preference, I'll change 
it.
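
Below is a minimal sketch of the naming pattern under discussion. It assumes a simplified runner configured only by an endpoint string. The public `fromOptions` factory always wires in the production channel factory, while `createInternal` is the seam that tests use to inject a fake. `ChannelFactory`, `SketchRunner`, and their methods are hypothetical stand-ins, not Beam's `ManagedChannelFactory` or `PortableRunner`.

```java
import java.util.Objects;

/** Hypothetical stand-in for a factory that opens a channel to an endpoint. */
interface ChannelFactory {
  /** Returns a description of the opened channel (a real factory would return a channel). */
  String open(String endpoint);

  static ChannelFactory createDefault() {
    return endpoint -> "default-channel:" + endpoint;
  }
}

/** Hypothetical runner showing a public factory plus an injectable internal one. */
final class SketchRunner {
  private final String endpoint;
  private final ChannelFactory channelFactory;

  private SketchRunner(String endpoint, ChannelFactory channelFactory) {
    this.endpoint = Objects.requireNonNull(endpoint, "endpoint");
    this.channelFactory = Objects.requireNonNull(channelFactory, "channelFactory");
  }

  /** Public entry point: always uses the production channel factory. */
  static SketchRunner fromOptions(String endpoint) {
    return createInternal(endpoint, ChannelFactory.createDefault());
  }

  /** Internal seam (the kind of method @VisibleForTesting marks): tests pass a fake. */
  static SketchRunner createInternal(String endpoint, ChannelFactory channelFactory) {
    return new SketchRunner(endpoint, channelFactory);
  }

  String connect() {
    return channelFactory.open(endpoint);
  }
}
```

A distinct name for the internal factory makes it obvious at call sites which path bypasses the production wiring.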



[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=92375&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92375
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 19/Apr/18 04:16
Start Date: 19/Apr/18 04:16
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182627994
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
 ##
 @@ -33,6 +34,7 @@
   private final String topic;
   private final int partition;
   private final long offset;
+  private final Headers headers;
 
 Review comment:
   Alright, this works. I made a few more changes to ensure that a
`ClassNotFoundException` does not occur during serialization of
`KafkaRecordCoder`.




Issue Time Tracking
---

Worklog Id: (was: 92375)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 6h 50m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=92373&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92373
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 19/Apr/18 04:16
Start Date: 19/Apr/18 04:16
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182627458
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
 ##
 @@ -66,9 +77,32 @@ public void encode(KafkaRecord value, OutputStream 
outStream) throws IOExc
 longCoder.decode(inStream),
 longCoder.decode(inStream),
 KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
+getHeaders(headerCoder.decode(inStream)),
 kvCoder.decode(inStream));
   }
 
+  private Headers getHeaders(Iterable> records) {
 
 Review comment:
   Done
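
To make the encode/decode step concrete, here is a hypothetical, stdlib-only sketch of round-tripping Kafka-style headers through a stream. It assumes a simple length-prefixed layout chosen for illustration; it is not the wire format of Beam's `KafkaRecordCoder`. Headers are kept as an ordered list of key/value entries rather than a map because Kafka allows repeated keys and null values.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical header codec: count, then (UTF key, length-prefixed value) per header. */
final class HeaderCodecSketch {
  private HeaderCodecSketch() {}

  static void encode(List<Map.Entry<String, byte[]>> headers, DataOutputStream out)
      throws IOException {
    out.writeInt(headers.size());
    for (Map.Entry<String, byte[]> header : headers) {
      out.writeUTF(header.getKey());
      byte[] value = header.getValue();
      out.writeInt(value == null ? -1 : value.length); // -1 marks a null value
      if (value != null) {
        out.write(value);
      }
    }
  }

  static List<Map.Entry<String, byte[]>> decode(DataInputStream in) throws IOException {
    int count = in.readInt();
    List<Map.Entry<String, byte[]>> headers = new ArrayList<>(count);
    for (int i = 0; i < count; i++) {
      String key = in.readUTF();
      int length = in.readInt();
      byte[] value = null;
      if (length >= 0) {
        value = new byte[length];
        in.readFully(value);
      }
      headers.add(new SimpleImmutableEntry<>(key, value)); // preserves order and duplicates
    }
    return headers;
  }
}
```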




Issue Time Tracking
---

Worklog Id: (was: 92373)
Time Spent: 6h 40m  (was: 6.5h)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 6h 40m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  





[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=92376&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92376
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 19/Apr/18 04:16
Start Date: 19/Apr/18 04:16
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182628417
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
 ##
 @@ -76,6 +80,10 @@ public long getOffset() {
 return offset;
   }
 
+  public Headers getHeaders() {
 
 Review comment:
   Done




Issue Time Tracking
---

Worklog Id: (was: 92376)
Time Spent: 7h  (was: 6h 50m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 7h
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  





[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=92374&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92374
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 19/Apr/18 04:16
Start Date: 19/Apr/18 04:16
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182629453
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
 ##
 @@ -212,6 +212,7 @@ public boolean advance() {
 rawRecord.offset(),
 consumerSpEL.getRecordTimestamp(rawRecord),
 consumerSpEL.getRecordTimestampType(rawRecord),
+consumerSpEL.getHeaders(rawRecord),
 
 Review comment:
   Done
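
The diff reads headers through `consumerSpEL` rather than calling `rawRecord.headers()` directly, presumably so the reader keeps working with Kafka client versions that predate headers. Here is a minimal sketch of the same idea using plain reflection (a swapped-in technique, not what `ConsumerSpEL` does internally): look the accessor up once, and fall back to an empty result when the method does not exist. `OptionalAccessor` is a hypothetical name.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Optional;

/** Hypothetical helper: calls a no-arg accessor only if the loaded class actually has it. */
final class OptionalAccessor {
  private final Method method; // null when the accessor is missing

  OptionalAccessor(Class<?> type, String methodName) {
    Method found;
    try {
      found = type.getMethod(methodName); // looked up once, not per record
    } catch (NoSuchMethodException e) {
      found = null; // e.g. an older client library without this accessor
    }
    this.method = found;
  }

  boolean isAvailable() {
    return method != null;
  }

  /** Returns the accessor's result, or empty when the accessor is missing or returns null. */
  Optional<Object> invoke(Object target) {
    if (method == null) {
      return Optional.empty();
    }
    try {
      return Optional.ofNullable(method.invoke(target));
    } catch (IllegalAccessException | InvocationTargetException e) {
      throw new IllegalStateException("Failed to call " + method.getName(), e);
    }
  }
}
```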




Issue Time Tracking
---

Worklog Id: (was: 92374)
Time Spent: 6h 50m  (was: 6h 40m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 6h 50m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  





[jira] [Assigned] (BEAM-4109) Support arbitrary artifact names in local file artifact staging service

2018-04-18 Thread Ben Sidhom (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ben Sidhom reassigned BEAM-4109:


Assignee: (was: Kenneth Knowles)

> Support arbitrary artifact names in local file artifact staging service
> ---
>
> Key: BEAM-4109
> URL: https://issues.apache.org/jira/browse/BEAM-4109
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Ben Sidhom
>Priority: Minor
>
> The local-file based artifact staging service implementation stores artifacts 
> in a flat directory under the exact names they are given by the artifact 
> staging request. This assumes that all artifact names are safe file names and 
> requires staging clients to manually escape names.
> Instead, the staging service should perform its own escaping/mapping 
> transparently and allow clients to specify arbitrary artifact staging names.
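
Here is one way the service could do its own escaping, sketched under the assumption that each artifact is stored as a single file in the flat staging directory. The idea is to percent-encode every UTF-8 byte outside `[A-Za-z0-9_-]`, including `.`, `/`, and `%` itself. Because `%` is always escaped, the mapping is reversible and two different (well-formed Unicode) names cannot collide. No name can become `.`, `..`, or a path containing a separator. `ArtifactNames.toSafeFileName` is a hypothetical helper, not Beam's API.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper mapping arbitrary artifact names to safe, flat file names. */
final class ArtifactNames {
  private ArtifactNames() {}

  static String toSafeFileName(String artifactName) {
    if (artifactName.isEmpty()) {
      throw new IllegalArgumentException("Artifact name must not be empty");
    }
    StringBuilder safe = new StringBuilder();
    for (byte b : artifactName.getBytes(StandardCharsets.UTF_8)) {
      int c = b & 0xFF; // treat the byte as unsigned
      boolean keep =
          (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9')
              || c == '_' || c == '-';
      if (keep) {
        safe.append((char) c);
      } else {
        safe.append('%').append(String.format("%02X", c)); // e.g. '.' -> %2E
      }
    }
    return safe.toString();
  }
}
```

Escaped names can be several times longer than the original, so very long names may exceed typical 255-byte file-name limits. Hashing the name and recording the original in the manifest is an alternative that bounds the length.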





Jenkins build is back to normal : beam_PostCommit_Python_ValidatesContainer_Dataflow #108

2018-04-18 Thread Apache Jenkins Server
See 




Jenkins build is back to normal : beam_PostCommit_Java_GradleBuild #115

2018-04-18 Thread Apache Jenkins Server
See 




Jenkins build is back to normal : beam_PostCommit_Java_ValidatesRunner_Apex_Gradle #137

2018-04-18 Thread Apache Jenkins Server
See 




[jira] [Work logged] (BEAM-3433) Allow BigQueryIO to use a different project for the load job in batch mode.

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3433?focusedWorklogId=92345&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92345
 ]

ASF GitHub Bot logged work on BEAM-3433:


Author: ASF GitHub Bot
Created on: 19/Apr/18 02:06
Start Date: 19/Apr/18 02:06
Worklog Time Spent: 10m 
  Work Description: kvncp opened a new pull request #5178: [BEAM-3433] 
Allow a GCP project to be explicitly set for a load job
URL: https://github.com/apache/beam/pull/5178
 
 
   I've added the needed string parameter to the BigQueryIO.write() function, 
and passed it through to the underlying class. I wanted to get some feedback 
before writing a test.
   
   1. Should I also add a ValueProvider interface?
   2. I've modified the constructor of WriteTables, which is public. Should I 
instead add a setter to that class or overload the constructor?
   3. Should I validate that this parameter is not set unless the method is 
FILE_LOADS? Setting it otherwise isn't harmful; it is just ignored. I'm not sure 
what the recommendation is in that case.
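
For the third question, here is a hypothetical sketch of one possible policy. It uses the explicitly configured load-job project when one is set, and falls back to the destination table's project otherwise. It rejects the setting when the write method is explicitly streaming inserts, where it would be silently ignored. The `Method` enum only mirrors the names of BigQueryIO's write methods, and `LoadJobProject.resolve` is not part of Beam.

```java
/** Hypothetical resolution of the project that runs a BigQuery load job. */
final class LoadJobProject {
  /** Mirrors the names of BigQueryIO's write methods; defined here only for the sketch. */
  enum Method { DEFAULT, FILE_LOADS, STREAMING_INSERTS }

  private LoadJobProject() {}

  /**
   * @param destinationProject project of the destination table
   * @param loadJobProject explicitly configured project for load jobs, or null if unset
   */
  static String resolve(String destinationProject, String loadJobProject, Method method) {
    if (loadJobProject != null && method == Method.STREAMING_INSERTS) {
      // Fail loudly instead of silently ignoring a setting that streaming inserts never use.
      throw new IllegalArgumentException(
          "A load-job project only applies when writing with FILE_LOADS");
    }
    return loadJobProject != null ? loadJobProject : destinationProject;
  }
}
```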




Issue Time Tracking
---

Worklog Id: (was: 92345)
Time Spent: 10m
Remaining Estimate: 0h

> Allow BigQueryIO to use a different project for the load job in batch mode.
> ---
>
> Key: BEAM-3433
> URL: https://issues.apache.org/jira/browse/BEAM-3433
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp
>Reporter: Kevin Peterson
>Assignee: Chamikara Jayalath
>Priority: Minor
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> BigQueryIO is currently configured to always run a batch load job using the 
> same project as the destination table: 
> https://github.com/apache/beam/blob/192b4c70927901860312f8c8acd27bd47e4a4259/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java#L256
> This may not always be desirable, since a pipeline may have write access to a 
> dataset in a different project, but not jobs.create access in that project. 
> This parameter should be settable in the interface.





[jira] [Work logged] (BEAM-3433) Allow BigQueryIO to use a different project for the load job in batch mode.

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3433?focusedWorklogId=92346&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92346
 ]

ASF GitHub Bot logged work on BEAM-3433:


Author: ASF GitHub Bot
Created on: 19/Apr/18 02:06
Start Date: 19/Apr/18 02:06
Worklog Time Spent: 10m 
  Work Description: kvncp commented on issue #5178: [BEAM-3433] Allow a GCP 
project to be explicitly set for a load job
URL: https://github.com/apache/beam/pull/5178#issuecomment-382586056
 
 
   Hi @chamikaramj , can you please take a look?




Issue Time Tracking
---

Worklog Id: (was: 92346)
Time Spent: 20m  (was: 10m)

> Allow BigQueryIO to use a different project for the load job in batch mode.
> ---
>
> Key: BEAM-3433
> URL: https://issues.apache.org/jira/browse/BEAM-3433
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp
>Reporter: Kevin Peterson
>Assignee: Chamikara Jayalath
>Priority: Minor
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> BigQueryIO is currently configured to always run a batch load job using the 
> same project as the destination table: 
> https://github.com/apache/beam/blob/192b4c70927901860312f8c8acd27bd47e4a4259/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java#L256
> This may not always be desirable, since a pipeline may have write access to a 
> dataset in a different project, but not jobs.create access in that project. 
> This parameter should be settable in the interface.





[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=92343&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92343
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 19/Apr/18 01:55
Start Date: 19/Apr/18 01:55
Worklog Time Spent: 10m 
  Work Description: rangadi commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182616263
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
 ##
 @@ -33,6 +34,7 @@
   private final String topic;
   private final int partition;
   private final long offset;
+  private final Headers headers;
 
 Review comment:
   Hmm, maybe that is one of the reasons `ConsumerRecord.headers()` returns 
mutable headers. Could we just use that?




Issue Time Tracking
---

Worklog Id: (was: 92343)
Time Spent: 6.5h  (was: 6h 20m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 6.5h
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  





Jenkins build is back to normal : beam_PostCommit_Java_ValidatesRunner_Flink_Gradle #155

2018-04-18 Thread Apache Jenkins Server
See 




Build failed in Jenkins: beam_PerformanceTests_Spark #1607

2018-04-18 Thread Apache Jenkins Server
See 


Changes:

[coheigea] Change equals() + toUpper/LowerCase to equalsIgnoreCase

[yifanzou] BEAM-3339 Mobile gaming automation for Java nightly snapshot

[sidhom] [BEAM-4069] Gracefully deserialize empty options structs

[iemejia] [BEAM-4019] Refactor HBaseIO splitting to produce ByteKeyRange objects

[ehudm] Fix localfilesystem _list() operation.

[aaltay] [BEAM-4108] Generate javadocs for release (#5121)

--
[...truncated 95.33 KB...]
'apache-beam-testing:bqjob_r2b29942538179a23_0162db90240a_1': Invalid schema
update. Field timestamp has changed type from TIMESTAMP to FLOAT

STDERR: 
/usr/lib/google-cloud-sdk/platform/bq/third_party/oauth2client/contrib/gce.py:73:
 UserWarning: You have requested explicit scopes to be used with a GCE service 
account.
Using this argument will have no effect on the actual scopes for tokens
requested. These scopes are set at VM instance creation time and
can't be overridden in the request.

  warnings.warn(_SCOPES_WARNING)
Upload complete.Waiting on bqjob_r2b29942538179a23_0162db90240a_1 ... (0s) 
Current status: RUNNING 
 Waiting on 
bqjob_r2b29942538179a23_0162db90240a_1 ... (0s) Current status: DONE   
2018-04-19 01:41:21,305 8ba9ddda MainThread INFO Retrying exception running 
IssueRetryableCommand: Command returned a non-zero exit code.

2018-04-19 01:41:45,408 8ba9ddda MainThread INFO Running: bq load 
--autodetect --source_format=NEWLINE_DELIMITED_JSON 
beam_performance.pkb_results 

2018-04-19 01:41:47,580 8ba9ddda MainThread INFO Ran: {bq load --autodetect 
--source_format=NEWLINE_DELIMITED_JSON beam_performance.pkb_results 

  ReturnCode:1
STDOUT: 

BigQuery error in load operation: Error processing job
'apache-beam-testing:bqjob_rb0add325d4abb8e_0162db908a74_1': Invalid schema
update. Field timestamp has changed type from TIMESTAMP to FLOAT

STDERR: 
/usr/lib/google-cloud-sdk/platform/bq/third_party/oauth2client/contrib/gce.py:73:
 UserWarning: You have requested explicit scopes to be used with a GCE service 
account.
Using this argument will have no effect on the actual scopes for tokens
requested. These scopes are set at VM instance creation time and
can't be overridden in the request.

  warnings.warn(_SCOPES_WARNING)
Upload complete.Waiting on bqjob_rb0add325d4abb8e_0162db908a74_1 ... (0s) 
Current status: RUNNING 
Waiting on 
bqjob_rb0add325d4abb8e_0162db908a74_1 ... (0s) Current status: DONE   
2018-04-19 01:41:47,581 8ba9ddda MainThread INFO Retrying exception running 
IssueRetryableCommand: Command returned a non-zero exit code.

2018-04-19 01:42:13,762 8ba9ddda MainThread INFO Running: bq load 
--autodetect --source_format=NEWLINE_DELIMITED_JSON 
beam_performance.pkb_results 

2018-04-19 01:42:15,934 8ba9ddda MainThread INFO Ran: {bq load --autodetect 
--source_format=NEWLINE_DELIMITED_JSON beam_performance.pkb_results 

  ReturnCode:1
STDOUT: 

BigQuery error in load operation: Error processing job
'apache-beam-testing:bqjob_r6693e0ffd0df8c73_0162db90f950_1': Invalid schema
update. Field timestamp has changed type from TIMESTAMP to FLOAT

STDERR: 
/usr/lib/google-cloud-sdk/platform/bq/third_party/oauth2client/contrib/gce.py:73:
 UserWarning: You have requested explicit scopes to be used with a GCE service 
account.
Using this argument will have no effect on the actual scopes for tokens
requested. These scopes are set at VM instance creation time and
can't be overridden in the request.

  warnings.warn(_SCOPES_WARNING)
Upload complete.Waiting on bqjob_r6693e0ffd0df8c73_0162db90f950_1 ... (0s) 
Current status: RUNNING 
 Waiting on 
bqjob_r6693e0ffd0df8c73_0162db90f950_1 ... (0s) Current status: DONE   
2018-04-19 01:42:15,935 8ba9ddda MainThread INFO Retrying exception running 
IssueRetryableCommand: Command returned a non-zero exit code.

2018-04-19 01:42:41,407 8ba9ddda MainThread INFO Running: bq load 
--autodetect --source_format=NEWLINE_DELIMITED_JSON 
beam_performance.pkb_results 

2018-04-19 01:42:43,548 8ba9ddda MainThread INFO Ran: {bq load --autodetect 
--source_format=NEWLINE_DELIMITED_JS

[jira] [Work logged] (BEAM-4071) Portable Runner Job API shim

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4071?focusedWorklogId=92331&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92331
 ]

ASF GitHub Bot logged work on BEAM-4071:


Author: ASF GitHub Bot
Created on: 19/Apr/18 01:34
Start Date: 19/Apr/18 01:34
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #5150:  
[BEAM-4071] Add Portable Runner Job API shim
URL: https://github.com/apache/beam/pull/5150#discussion_r182611526
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ArtifactServiceStager.java
 ##
 @@ -87,26 +87,32 @@ private ArtifactServiceStager(Channel channel, int bufferSize) {
 this.bufferSize = bufferSize;
   }
 
-  public void stage(Iterable files) throws IOException, InterruptedException {
-    final Map> futures = new HashMap<>();
-    for (File file : files) {
+  /**
+   * Stages the given artifact files to the staging service.
+   *
+   * @return The artifact staging token returned by the service
+   */
+  public String stage(Iterable files) throws IOException, InterruptedException {
+    final Map> futures = new HashMap<>();
+    for (StagedFile file : files) {
       futures.put(file, MoreFutures.supplyAsync(new StagingCallable(file), executorService));
     }
     CompletionStage stagingResult =
         MoreFutures.allAsList(futures.values())
             .thenApply(ignored -> new ExtractStagingResultsCallable(futures).call());
-    stageManifest(stagingResult);
+    return stageManifest(stagingResult);
   }
 
-  private void stageManifest(CompletionStage stagingFuture)
+  private String stageManifest(CompletionStage stagingFuture)
       throws InterruptedException {
     try {
       StagingResult stagingResult = MoreFutures.get(stagingFuture);
       if (stagingResult.isSuccess()) {
         Manifest manifest =
             Manifest.newBuilder().addAllArtifact(stagingResult.getMetadata()).build();
-        blockingStub.commitManifest(
-            CommitManifestRequest.newBuilder().setManifest(manifest).build());
+        return blockingStub
+            .commitManifest(CommitManifestRequest.newBuilder().setManifest(manifest).build())
+            .getStagingToken();
 
 Review comment:
   Separate these two lines? Just a preference thing, but I think that the 
fluent style hints at something more immediate than the actual RPC we're making 
here.
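
Here is a stdlib-only sketch of the flow in this diff with the reviewer's suggestion applied. It stages every file concurrently and waits for all uploads. It then issues the commit RPC as its own statement and reads the token from the response in a second step. `StagingClient`, `CommitResponse`, and `StagerSketch` are hypothetical stand-ins for the gRPC stub and its messages, not Beam or gRPC APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;

/** Hypothetical stand-in for the staging RPCs (not Beam's or gRPC's API). */
interface StagingClient {
  /** Uploads one artifact and returns its metadata. */
  String putArtifact(String name);

  /** Commits the manifest; blocking, like a gRPC blocking-stub call. */
  CommitResponse commitManifest(List<String> metadata);
}

/** Hypothetical response message carrying the staging token. */
final class CommitResponse {
  private final String stagingToken;

  CommitResponse(String stagingToken) {
    this.stagingToken = stagingToken;
  }

  String getStagingToken() {
    return stagingToken;
  }
}

final class StagerSketch {
  private StagerSketch() {}

  /** Uploads all files concurrently, then commits a manifest and returns the staging token. */
  static String stage(StagingClient client, List<String> names, Executor executor)
      throws InterruptedException, ExecutionException {
    List<CompletableFuture<String>> uploads = new ArrayList<>();
    for (String name : names) {
      uploads.add(CompletableFuture.supplyAsync(() -> client.putArtifact(name), executor));
    }
    // Wait for every upload; any failed upload surfaces here as an ExecutionException.
    CompletableFuture.allOf(uploads.toArray(new CompletableFuture<?>[0])).get();
    List<String> metadata = new ArrayList<>();
    for (CompletableFuture<String> upload : uploads) {
      metadata.add(upload.join()); // already complete, so this does not block
    }
    // The commit RPC is its own statement; reading the token is a separate, local step.
    CommitResponse response = client.commitManifest(metadata);
    return response.getStagingToken();
  }
}
```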




Issue Time Tracking
---

Worklog Id: (was: 92331)
Time Spent: 11h  (was: 10h 50m)

> Portable Runner Job API shim
> 
>
> Key: BEAM-4071
> URL: https://issues.apache.org/jira/browse/BEAM-4071
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Minor
>  Time Spent: 11h
>  Remaining Estimate: 0h
>
> There needs to be a way to execute Java-SDK pipelines against a portable job 
> server. The job server itself is expected to be started up out-of-band. The 
> "PortableRunner" should take an option indicating the Job API endpoint and 
> defer other runner configurations to the backend itself.





[jira] [Work logged] (BEAM-4071) Portable Runner Job API shim

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4071?focusedWorklogId=92338&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92338
 ]

ASF GitHub Bot logged work on BEAM-4071:


Author: ASF GitHub Bot
Created on: 19/Apr/18 01:34
Start Date: 19/Apr/18 01:34
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #5150:  
[BEAM-4071] Add Portable Runner Job API shim
URL: https://github.com/apache/beam/pull/5150#discussion_r182611652
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ArtifactServiceStager.java
 ##
 @@ -222,13 +225,26 @@ public StagingResult call() {
 }
   }
 
+  /** A file along with a staging name. */
+  @AutoValue
+  public abstract static class FileToStage {
+    public static FileToStage of(File file, String stageName) {
+      return new AutoValue_ArtifactServiceStager_FileToStage(file, stageName);
+    }
+
+    /** The file to stage. */
+    public abstract File getFile();
+    /** Staging handle to this file. */
+    public abstract String getStageName();
 
 Review comment:
   "StagingName"?




Issue Time Tracking
---

Worklog Id: (was: 92338)

> Portable Runner Job API shim
> 
>
> Key: BEAM-4071
> URL: https://issues.apache.org/jira/browse/BEAM-4071
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Minor
>  Time Spent: 11h 40m
>  Remaining Estimate: 0h
>
> There needs to be a way to execute Java-SDK pipelines against a portable job 
> server. The job server itself is expected to be started up out-of-band. The 
> "PortableRunner" should take an option indicating the Job API endpoint and 
> defer other runner configurations to the backend itself.





[jira] [Work logged] (BEAM-4071) Portable Runner Job API shim

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4071?focusedWorklogId=92334&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92334
 ]

ASF GitHub Bot logged work on BEAM-4071:


Author: ASF GitHub Bot
Created on: 19/Apr/18 01:34
Start Date: 19/Apr/18 01:34
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #5150:  
[BEAM-4071] Add Portable Runner Job API shim
URL: https://github.com/apache/beam/pull/5150#discussion_r182613993
 
 

 ##
 File path: 
runners/reference/java/src/main/java/org/apache/beam/runners/reference/PortableRunner.java
 ##
 @@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.reference;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+import com.google.protobuf.ByteString;
+import io.grpc.ManagedChannel;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Set;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc.JobServiceBlockingStub;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.runners.core.construction.ArtifactServiceStager;
+import org.apache.beam.runners.core.construction.ArtifactServiceStager.StagedFile;
+import org.apache.beam.runners.core.construction.JavaReadViaImpulse;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.runners.reference.CloseableResource.CloseException;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.util.ZipFiles;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@link PipelineRunner} that executes a {@link Pipeline} against a {@code JobService}. */
+public class PortableRunner extends PipelineRunner<PipelineResult> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PortableRunner.class);
+
+  /** Provided pipeline options. */
+  private final PipelineOptions options;
+  /** Job API endpoint. */
+  private final String endpoint;
+  /** Files to stage to artifact staging service. They will ultimately be added to the classpath. */
+  private final Collection filesToStage;
+  /** Channel factory used to create communication channel with job and staging services. */
+  private final ManagedChannelFactory channelFactory;
+
+  /**
+   * Constructs a runner from the provided options.
+   *
+   * @param options Properties which configure the runner.
+   * @return The newly created runner.
+   */
+  public static PortableRunner fromOptions(PipelineOptions options) {
+    return createInternal(options, ManagedChannelFactory.createDefault());
+  }
+
+  @VisibleForTesting
+  static PortableRunner createInternal(
+      PipelineOptions options, ManagedChannelFactory channelFactory) {
+    PortablePipelineOptions portableOptions =
+        PipelineOptionsValidator.validate(PortablePipelineOptions.class, options);
+
+    String endpoint = portableOptions.getJobEndpoint();
+
+    // Deduplicate artifacts.
+    Set<String> pathsToStage = Sets.newHashSet();
+    if (portableOptions.getFilesT

[jira] [Work logged] (BEAM-4071) Portable Runner Job API shim

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4071?focusedWorklogId=92333&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92333
 ]

ASF GitHub Bot logged work on BEAM-4071:


Author: ASF GitHub Bot
Created on: 19/Apr/18 01:34
Start Date: 19/Apr/18 01:34
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #5150:  
[BEAM-4071] Add Portable Runner Job API shim
URL: https://github.com/apache/beam/pull/5150#discussion_r182613920
 
 

 ##
 File path: 
runners/reference/java/src/main/java/org/apache/beam/runners/reference/PortableRunner.java
 ##
 @@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.reference;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+import com.google.protobuf.ByteString;
+import io.grpc.ManagedChannel;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc.JobServiceBlockingStub;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.runners.core.construction.ArtifactServiceStager;
+import org.apache.beam.runners.core.construction.ArtifactServiceStager.FileToStage;
+import org.apache.beam.runners.core.construction.JavaReadViaImpulse;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@link PipelineRunner} that executes a {@link Pipeline} against a {@code JobService}. */
+public class PortableRunner extends PipelineRunner<PipelineResult> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PortableRunner.class);
+
+  /** Provided pipeline options. */
 
 Review comment:
   They do, but always with the same options. 




Issue Time Tracking
---

Worklog Id: (was: 92333)
Time Spent: 11h 20m  (was: 11h 10m)

> Portable Runner Job API shim
> 
>
> Key: BEAM-4071
> URL: https://issues.apache.org/jira/browse/BEAM-4071
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Minor
>  Time Spent: 11h 20m
>  Remaining Estimate: 0h
>
> There needs to be a way to execute Java-SDK pipelines against a portable job 
> server. The job server itself is expected to be started up out-of-band. The 
> "PortableRunner" should take an option indic

[jira] [Work logged] (BEAM-4071) Portable Runner Job API shim

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4071?focusedWorklogId=92336&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92336
 ]

ASF GitHub Bot logged work on BEAM-4071:


Author: ASF GitHub Bot
Created on: 19/Apr/18 01:34
Start Date: 19/Apr/18 01:34
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #5150:  
[BEAM-4071] Add Portable Runner Job API shim
URL: https://github.com/apache/beam/pull/5150#discussion_r182613945
 
 

 ##
 File path: 
runners/reference/java/src/main/java/org/apache/beam/runners/reference/PortableRunner.java
 ##
 @@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.reference;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+import com.google.protobuf.ByteString;
+import io.grpc.ManagedChannel;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Set;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc.JobServiceBlockingStub;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.runners.core.construction.ArtifactServiceStager;
+import org.apache.beam.runners.core.construction.ArtifactServiceStager.StagedFile;
+import org.apache.beam.runners.core.construction.JavaReadViaImpulse;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.runners.reference.CloseableResource.CloseException;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.util.ZipFiles;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@link PipelineRunner} that executes a {@link Pipeline} against a {@code JobService}. */
+public class PortableRunner extends PipelineRunner<PipelineResult> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PortableRunner.class);
+
+  /** Provided pipeline options. */
+  private final PipelineOptions options;
+  /** Job API endpoint. */
+  private final String endpoint;
+  /** Files to stage to artifact staging service. They will ultimately be added to the classpath. */
+  private final Collection filesToStage;
+  /** Channel factory used to create communication channel with job and staging services. */
+  private final ManagedChannelFactory channelFactory;
+
+  /**
+   * Constructs a runner from the provided options.
+   *
+   * @param options Properties which configure the runner.
+   * @return The newly created runner.
+   */
+  public static PortableRunner fromOptions(PipelineOptions options) {
+    return createInternal(options, ManagedChannelFactory.createDefault());
 
 Review comment:
   why not just `create`?




Issue Time Tracking
---

Worklog Id: (was: 92336)
Ti

[jira] [Work logged] (BEAM-4071) Portable Runner Job API shim

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4071?focusedWorklogId=92332&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92332
 ]

ASF GitHub Bot logged work on BEAM-4071:


Author: ASF GitHub Bot
Created on: 19/Apr/18 01:34
Start Date: 19/Apr/18 01:34
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #5150:  
[BEAM-4071] Add Portable Runner Job API shim
URL: https://github.com/apache/beam/pull/5150#discussion_r182613729
 
 

 ##
 File path: 
runners/reference/java/src/main/java/org/apache/beam/runners/reference/CloseableResource.java
 ##
 @@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.reference;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import javax.annotation.Nullable;
+
+/**
+ * An {@link AutoCloseable} that wraps a resource that needs to be cleaned up but does not implement
+ * {@link AutoCloseable} itself.
+ *
+ * Recipients of a {@link CloseableResource} are in general responsible for cleanup. Ownership
+ * can be transferred from one context to another via {@link #transfer()}. Transferring relinquishes
+ * ownership from the original resource. This allows resources to be safely constructed and
+ * transferred within a try-with-resources block. For example:
+ *
+ * {@code try (CloseableResource resource = CloseableResource.of(...)) {
+ *   // Do something with resource.
+ *   ...
+ *   // Then transfer ownership to some consumer.
+ *   resourceConsumer(resource.transfer());
+ * }
+ * }
+ *
+ * Not thread-safe.
+ */
+public class CloseableResource<T> implements AutoCloseable {
+
+  private final T resource;
+
+  /**
+   * {@link Closer} for the underlying resource. Closers are nullable to allow transfer of
+   * ownership. However, newly-constructed {@link CloseableResource CloseableResources} must always
+   * have non-null closers.
+   */
+  @Nullable private Closer<T> closer;
+
+  private boolean isClosed = false;
+
+  private CloseableResource(T resource, Closer<T> closer) {
+    this.resource = resource;
+    this.closer = closer;
+  }
+
+  /** Creates a {@link CloseableResource} with the given resource and closer. */
+  public static <T> CloseableResource<T> of(T resource, Closer<T> closer) {
+    checkArgument(resource != null, "Resource must be non-null");
+    checkArgument(closer != null, "%s must be non-null", Closer.class.getName());
+    return new CloseableResource<>(resource, closer);
+  }
+
+  /** Gets the underlying resource. */
+  public T get() {
+    checkState(closer != null, "%s has transferred ownership", CloseableResource.class.getName());
+    checkState(!isClosed, "%s is closed", CloseableResource.class.getName());
+    return resource;
+  }
+
+  /**
+   * Returns a new {@link CloseableResource} that owns the underlying resource and relinquishes
+   * ownership from this {@link CloseableResource}. {@link #close()} on the original instance
+   * becomes a no-op.
+   */
+  public CloseableResource<T> transfer() {
+    checkState(closer != null, "%s has transferred ownership", CloseableResource.class.getName());
+    checkState(!isClosed, "%s is closed", CloseableResource.class.getName());
+    CloseableResource<T> other = CloseableResource.of(resource, closer);
+    this.closer = null;
+    return other;
+  }
+
+  /**
+   * Closes the underlying resource. The closer will only be executed on the first call.
+   *
+   * @throws CloseException wrapping any exceptions thrown while closing
+   */
+  @Override
+  public void close() throws CloseException {
+    if (closer != null && !isClosed) {
+      try {
+        closer.close(resource);
+        isClosed = true;
+      } catch (Exception e) {
+        // Mark resource as closed even if we catch an exception.
+        isClosed = true;
 
 Review comment:
   `isClosed` can be in the finally block
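A minimal, self-contained sketch of the reviewer's suggestion. It assumes a simplified `Closer` callback and a hypothetical `CloseableResourceSketch` class; this is not the PR's `CloseableResource`. Moving `isClosed = true` into a `finally` block marks the resource closed on both the success and the failure path, so a failing closer is never retried. The sketch also keeps the ownership-transfer behavior from the Javadoc above and, like the original, is not thread-safe.

```java
import java.util.Objects;

/**
 * Hypothetical, simplified stand-in for CloseableResource (not the PR's code), showing
 * ownership transfer and setting isClosed in a finally block. Not thread-safe.
 */
class CloseableResourceSketch<T> implements AutoCloseable {

  /** Hypothetical cleanup callback; the PR's Closer interface may differ. */
  @FunctionalInterface
  interface Closer<R> {
    void close(R resource) throws Exception;
  }

  /** Wraps whatever the closer throws. */
  static class CloseException extends Exception {
    CloseException(Throwable cause) {
      super(cause);
    }
  }

  private final T resource;
  private Closer<T> closer; // null once ownership has been transferred
  private boolean isClosed = false;

  private CloseableResourceSketch(T resource, Closer<T> closer) {
    this.resource = Objects.requireNonNull(resource, "resource");
    this.closer = Objects.requireNonNull(closer, "closer");
  }

  static <T> CloseableResourceSketch<T> of(T resource, Closer<T> closer) {
    return new CloseableResourceSketch<>(resource, closer);
  }

  T get() {
    if (closer == null) {
      throw new IllegalStateException("ownership has been transferred");
    }
    if (isClosed) {
      throw new IllegalStateException("resource is closed");
    }
    return resource;
  }

  /** Hands ownership to a new wrapper; close() on this instance becomes a no-op. */
  CloseableResourceSketch<T> transfer() {
    CloseableResourceSketch<T> other = of(get(), closer); // get() performs the state checks
    closer = null;
    return other;
  }

  @Override
  public void close() throws CloseException {
    if (closer == null || isClosed) {
      return; // transferred away or already closed
    }
    try {
      closer.close(resource);
    } catch (Exception e) {
      throw new CloseException(e);
    } finally {
      // Runs whether or not the closer threw, so the closer is never invoked twice.
      isClosed = true;
    }
  }
}
```

With the flag set in one place, the success and failure paths can no longer drift apart.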



[jira] [Work logged] (BEAM-4071) Portable Runner Job API shim

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4071?focusedWorklogId=92337&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92337
 ]

ASF GitHub Bot logged work on BEAM-4071:


Author: ASF GitHub Bot
Created on: 19/Apr/18 01:34
Start Date: 19/Apr/18 01:34
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #5150:  
[BEAM-4071] Add Portable Runner Job API shim
URL: https://github.com/apache/beam/pull/5150#discussion_r182611630
 
 

 ##
 File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ArtifactServiceStager.java
 ##
 @@ -222,13 +230,26 @@ public StagingResult call() {
 }
   }
 
+  /** A file along with a staging name. */
+  @AutoValue
+  public abstract static class StagedFile {
 
 Review comment:
   We create this before staging, right? The name implies after
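A later revision of `PortableRunner` in this thread imports `ArtifactServiceStager.FileToStage`, a name that matches the reviewer's point that the value exists before staging happens. Below is a minimal hand-written sketch of such a value type, roughly what `@AutoValue` would generate. The accessor names `getFile` and `getStagingName` are assumptions, not the PR's API.

```java
import java.io.File;
import java.util.Objects;

/** Hypothetical value type: a local file paired with the name it will be staged under. */
final class FileToStage {
  private final File file;
  private final String stagingName;

  private FileToStage(File file, String stagingName) {
    this.file = Objects.requireNonNull(file, "file");
    this.stagingName = Objects.requireNonNull(stagingName, "stagingName");
  }

  static FileToStage of(File file, String stagingName) {
    return new FileToStage(file, stagingName);
  }

  File getFile() {
    return file;
  }

  String getStagingName() {
    return stagingName;
  }

  // Value semantics, similar to what @AutoValue generates.
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof FileToStage)) {
      return false;
    }
    FileToStage that = (FileToStage) o;
    return file.equals(that.file) && stagingName.equals(that.stagingName);
  }

  @Override
  public int hashCode() {
    return Objects.hash(file, stagingName);
  }

  @Override
  public String toString() {
    return "FileToStage{file=" + file + ", stagingName=" + stagingName + "}";
  }
}
```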




Issue Time Tracking
---

Worklog Id: (was: 92337)
Time Spent: 11h 40m  (was: 11.5h)

> Portable Runner Job API shim
> 
>
> Key: BEAM-4071
> URL: https://issues.apache.org/jira/browse/BEAM-4071
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Minor
>  Time Spent: 11h 40m
>  Remaining Estimate: 0h
>
> There needs to be a way to execute Java-SDK pipelines against a portable job 
> server. The job server itself is expected to be started up out-of-band. The 
> "PortableRunner" should take an option indicating the Job API endpoint and 
> defer other runner configurations to the backend itself.





[jira] [Work logged] (BEAM-4071) Portable Runner Job API shim

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4071?focusedWorklogId=92339&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92339
 ]

ASF GitHub Bot logged work on BEAM-4071:


Author: ASF GitHub Bot
Created on: 19/Apr/18 01:34
Start Date: 19/Apr/18 01:34
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #5150:  
[BEAM-4071] Add Portable Runner Job API shim
URL: https://github.com/apache/beam/pull/5150#discussion_r182613813
 
 

 ##
 File path: 
runners/reference/java/src/main/java/org/apache/beam/runners/reference/JobServicePipelineResult.java
 ##
 @@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.reference;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.google.protobuf.ByteString;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.beam.model.jobmanagement.v1.JobApi;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc.JobServiceBlockingStub;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class JobServicePipelineResult implements PipelineResult {
+
+  private static final long POLL_INTERVAL_SEC = 10;
+
+  private static final Logger LOG = LoggerFactory.getLogger(JobServicePipelineResult.class);
+
+  private final ByteString jobId;
+  private final CloseableResource<JobServiceBlockingStub> jobService;
+
+  JobServicePipelineResult(ByteString jobId, CloseableResource<JobServiceBlockingStub> jobService) {
+    this.jobId = jobId;
+    this.jobService = jobService;
+  }
+
+  @Override
+  public State getState() {
+    JobServiceBlockingStub stub = jobService.get();
+    GetJobStateResponse response =
+        stub.getState(GetJobStateRequest.newBuilder().setJobIdBytes(jobId).build());
+    return getJavaState(response.getState());
+  }
+
+  @Override
+  public State cancel() {
+    JobServiceBlockingStub stub = jobService.get();
+    CancelJobResponse response =
+        stub.cancel(CancelJobRequest.newBuilder().setJobIdBytes(jobId).build());
+    return getJavaState(response.getState());
+  }
+
+  @Override
+  public State waitUntilFinish(Duration duration) {
+    if (duration.compareTo(Duration.millis(1)) < 0) {
+      // Equivalent to infinite timeout.
+      return waitUntilFinish();
+    } else {
+      CompletableFuture<State> result = CompletableFuture.supplyAsync(this::waitUntilFinish);
+      try {
+        return Uninterruptibles.getUninterruptibly(
 
 Review comment:
   Why is this uninterruptible?
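One possible answer to the reviewer's question, sketched with only the JDK and a hypothetical `TimedWaitSketch.waitWithTimeout` helper; this is not the PR's code. Guava's `Uninterruptibles.getUninterruptibly` keeps waiting through interrupts and only re-asserts the interrupt flag when it returns, so a caller trying to abandon the wait is ignored until the timeout expires. Calling `FutureTask.get(timeout, unit)` directly lets an interrupt end the wait at once; the helper then restores the interrupt flag and cancels the background task. Returning `Optional.empty()` for "not finished" is an assumption made for the sketch.

```java
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Hypothetical helper: a bounded wait that still honors interrupts of the calling thread. */
final class TimedWaitSketch {
  private TimedWaitSketch() {}

  /**
   * Runs blockingWait on a daemon thread and waits at most the given timeout. Returns empty on
   * timeout or interrupt; on interrupt, the caller's interrupt flag is restored.
   */
  static <T> Optional<T> waitWithTimeout(Supplier<T> blockingWait, long timeout, TimeUnit unit) {
    FutureTask<T> task = new FutureTask<>(blockingWait::get);
    Thread worker = new Thread(task, "wait-until-finish");
    worker.setDaemon(true);
    worker.start();
    try {
      return Optional.ofNullable(task.get(timeout, unit));
    } catch (TimeoutException e) {
      return Optional.empty();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // let callers see the interrupt
      return Optional.empty();
    } catch (ExecutionException e) {
      throw new IllegalStateException("wait failed", e.getCause());
    } finally {
      // No-op if the task already finished; otherwise interrupts the worker thread.
      task.cancel(true);
    }
  }
}
```

A `FutureTask` on a dedicated thread is used instead of `CompletableFuture.supplyAsync` because `CompletableFuture.cancel(true)` does not interrupt the thread doing the work, so the abandoned wait would keep running.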




Issue Time Tracking
---

Worklog Id: (was: 92339)
Time Spent: 11h 50m  (was: 11h 40m)

> Portable Runner Job API shim
> 
>
> Key: BEAM-4071
> URL: https://issues.apache.org/jira/browse/BEAM-4071
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Minor
>  Time Spent: 11h 50m
>  Remaining Estimate: 0h
>
> There needs to be a way to execute Java-SDK pipelines against a portable job 
> server. The job server itself is expected to be started up out-of-band. The 
> "PortableRunner" should take an option indicating the Job AP

[jira] [Work logged] (BEAM-4071) Portable Runner Job API shim

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4071?focusedWorklogId=92335&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92335
 ]

ASF GitHub Bot logged work on BEAM-4071:


Author: ASF GitHub Bot
Created on: 19/Apr/18 01:34
Start Date: 19/Apr/18 01:34
Worklog Time Spent: 10m 
  Work Description: tgroh commented on a change in pull request #5150:  
[BEAM-4071] Add Portable Runner Job API shim
URL: https://github.com/apache/beam/pull/5150#discussion_r182614126
 
 

 ##
 File path: 
runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/InMemoryArtifactService.java
 ##
 @@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.reference.testing;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Sets;
+import com.google.common.io.BaseEncoding;
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
+import java.security.MessageDigest;
+import java.util.Set;
+import java.util.function.Consumer;
+import javax.annotation.concurrent.GuardedBy;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ArtifactMetadata;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.CommitManifestRequest;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.CommitManifestResponse;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.PutArtifactRequest;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.PutArtifactResponse;
+import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A StagingService for tests. Only stores artifact metadata. */
 
 Review comment:
   We already have one of these, don't we?
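The class under review imports `MessageDigest` and `BaseEncoding` and describes itself as storing only artifact metadata. Below is a minimal, JDK-only sketch of that idea. The names `MetadataOnlyArtifactStore`, `put` and `manifest` are hypothetical, and the sketch assumes the recorded metadata is an MD5 digest of the streamed bytes. Chunks are hashed as they arrive and then discarded, so memory use does not grow with artifact size. `java.util.Base64` stands in for Guava's `BaseEncoding`.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical test double: records each artifact's name and MD5, never its contents. */
final class MetadataOnlyArtifactStore {
  private final Map<String, String> md5ByName = new LinkedHashMap<>();

  /** Consumes one artifact as a sequence of chunks, as a streaming upload would deliver it. */
  synchronized void put(String name, Iterable<byte[]> chunks) {
    if (md5ByName.containsKey(name)) {
      throw new IllegalArgumentException("Duplicate artifact: " + name);
    }
    MessageDigest md5;
    try {
      md5 = MessageDigest.getInstance("MD5"); // every Java SE platform must provide MD5
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException(e);
    }
    for (byte[] chunk : chunks) {
      md5.update(chunk); // hash the bytes, then drop them
    }
    md5ByName.put(name, Base64.getEncoder().encodeToString(md5.digest()));
  }

  /** Snapshot of the recorded metadata, in insertion order. */
  synchronized Map<String, String> manifest() {
    return Collections.unmodifiableMap(new LinkedHashMap<>(md5ByName));
  }
}
```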




Issue Time Tracking
---

Worklog Id: (was: 92335)

> Portable Runner Job API shim
> 
>
> Key: BEAM-4071
> URL: https://issues.apache.org/jira/browse/BEAM-4071
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Minor
>  Time Spent: 11.5h
>  Remaining Estimate: 0h
>
> There needs to be a way to execute Java-SDK pipelines against a portable job 
> server. The job server itself is expected to be started up out-of-band. The 
> "PortableRunner" should take an option indicating the Job API endpoint and 
> defer other runner configurations to the backend itself.





Build failed in Jenkins: beam_PerformanceTests_MongoDBIO_IT #67

2018-04-18 Thread Apache Jenkins Server
See 


Changes:

[coheigea] Change equals() + toUpper/LowerCase to equalsIgnoreCase

[yifanzou] BEAM-3339 Mobile gaming automation for Java nightly snapshot

[sidhom] [BEAM-4069] Gracefully deserialize empty options structs

[iemejia] [BEAM-4019] Refactor HBaseIO splitting to produce ByteKeyRange objects

[ehudm] Fix localfilesystem _list() operation.

[aaltay] [BEAM-4108] Generate javadocs for release (#5121)

--
[...truncated 123.66 KB...]
at java.lang.Thread.run(Thread.java:745)
Caused by: com.mongodb.MongoTimeoutException: Timed out after 3 ms while 
waiting for a server that matches 
ReadPreferenceServerSelector{readPreference=primary}. Client view of cluster 
state is {type=UNKNOWN, servers=[{address=35.226.153.218:27017, type=UNKNOWN, 
state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception 
opening socket}, caused by {java.net.SocketTimeoutException: connect timed 
out}}]
at 
com.mongodb.connection.BaseCluster.createTimeoutException(BaseCluster.java:369)
at com.mongodb.connection.BaseCluster.selectServer(BaseCluster.java:101)
at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.<init>(ClusterBinding.java:75)
at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.<init>(ClusterBinding.java:71)
at 
com.mongodb.binding.ClusterBinding.getReadConnectionSource(ClusterBinding.java:63)
at 
com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:210)
at com.mongodb.operation.FindOperation.execute(FindOperation.java:482)
at com.mongodb.operation.FindOperation.execute(FindOperation.java:79)
at com.mongodb.Mongo.execute(Mongo.java:772)
at com.mongodb.Mongo$2.execute(Mongo.java:759)
at com.mongodb.OperationIterable.iterator(OperationIterable.java:47)
at com.mongodb.FindIterableImpl.iterator(FindIterableImpl.java:143)
at 
org.apache.beam.sdk.io.mongodb.MongoDbIO$BoundedMongoDbReader.start(MongoDbIO.java:456)
at 
com.google.cloud.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:592)
... 14 more
java.io.IOException: Failed to start reading from source: 
org.apache.beam.sdk.io.mongodb.MongoDbIO$BoundedMongoDbSource@16e9f02d
at 
com.google.cloud.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:595)
at 
com.google.cloud.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:360)
at 
com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:193)
at 
com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
at 
com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
at 
com.google.cloud.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:383)
at 
com.google.cloud.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:355)
at 
com.google.cloud.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:286)
at 
com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:134)
at 
com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:114)
at 
com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:101)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.mongodb.MongoTimeoutException: Timed out after 3 ms while 
waiting for a server that matches 
ReadPreferenceServerSelector{readPreference=primary}. Client view of cluster 
state is {type=UNKNOWN, servers=[{address=35.226.153.218:27017, type=UNKNOWN, 
state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception 
opening socket}, caused by {java.net.SocketTimeoutException: connect timed 
out}}]
at 
com.mongodb.connection.BaseCluster.createTimeoutException(BaseCluster.java:369)
at com.mongodb.connection.BaseCluster.selectServer(BaseCluster.java:101)
at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.<init>(ClusterBinding.java:75)
at com.mongodb.binding.ClusterBinding$ClusterBindingConnectionSource.<init>(ClusterBinding.java:71)
at 
com.mongodb.binding.ClusterBinding.getReadConnectionSource(ClusterBinding.java:63)
at 
com.mongodb.operation.OperationHelper.withConnection(Operation

Jenkins build is back to normal : beam_PerformanceTests_JDBC #469

2018-04-18 Thread Apache Jenkins Server
See 




Build failed in Jenkins: beam_PerformanceTests_Python #1165

2018-04-18 Thread Apache Jenkins Server
See 


Changes:

[coheigea] Change equals() + toUpper/LowerCase to equalsIgnoreCase

[yifanzou] BEAM-3339 Mobile gaming automation for Java nightly snapshot

[sidhom] [BEAM-4069] Gracefully deserialize empty options structs

[iemejia] [BEAM-4019] Refactor HBaseIO splitting to produce ByteKeyRange objects

[ehudm] Fix localfilesystem _list() operation.

[aaltay] [BEAM-4108] Generate javadocs for release (#5121)

--
[...truncated 30.63 KB...]
[INFO] --- maven-compiler-plugin:3.7.0:testCompile (default-testCompile) @ 
beam-sdks-java-build-tools ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 1 source file to 

[INFO] 
[INFO] --- maven-checkstyle-plugin:3.0.0:check (default) @ 
beam-sdks-java-build-tools ---
[INFO] 
[INFO] --- maven-surefire-plugin:2.21.0:test (default-test) @ 
beam-sdks-java-build-tools ---
[INFO] Tests are skipped.
[INFO] 
[INFO] --- build-helper-maven-plugin:3.0.0:regex-properties 
(render-artifact-id) @ beam-sdks-java-build-tools ---
[INFO] 
[INFO] --- maven-jar-plugin:3.0.2:jar (default-jar) @ 
beam-sdks-java-build-tools ---
[INFO] Building jar: 

[INFO] 
[INFO] --- maven-site-plugin:3.7:attach-descriptor (attach-descriptor) @ 
beam-sdks-java-build-tools ---
[INFO] Skipping because packaging 'jar' is not pom.
[INFO] 
[INFO] --- maven-jar-plugin:3.0.2:test-jar (default-test-jar) @ 
beam-sdks-java-build-tools ---
[INFO] Building jar: 

[INFO] 
[INFO] --- maven-shade-plugin:3.1.0:shade (bundle-and-repackage) @ 
beam-sdks-java-build-tools ---
[INFO] Replacing original artifact with shaded artifact.
[INFO] Replacing 

 with 

[INFO] Replacing original test artifact with shaded test artifact.
[INFO] Replacing 

 with 

[INFO] 
[INFO] --- maven-dependency-plugin:3.0.2:analyze-only (default) @ 
beam-sdks-java-build-tools ---
[INFO] No dependency problems found
[INFO] 
[INFO] --- maven-install-plugin:2.5.2:install (default-install) @ 
beam-sdks-java-build-tools ---
[INFO] Installing 

 to 
/home/jenkins/.m2/repository/org/apache/beam/beam-sdks-java-build-tools/2.5.0-SNAPSHOT/beam-sdks-java-build-tools-2.5.0-SNAPSHOT.jar
[INFO] Installing 

 to 
/home/jenkins/.m2/repository/org/apache/beam/beam-sdks-java-build-tools/2.5.0-SNAPSHOT/beam-sdks-java-build-tools-2.5.0-SNAPSHOT.pom
[INFO] 
[INFO] Reactor Summary:
[INFO] 
[INFO] Apache Beam :: Parent .. SUCCESS [  6.570 s]
[INFO] Apache Beam :: SDKs :: Java :: Build Tools . FAILURE [  3.882 s]
[INFO] Apache Beam :: Model ... SKIPPED
[INFO] Apache Beam :: Model :: Pipeline ... SKIPPED
[INFO] Apache Beam :: Model :: Job Management . SKIPPED
[INFO] Apache Beam :: Model :: Fn Execution ... SKIPPED
[INFO] Apache Beam :: SDKs  SKIPPED
[INFO] Apache Beam :: SDKs :: Go .. SKIPPED
[INFO] Apache Beam :: SDKs :: Go :: Container . SKIPPED
[INFO] Apache Beam :: SDKs :: Java  SKIPPED
[INFO] Apache Beam :: SDKs :: Java :: Core  SKIPPED
[INFO] Apache Beam :: SDKs :: Java :: Fn Execution  SKIPPED
[INFO] Apache Beam :: SDKs :: Java :: Extensions .. SKIPPED
[INFO] Apache Beam :: SDKs :: Java :: Extensions :: Google Cloud Platform Core 
SKIPPED
[INFO] Apache Beam :: Runners . SKIPPED
[INFO] Apache Beam :: Runners :: Core Construction Java ... SKIPPED
[INFO] 

Build failed in Jenkins: beam_PerformanceTests_HadoopInputFormat #158

2018-04-18 Thread Apache Jenkins Server
See 


Changes:

[coheigea] Change equals() + toUpper/LowerCase to equalsIgnoreCase

[yifanzou] BEAM-3339 Mobile gaming automation for Java nightly snapshot

[sidhom] [BEAM-4069] Gracefully deserialize empty options structs

[iemejia] [BEAM-4019] Refactor HBaseIO splitting to produce ByteKeyRange objects

[ehudm] Fix localfilesystem _list() operation.

[aaltay] [BEAM-4108] Generate javadocs for release (#5121)

--
[...truncated 63.84 KB...]
[INFO] Excluding com.google.cloud.bigdataoss:gcsio:jar:1.4.5 from the shaded 
jar.
[INFO] Excluding 
com.google.apis:google-api-services-cloudresourcemanager:jar:v1-rev6-1.22.0 
from the shaded jar.
[INFO] Excluding 
org.apache.beam:beam-sdks-java-io-google-cloud-platform:jar:2.5.0-SNAPSHOT from 
the shaded jar.
[INFO] Excluding 
org.apache.beam:beam-sdks-java-extensions-protobuf:jar:2.5.0-SNAPSHOT from the 
shaded jar.
[INFO] Excluding io.grpc:grpc-core:jar:1.2.0 from the shaded jar.
[INFO] Excluding com.google.errorprone:error_prone_annotations:jar:2.0.15 from 
the shaded jar.
[INFO] Excluding io.grpc:grpc-context:jar:1.2.0 from the shaded jar.
[INFO] Excluding com.google.instrumentation:instrumentation-api:jar:0.3.0 from 
the shaded jar.
[INFO] Excluding 
com.google.apis:google-api-services-bigquery:jar:v2-rev374-1.22.0 from the 
shaded jar.
[INFO] Excluding com.google.api:gax-grpc:jar:0.20.0 from the shaded jar.
[INFO] Excluding io.grpc:grpc-protobuf:jar:1.2.0 from the shaded jar.
[INFO] Excluding com.google.api:api-common:jar:1.0.0-rc2 from the shaded jar.
[INFO] Excluding com.google.api:gax:jar:1.3.1 from the shaded jar.
[INFO] Excluding org.threeten:threetenbp:jar:1.3.3 from the shaded jar.
[INFO] Excluding com.google.cloud:google-cloud-core-grpc:jar:1.2.0 from the 
shaded jar.
[INFO] Excluding com.google.apis:google-api-services-pubsub:jar:v1-rev10-1.22.0 
from the shaded jar.
[INFO] Excluding com.google.api.grpc:grpc-google-cloud-pubsub-v1:jar:0.1.18 
from the shaded jar.
[INFO] Excluding com.google.api.grpc:proto-google-cloud-pubsub-v1:jar:0.1.18 
from the shaded jar.
[INFO] Excluding com.google.api.grpc:proto-google-iam-v1:jar:0.1.18 from the 
shaded jar.
[INFO] Excluding com.google.cloud.datastore:datastore-v1-proto-client:jar:1.4.0 
from the shaded jar.
[INFO] Excluding com.google.http-client:google-http-client-protobuf:jar:1.22.0 
from the shaded jar.
[INFO] Excluding com.google.http-client:google-http-client-jackson:jar:1.22.0 
from the shaded jar.
[INFO] Excluding com.google.cloud.datastore:datastore-v1-protos:jar:1.3.0 from 
the shaded jar.
[INFO] Excluding com.google.api.grpc:grpc-google-common-protos:jar:0.1.9 from 
the shaded jar.
[INFO] Excluding io.grpc:grpc-auth:jar:1.2.0 from the shaded jar.
[INFO] Excluding io.grpc:grpc-netty:jar:1.2.0 from the shaded jar.
[INFO] Excluding io.netty:netty-codec-http2:jar:4.1.8.Final from the shaded jar.
[INFO] Excluding io.netty:netty-handler-proxy:jar:4.1.8.Final from the shaded 
jar.
[INFO] Excluding io.netty:netty-codec-socks:jar:4.1.8.Final from the shaded jar.
[INFO] Excluding io.grpc:grpc-stub:jar:1.2.0 from the shaded jar.
[INFO] Excluding com.google.cloud:google-cloud-core:jar:1.0.2 from the shaded 
jar.
[INFO] Excluding org.json:json:jar:20160810 from the shaded jar.
[INFO] Excluding com.google.cloud:google-cloud-spanner:jar:0.20.0b-beta from 
the shaded jar.
[INFO] Excluding com.google.api.grpc:proto-google-cloud-spanner-v1:jar:0.1.11b 
from the shaded jar.
[INFO] Excluding 
com.google.api.grpc:proto-google-cloud-spanner-admin-instance-v1:jar:0.1.11 
from the shaded jar.
[INFO] Excluding com.google.api.grpc:grpc-google-cloud-spanner-v1:jar:0.1.11b 
from the shaded jar.
[INFO] Excluding 
com.google.api.grpc:grpc-google-cloud-spanner-admin-database-v1:jar:0.1.11 from 
the shaded jar.
[INFO] Excluding 
com.google.api.grpc:grpc-google-cloud-spanner-admin-instance-v1:jar:0.1.11 from 
the shaded jar.
[INFO] Excluding com.google.api.grpc:grpc-google-longrunning-v1:jar:0.1.11 from 
the shaded jar.
[INFO] Excluding com.google.api.grpc:proto-google-longrunning-v1:jar:0.1.11 
from the shaded jar.
[INFO] Excluding com.google.cloud.bigtable:bigtable-protos:jar:1.0.0-pre3 from 
the shaded jar.
[INFO] Excluding com.google.cloud.bigtable:bigtable-client-core:jar:1.0.0 from 
the shaded jar.
[INFO] Excluding com.google.auth:google-auth-library-appengine:jar:0.7.0 from 
the shaded jar.
[INFO] Excluding io.opencensus:opencensus-contrib-grpc-util:jar:0.7.0 from the 
shaded jar.
[INFO] Excluding io.opencensus:opencensus-api:jar:0.7.0 from the shaded jar.
[INFO] Excluding 
com.google.api.grpc:proto-google-cloud-spanner-admin-database-v1:jar:0.1.9 from 
the shaded jar.
[INFO] Excluding com.google.api.grpc:proto-google-common-protos:jar:0.1.9 from 
the shaded jar.
[INFO] Excluding io.grpc:grpc-all:jar:1.2.0 from the shaded jar.
[INFO] Excluding io.grpc:grpc-okhttp:jar:1.2.0 from the sh

[jira] [Work logged] (BEAM-4135) Remove Use of Java SDK Types in the DirectRunner "engine"

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4135?focusedWorklogId=92328&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92328
 ]

ASF GitHub Bot logged work on BEAM-4135:


Author: ASF GitHub Bot
Created on: 19/Apr/18 01:16
Start Date: 19/Apr/18 01:16
Worklog Time Spent: 10m 
  Work Description: tgroh opened a new pull request #5177: [BEAM-4135] Stop 
taking the whole result in WatermarkManager
URL: https://github.com/apache/beam/pull/5177
 
 
   Instead, take in the exploded components which are relevant to the 
WatermarkManager.
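
A minimal sketch of the kind of refactoring described above: the watermark bookkeeping receives only the pieces it reads instead of the whole result object. The names `TransformResult`, `WatermarkUpdater`, and `updateWatermarks` are hypothetical and do not reflect the actual DirectRunner classes or signatures in PR #5177.

```java
import java.util.List;

// Hypothetical "whole result" bundling everything a transform produced,
// including data (transformId, metrics) the watermark logic never reads.
final class TransformResult {
  final String transformId;
  final List<String> outputBundles;
  final long watermarkHold;
  final Object metrics;

  TransformResult(String transformId, List<String> outputBundles, long watermarkHold, Object metrics) {
    this.transformId = transformId;
    this.outputBundles = outputBundles;
    this.watermarkHold = watermarkHold;
    this.metrics = metrics;
  }
}

// Hypothetical watermark bookkeeping that only needs a few exploded components.
final class WatermarkUpdater {
  private long earliestHold = Long.MAX_VALUE;
  private int pendingBundles = 0;

  // Before: the whole result is passed in, coupling this class to TransformResult.
  void updateWatermarks(TransformResult result) {
    updateWatermarks(result.outputBundles, result.watermarkHold);
  }

  // After: only the relevant pieces are passed in, so callers can supply them from
  // any representation and tests do not need to build a full result object.
  void updateWatermarks(List<String> outputBundles, long watermarkHold) {
    pendingBundles += outputBundles.size();
    // The earliest hold seen so far is what would constrain the output watermark.
    earliestHold = Math.min(earliestHold, watermarkHold);
  }

  long earliestHold() {
    return earliestHold;
  }

  int pendingBundles() {
    return pendingBundles;
  }
}
```

Once every caller uses the narrower overload, the whole-result overload could be dropped entirely.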
   
   
   
   Follow this checklist to help us incorporate your contribution quickly and 
easily:
   
- [ ] Make sure there is a [JIRA 
issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the 
change (usually before you start working on it).  Trivial changes like typos do 
not require a JIRA issue.  Your pull request should address just this issue, 
without pulling in other changes.
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue.
- [ ] Write a pull request description that is detailed enough to 
understand:
  - [ ] What the pull request does
  - [ ] Why it does it
  - [ ] How it does it
  - [ ] Why this approach
- [ ] Each commit in the pull request should have a meaningful subject line 
and body.
- [ ] Run `mvn clean verify` to make sure basic checks pass. A more 
thorough check will be performed on your pull request automatically.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   




Issue Time Tracking
---

Worklog Id: (was: 92328)
Time Spent: 10m
Remaining Estimate: 0h

> Remove Use of Java SDK Types in the DirectRunner "engine"
> -
>
> Key: BEAM-4135
> URL: https://issues.apache.org/jira/browse/BEAM-4135
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-direct
>Reporter: Thomas Groh
>Assignee: Thomas Groh
>Priority: Major
>  Labels: portability
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The "engine" consists of the components which determine where to schedule 
> work and route it to the appropriate processors, such as WatermarkManager, 
> DirectBundleProcessor, and associated classes.
>  
> These engine components never inspect the actual characteristics of the 
> packaged work (e.g. the PCollection is a token, rather than a rich object), 
> so they should not require use of a PCollection directly - instead, they can 
> be generic.





[jira] [Created] (BEAM-4135) Remove Use of Java SDK Types in the DirectRunner "engine"

2018-04-18 Thread Thomas Groh (JIRA)
Thomas Groh created BEAM-4135:
-

 Summary: Remove Use of Java SDK Types in the DirectRunner "engine"
 Key: BEAM-4135
 URL: https://issues.apache.org/jira/browse/BEAM-4135
 Project: Beam
  Issue Type: New Feature
  Components: runner-direct
Reporter: Thomas Groh
Assignee: Thomas Groh


The "engine" consists of the components which determine where to schedule work 
and route it to the appropriate processors, such as WatermarkManager, 
DirectBundleProcessor, and associated classes.

 

These engine components never inspect the actual characteristics of the 
packaged work (e.g. the PCollection is a token, rather than a rich object), so 
they should not require use of a PCollection directly - instead, they can be 
generic.
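
One way to read "they can be generic" is to parameterize an engine component over an opaque collection token `C` instead of a concrete `PCollection`. `BundleRouter` and its methods are hypothetical and are not the DirectRunner's actual API; the only assumption about `C` is that it implements `equals`/`hashCode` consistently.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical engine component, generic in the collection token C and the
// processor type P. C is used only as an opaque map key and is never inspected.
final class BundleRouter<C, P> {
  private final Map<C, List<P>> consumers = new HashMap<>();

  // Registers a processor that consumes bundles of the given collection.
  void addConsumer(C collection, P processor) {
    consumers.computeIfAbsent(collection, k -> new ArrayList<>()).add(processor);
  }

  // Returns a copy of the processors a bundle of the given collection is routed to.
  List<P> route(C collection) {
    List<P> result = consumers.get(collection);
    return result == null ? new ArrayList<>() : new ArrayList<>(result);
  }
}
```

Because the router never looks inside `C`, the same class could in principle be keyed by a Java SDK `PCollection` or by a string collection ID from a portable pipeline.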





[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=92326&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92326
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 19/Apr/18 01:15
Start Date: 19/Apr/18 01:15
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182611605
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
 ##
 @@ -33,6 +34,7 @@
   private final String topic;
   private final int partition;
   private final long offset;
+  private final Headers headers;
 
 Review comment:
   Thank you for sharing this. Yes, this works as expected, but the issue comes 
when we initialize Headers in the if branch like so (similar to my example):
   
   ```java
   if (isPresent && kafkaHeaders == null) {
     Headers headers = new RecordHeaders();
     headers.add("headerKey", "headerVal".getBytes());
     return new ConsumerRecord<>("test", 0, 0L, "key", "value").headers();
   }
   ```
   Even with is.present set to False, this throws a `ClassNotFoundException`.
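
One plausible cause of a failure like this is that the JVM may load classes a method refers to before the branch that uses them runs (for example while verifying the method's bytecode), so a missing `Headers` or `RecordHeaders` class can fail even when the branch is not taken. A common workaround is to detect the optional class by name and keep version-specific code out of the classes that are always loaded. The sketch below assumes a hypothetical helper, `OptionalClassCheck`, which is not part of KafkaIO.

```java
// Hypothetical helper for optional dependencies: it checks for a class by name,
// so the calling code never links against that class directly.
final class OptionalClassCheck {
  // Kafka record headers (KAFKA-4208) first shipped in kafka-clients 0.11, so this
  // is false when an older client jar is on the classpath.
  static final boolean KAFKA_HEADERS_SUPPORTED =
      isClassPresent("org.apache.kafka.common.header.Headers");

  private OptionalClassCheck() {}

  // Returns true if the named class can be loaded. Passing initialize=false avoids
  // running the class's static initializers as a side effect of the check.
  static boolean isClassPresent(String className) {
    try {
      Class.forName(className, false, OptionalClassCheck.class.getClassLoader());
      return true;
    } catch (ClassNotFoundException | LinkageError e) {
      return false;
    }
  }
}
```

Code that actually constructs `RecordHeaders` would then live in a separate class that is referenced only behind `if (OptionalClassCheck.KAFKA_HEADERS_SUPPORTED)`, so that class would typically not be loaded when the check fails.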




Issue Time Tracking
---

Worklog Id: (was: 92326)
Time Spent: 6h 20m  (was: 6h 10m)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 6h 20m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  





[jira] [Work logged] (BEAM-4062) Performance regression in FileBasedSink

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4062?focusedWorklogId=92325&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92325
 ]

ASF GitHub Bot logged work on BEAM-4062:


Author: ASF GitHub Bot
Created on: 19/Apr/18 01:13
Start Date: 19/Apr/18 01:13
Worklog Time Spent: 10m 
  Work Description: udim commented on issue #5158: [BEAM-4062] Fix 
performance regression in FileBasedSink.
URL: https://github.com/apache/beam/pull/5158#issuecomment-382577718
 
 
   Also note that this fixes a JIRA 2.5.0 release blocker.




Issue Time Tracking
---

Worklog Id: (was: 92325)
Time Spent: 40m  (was: 0.5h)

> Performance regression in FileBasedSink
> ---
>
> Key: BEAM-4062
> URL: https://issues.apache.org/jira/browse/BEAM-4062
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Udi Meiri
>Assignee: Udi Meiri
>Priority: Blocker
> Fix For: 2.5.0
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> [https://github.com/apache/beam/pull/4648] has added:
>  * 3 or more stat() calls per output file (in pre_finalize and 
> finalize_writes)
>  * serial unbatched delete()s (in pre_finalize)
> Solution will be to list files in a batch operation (match()), and to 
> delete() in batch mode, or use multiple threads if that's not possible.
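
The issue targets the Python SDK, but the fallback it mentions (deleting with multiple threads when no batch delete is available) can be sketched in Java for illustration. `ParallelDeleter` and the single-file `deleteOne` callback are hypothetical stand-ins for a real filesystem API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// Hypothetical fallback for filesystems without a batch delete: issue the
// single-file deletes concurrently on a bounded pool instead of serially.
final class ParallelDeleter {
  private ParallelDeleter() {}

  // Returns the paths whose delete threw, in input order, so callers can retry or report them.
  static List<String> deleteAll(List<String> paths, Consumer<String> deleteOne, int threads) {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      List<Callable<Void>> tasks = new ArrayList<>();
      for (String path : paths) {
        tasks.add(() -> {
          deleteOne.accept(path);
          return null;
        });
      }
      // invokeAll blocks until every task finishes; futures keep the input order.
      List<Future<Void>> futures = pool.invokeAll(tasks);
      List<String> failed = new ArrayList<>();
      for (int i = 0; i < futures.size(); i++) {
        try {
          futures.get(i).get();
        } catch (ExecutionException e) {
          failed.add(paths.get(i));
        }
      }
      return failed;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while deleting files", e);
    } finally {
      pool.shutdown();
    }
  }
}
```

Where the filesystem does offer a batch `delete()`, that would usually be preferable, since one request can replace many round trips.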





[jira] [Work logged] (BEAM-4062) Performance regression in FileBasedSink

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4062?focusedWorklogId=92323&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92323
 ]

ASF GitHub Bot logged work on BEAM-4062:


Author: ASF GitHub Bot
Created on: 19/Apr/18 01:13
Start Date: 19/Apr/18 01:13
Worklog Time Spent: 10m 
  Work Description: udim commented on issue #5158: [BEAM-4062] Fix 
performance regression in FileBasedSink.
URL: https://github.com/apache/beam/pull/5158#issuecomment-382577651
 
 
   Rebased to hopefully fix pythonPreCommit.
   R: @chamikaramj 




Issue Time Tracking
---

Worklog Id: (was: 92323)
Time Spent: 0.5h  (was: 20m)

> Performance regression in FileBasedSink
> ---
>
> Key: BEAM-4062
> URL: https://issues.apache.org/jira/browse/BEAM-4062
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Udi Meiri
>Assignee: Udi Meiri
>Priority: Blocker
> Fix For: 2.5.0
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> [https://github.com/apache/beam/pull/4648] has added:
>  * 3 or more stat() calls per output file (in pre_finalize and 
> finalize_writes)
>  * serial unbatched delete()s (in pre_finalize)
> Solution will be to list files in a batch operation (match()), and to 
> delete() in batch mode, or use multiple threads if that's not possible.





[jira] [Work logged] (BEAM-4038) Support Kafka Headers in KafkaIO

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4038?focusedWorklogId=92322&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92322
 ]

ASF GitHub Bot logged work on BEAM-4038:


Author: ASF GitHub Bot
Created on: 19/Apr/18 01:12
Start Date: 19/Apr/18 01:12
Worklog Time Spent: 10m 
  Work Description: gkumar7 commented on a change in pull request #5111: 
[BEAM-4038] Support Kafka Headers in KafkaIO
URL: https://github.com/apache/beam/pull/5111#discussion_r182611605
 
 

 ##
 File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
 ##
 @@ -33,6 +34,7 @@
   private final String topic;
   private final int partition;
   private final long offset;
+  private final Headers headers;
 
 Review comment:
   Thank you for sharing this. Yes, this works as expected, but the issue comes 
when we initialize Headers in the if branch like so (similar to my example):
   
   ```java
   if (isPresent && kafkaHeaders == null) {
     Headers headers = new RecordHeaders();
     headers.add("headerKey", "headerVal".getBytes());
     return new ConsumerRecord<>("test", 0, 0L, "key", "value").headers();
   }
   ```
   Even with is.present set to False, this throws a runtime exception.




Issue Time Tracking
---

Worklog Id: (was: 92322)
Time Spent: 6h 10m  (was: 6h)

> Support Kafka Headers in KafkaIO
> 
>
> Key: BEAM-4038
> URL: https://issues.apache.org/jira/browse/BEAM-4038
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-kafka
>Reporter: Geet Kumar
>Assignee: Raghu Angadi
>Priority: Minor
>  Time Spent: 6h 10m
>  Remaining Estimate: 0h
>
> Headers have been added to Kafka Consumer/Producer records (KAFKA-4208). The 
> purpose of this JIRA is to support this feature in KafkaIO.  
>  





Jenkins build is back to normal : beam_PostCommit_Python_Verify #4735

2018-04-18 Thread Apache Jenkins Server
See 




[jira] [Work logged] (BEAM-3339) Create post-release testing of the nightly snapshots

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3339?focusedWorklogId=92321&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92321
 ]

ASF GitHub Bot logged work on BEAM-3339:


Author: ASF GitHub Bot
Created on: 19/Apr/18 01:04
Start Date: 19/Apr/18 01:04
Worklog Time Spent: 10m 
  Work Description: tgroh closed pull request #4788: [BEAM-3339] Mobile 
gaming automation for Java nightly snapshot on core runners
URL: https://github.com/apache/beam/pull/4788
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/.test-infra/jenkins/job_beam_PostRelease_NightlySnapshot.groovy 
b/.test-infra/jenkins/job_beam_PostRelease_NightlySnapshot.groovy
index 59bc243680d..29c7ae11eff 100644
--- a/.test-infra/jenkins/job_beam_PostRelease_NightlySnapshot.groovy
+++ b/.test-infra/jenkins/job_beam_PostRelease_NightlySnapshot.groovy
@@ -48,14 +48,14 @@ job('beam_PostRelease_NightlySnapshot') {
   // Allows triggering this build against pull requests.
   common_job_properties.enablePhraseTriggeringFromPullRequest(
   delegate,
-  './gradlew :release:runQuickstartsJava',
+  './gradlew :release:runJavaExamplesValidationTask',
   'Run Dataflow PostRelease')
 
   steps {
 // Run a quickstart from 
https://beam.apache.org/get-started/quickstart-java
 gradle {
   rootBuildScriptDir(common_job_properties.checkoutDir)
-  tasks(':release:runQuickstartsJava')
+  tasks(':release:runJavaExamplesValidationTask')
   common_job_properties.setGradleSwitches(delegate)
   switches('-Pver=$snapshot_version -Prepourl=$snapshot_url')
 }
diff --git a/build_rules.gradle b/build_rules.gradle
index 8f54fe0f478..cd8c162561e 100644
--- a/build_rules.gradle
+++ b/build_rules.gradle
@@ -977,26 +977,35 @@ ext.applyAvroNature = {
   apply plugin: "com.commercehub.gradle.plugin.avro"
 }
 
-// A class defining the set of configurable properties for 
createJavaQuickstartValidationTask
-class JavaQuickstartConfiguration {
-  // Name for the quickstart is required.
-  // Used both for the test name runQuickstartJava${name}
-  // and also for the script name, quickstart-java-${name}.toLowerCase().
-  String name
-
-  // gcpProject sets the gcpProject argument when executing the quickstart.
+// A class defining the set of configurable properties for 
createJavaExamplesArchetypeValidationTask
+class JavaExamplesArchetypeValidationConfiguration {
+  // Type [Quickstart, MobileGaming] for the postrelease validation is 
required.
+  // Used both for the test name run${type}Java${runner}
+  // and also for the script name, ${type}-java-${runner}.toLowerCase().
+  String type
+
+  // runner [Direct, Dataflow, Spark, Flink, FlinkLocal, Apex]
+  String runner
+
+  // gcpProject sets the gcpProject argument when executing examples.
   String gcpProject
 
-  // gcsBucket sets the gcsProject argument when executing the quickstart.
+  // gcsBucket sets the gcsProject argument when executing examples.
   String gcsBucket
+
+  // bqDataset sets the BigQuery Dataset when executing mobile-gaming examples
+  String bqDataset
+
+  // pubsubTopic sets topics when executing streaming pipelines
+  String pubsubTopic
 }
 
 // Creates a task to run the quickstart for a runner.
 // Releases version and URL, can be overriden for a RC release with
-// ./gradlew :release:runQuickstartJava -Pver=2.3.0 
-Prepourl=https://repository.apache.org/content/repositories/orgapachebeam-1027
-ext.createJavaQuickstartValidationTask = {
-  JavaQuickstartConfiguration config = it as JavaQuickstartConfiguration
-  def taskName = "runQuickstartJava${config.name}"
+// ./gradlew :release:runJavaExamplesValidationTask -Pver=2.3.0 
-Prepourl=https://repository.apache.org/content/repositories/orgapachebeam-1027
+ext.createJavaExamplesArchetypeValidationTask = {
+  JavaExamplesArchetypeValidationConfiguration config = it as 
JavaExamplesArchetypeValidationConfiguration
+  def taskName = "run${config.type}Java${config.runner}"
   println "Generating :${taskName}"
   def releaseVersion = project.findProperty('ver') ?: version
   def releaseRepo = project.findProperty('repourl') ?: 
'https://repository.apache.org/content/repositories/snapshots'
@@ -1007,9 +1016,15 @@ ext.createJavaQuickstartValidationTask = {
   if (config.gcsBucket) {
 argsNeeded.add("--gcsBucket=${config.gcsBucket}")
   }
+  if (config.bqDataset) {
+  argsNeeded.add("--bqDataset=${config.bqDataset}")
+  }
+  if (config.pubsubTopic) {
+  argsNeeded.add("--pubsubTopic=${config.pubsubTopic}")
+  }
   project.evaluationDependsOn(':release')
   task "${taskName}" (dependsOn: ':release:classes', type: JavaExec) {
-main = "quickstart-java-${c
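
The naming scheme introduced by this diff can be restated in Java for clarity: the task name is `run${type}Java${runner}`, the script name is `${type}-java-${runner}` lower-cased, and optional arguments are appended only when configured. This is a hypothetical transliteration, not code from the Beam build; `ValidationTaskNaming` and its methods are invented for illustration, and only the `--gcsBucket`, `--bqDataset`, and `--pubsubTopic` arguments visible in the hunk are mirrored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

final class ValidationTaskNaming {
  private ValidationTaskNaming() {}

  // Mirrors "run${config.type}Java${config.runner}" from the Groovy closure.
  static String taskName(String type, String runner) {
    return "run" + type + "Java" + runner;
  }

  // Mirrors "${type}-java-${runner}".toLowerCase() from the configuration comments.
  static String scriptName(String type, String runner) {
    return (type + "-java-" + runner).toLowerCase(Locale.ROOT);
  }

  // Adds each optional argument only when it is set, as the diff does.
  static List<String> optionalArgs(String gcsBucket, String bqDataset, String pubsubTopic) {
    List<String> args = new ArrayList<>();
    addIfSet(args, "--gcsBucket=", gcsBucket);
    addIfSet(args, "--bqDataset=", bqDataset);
    addIfSet(args, "--pubsubTopic=", pubsubTopic);
    return args;
  }

  // Groovy truth treats both null and "" as false; this helper does the same.
  private static void addIfSet(List<String> args, String prefix, String value) {
    if (value != null && !value.isEmpty()) {
      args.add(prefix + value);
    }
  }
}
```

The lower-cased script names line up with files listed in the commit summary, such as `mobilegaming-java-direct.groovy` and `quickstart-java-flinklocal.groovy`.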

[beam] branch master updated (3cdecd0 -> 7300eb2)

2018-04-18 Thread tgroh
This is an automated email from the ASF dual-hosted git repository.

tgroh pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from 3cdecd0  Merge pull request #5055 from coheigea/equalsIgnoreCase
 add 07ae520  BEAM-3339 Mobile gaming automation for Java nightly snapshot
 new 7300eb2  Merge pull request #4788: Add Mobile gaming automation for 
Java nightly snapshot on core runners

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../job_beam_PostRelease_NightlySnapshot.groovy|   4 +-
 build_rules.gradle |  43 --
 .../examples/complete/game/injector/Injector.java  |   2 +
 release/build.gradle   |   4 +-
 .../src/main/groovy/MobileGamingCommands.groovy| 144 +
 release/src/main/groovy/QuickstartArchetype.groovy |  16 ++-
 release/src/main/groovy/TestScripts.groovy |  77 ---
 .../main/groovy/mobilegaming-java-dataflow.groovy  | 111 
 .../main/groovy/mobilegaming-java-direct.groovy| 105 +++
 .../src/main/groovy/quickstart-java-apex.groovy|   6 +-
 .../main/groovy/quickstart-java-dataflow.groovy|   6 +-
 .../src/main/groovy/quickstart-java-direct.groovy  |   6 +-
 .../main/groovy/quickstart-java-flinklocal.groovy  |   6 +-
 .../src/main/groovy/quickstart-java-spark.groovy   |   6 +-
 runners/apex/build.gradle  |   2 +-
 runners/direct-java/build.gradle   |  15 ++-
 runners/flink/build.gradle |   2 +-
 runners/google-cloud-dataflow-java/build.gradle|  24 +++-
 runners/spark/build.gradle |   2 +-
 19 files changed, 513 insertions(+), 68 deletions(-)
 create mode 100644 release/src/main/groovy/MobileGamingCommands.groovy
 create mode 100644 release/src/main/groovy/mobilegaming-java-dataflow.groovy
 create mode 100644 release/src/main/groovy/mobilegaming-java-direct.groovy

-- 
To stop receiving notification emails like this one, please contact
tg...@apache.org.


[beam] 01/01: Merge pull request #4788: Add Mobile gaming automation for Java nightly snapshot on core runners

2018-04-18 Thread tgroh
This is an automated email from the ASF dual-hosted git repository.

tgroh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 7300eb2bfdd412767e60173c0a0e0d76ba7d067d
Merge: 3cdecd0 07ae520
Author: Thomas Groh 
AuthorDate: Wed Apr 18 18:04:30 2018 -0700

Merge pull request #4788: Add Mobile gaming automation for Java nightly 
snapshot on core runners

[BEAM-3339]

 .../job_beam_PostRelease_NightlySnapshot.groovy|   4 +-
 build_rules.gradle |  43 --
 .../examples/complete/game/injector/Injector.java  |   2 +
 release/build.gradle   |   4 +-
 .../src/main/groovy/MobileGamingCommands.groovy| 144 +
 release/src/main/groovy/QuickstartArchetype.groovy |  16 ++-
 release/src/main/groovy/TestScripts.groovy |  77 ---
 .../main/groovy/mobilegaming-java-dataflow.groovy  | 111 
 .../main/groovy/mobilegaming-java-direct.groovy| 105 +++
 .../src/main/groovy/quickstart-java-apex.groovy|   6 +-
 .../main/groovy/quickstart-java-dataflow.groovy|   6 +-
 .../src/main/groovy/quickstart-java-direct.groovy  |   6 +-
 .../main/groovy/quickstart-java-flinklocal.groovy  |   6 +-
 .../src/main/groovy/quickstart-java-spark.groovy   |   6 +-
 runners/apex/build.gradle  |   2 +-
 runners/direct-java/build.gradle   |  15 ++-
 runners/flink/build.gradle |   2 +-
 runners/google-cloud-dataflow-java/build.gradle|  24 +++-
 runners/spark/build.gradle |   2 +-
 19 files changed, 513 insertions(+), 68 deletions(-)




[jira] [Work logged] (BEAM-4071) Portable Runner Job API shim

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4071?focusedWorklogId=92320&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92320
 ]

ASF GitHub Bot logged work on BEAM-4071:


Author: ASF GitHub Bot
Created on: 19/Apr/18 00:44
Start Date: 19/Apr/18 00:44
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on issue #5150:  [BEAM-4071] Add 
Portable Runner Job API shim
URL: https://github.com/apache/beam/pull/5150#issuecomment-382573160
 
 
   Cleaned up commit history. @tgroh PTAL




Issue Time Tracking
---

Worklog Id: (was: 92320)
Time Spent: 10h 50m  (was: 10h 40m)

> Portable Runner Job API shim
> 
>
> Key: BEAM-4071
> URL: https://issues.apache.org/jira/browse/BEAM-4071
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Minor
>  Time Spent: 10h 50m
>  Remaining Estimate: 0h
>
> There needs to be a way to execute Java-SDK pipelines against a portable job 
> server. The job server itself is expected to be started up out-of-band. The 
> "PortableRunner" should take an option indicating the Job API endpoint and 
> defer other runner configurations to the backend itself.
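
The split described here, where the client-side runner interprets only the Job API endpoint and leaves everything else to the job server, could look roughly like the sketch below. The `--jobEndpoint=` flag and the `PortableRunnerArgs` class are hypothetical placeholders, not Beam's actual option names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical parsed arguments: the endpoint is the only setting the runner
// interprets; all other arguments are passed through untouched for the backend.
final class PortableRunnerArgs {
  static final String ENDPOINT_FLAG = "--jobEndpoint=";

  final String jobEndpoint;
  final List<String> backendArgs;

  private PortableRunnerArgs(String jobEndpoint, List<String> backendArgs) {
    this.jobEndpoint = jobEndpoint;
    this.backendArgs = Collections.unmodifiableList(backendArgs);
  }

  static PortableRunnerArgs parse(String[] args) {
    String endpoint = null;
    List<String> rest = new ArrayList<>();
    for (String arg : args) {
      if (arg.startsWith(ENDPOINT_FLAG)) {
        endpoint = arg.substring(ENDPOINT_FLAG.length());
      } else {
        rest.add(arg); // Deferred to the job server's own configuration.
      }
    }
    if (endpoint == null || endpoint.isEmpty()) {
      // The job server is started out-of-band, so its address must be supplied.
      throw new IllegalArgumentException("Missing required " + ENDPOINT_FLAG + "host:port");
    }
    return new PortableRunnerArgs(endpoint, rest);
  }
}
```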





[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=92319&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92319
 ]

ASF GitHub Bot logged work on BEAM-3327:


Author: ASF GitHub Bot
Created on: 19/Apr/18 00:43
Start Date: 19/Apr/18 00:43
Worklog Time Spent: 10m 
  Work Description: axelmagn commented on a change in pull request #5152: 
[BEAM-3327] Harness Manager Interfaces
URL: https://github.com/apache/beam/pull/5152#discussion_r182608247
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/manager/SdkHarnessManager.java
 ##
 @@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.manager;
+
+import org.apache.beam.model.fnexecution.v1.ProvisionApi;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactSource;
+import org.apache.beam.runners.fnexecution.control.RemoteBundle;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+
+/**
+ * A manager of resources related to the SDK Harness, capable of providing RemoteBundles to runner
+ * operators.
+ *
+ * In order to provide a simple interface to runner operators, the SdkHarnessManager is
+ * responsible for owning and managing the lifetimes of resources such as RPC servers and remote
+ * environments. It is responsible for both instantiation and cleanup of these resources.  Since all
+ * managed resources are owned by the SdkHarnessManager, it is responsible for cleaning them up when
+ * its close function is called.
+ */
+public interface SdkHarnessManager extends AutoCloseable {
+  /**
+   * Get a new {@link RemoteBundle bundle} for processing the data in an executable stage.
+   *
+   * If necessary, this blocks while provisioning the remote resources necessary to support
+   * bundle processing.
+   */
+   RemoteBundle getBundle(
 
 Review comment:
   OK, I commented on the wrong thread initially. This is the file that contains the fix. Please let me know if this updated interface is closer to what you were thinking.
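
The ownership model in the Javadoc above (the manager creates RPC servers and remote environments and cleans them all up when `close` is called) can be sketched as a small `AutoCloseable` that closes whatever it owns in reverse order of creation. The `OwningManager` class and its `own` method are hypothetical stand-ins. They do not reproduce the real `SdkHarnessManager` interface, whose `getBundle` signature is cut off in the quoted diff.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical manager that owns resources and releases them all on close(). */
public final class OwningManager implements AutoCloseable {
  // Resources are pushed as they are created so they can be closed last-in, first-out.
  private final Deque<AutoCloseable> owned = new ArrayDeque<>();
  private boolean closed = false;

  /** Registers a resource; from now on the manager is responsible for closing it. */
  public synchronized void own(AutoCloseable resource) {
    if (closed) {
      throw new IllegalStateException("Manager already closed");
    }
    owned.push(resource);
  }

  /** Closes every owned resource, continuing past failures and rethrowing the first one. */
  @Override
  public synchronized void close() throws Exception {
    if (closed) {
      return;  // closing twice is a no-op
    }
    closed = true;
    Exception first = null;
    while (!owned.isEmpty()) {
      try {
        owned.pop().close();
      } catch (Exception e) {
        if (first == null) {
          first = e;
        } else {
          first.addSuppressed(e);
        }
      }
    }
    if (first != null) {
      throw first;
    }
  }

  public static void main(String[] args) throws Exception {
    StringBuilder log = new StringBuilder();
    try (OwningManager manager = new OwningManager()) {
      // Stand-ins for, e.g., an RPC server and a remote environment created in that order.
      manager.own(() -> log.append("server closed;"));
      manager.own(() -> log.append("environment closed;"));
    }
    System.out.println(log);
  }
}
```

Closing in reverse creation order means that a resource created later (and possibly depending on an earlier one, such as an environment connected to a server) is torn down first.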


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 92319)
Time Spent: 8h 50m  (was: 8h 40m)

> Add abstractions to manage Environment Instance lifecycles.
> ---
>
> Key: BEAM-3327
> URL: https://issues.apache.org/jira/browse/BEAM-3327
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Axel Magnuson
>Priority: Major
>  Labels: portability
>  Time Spent: 8h 50m
>  Remaining Estimate: 0h
>
> This permits remote stage execution for arbitrary environments





[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=92317&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92317
 ]

ASF GitHub Bot logged work on BEAM-3327:


Author: ASF GitHub Bot
Created on: 19/Apr/18 00:42
Start Date: 19/Apr/18 00:42
Worklog Time Spent: 10m 
  Work Description: axelmagn commented on a change in pull request #5152: 
[BEAM-3327] Harness Manager Interfaces
URL: https://github.com/apache/beam/pull/5152#discussion_r182607809
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/manager/package-info.java
 ##
 @@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Resource management classes for function execution.
 
 Review comment:
   Okay, let me know what you think of it now. I've tried to pull out all of the constituent parts.




Issue Time Tracking
---

Worklog Id: (was: 92317)
Time Spent: 8.5h  (was: 8h 20m)

> Add abstractions to manage Environment Instance lifecycles.
> ---
>
> Key: BEAM-3327
> URL: https://issues.apache.org/jira/browse/BEAM-3327
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Axel Magnuson
>Priority: Major
>  Labels: portability
>  Time Spent: 8.5h
>  Remaining Estimate: 0h
>
> This permits remote stage execution for arbitrary environments





[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=92318&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92318
 ]

ASF GitHub Bot logged work on BEAM-3327:


Author: ASF GitHub Bot
Created on: 19/Apr/18 00:42
Start Date: 19/Apr/18 00:42
Worklog Time Spent: 10m 
  Work Description: axelmagn commented on a change in pull request #5152: 
[BEAM-3327] Harness Manager Interfaces
URL: https://github.com/apache/beam/pull/5152#discussion_r182608247
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/manager/SdkHarnessManager.java
 ##
 @@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.manager;
+
+import org.apache.beam.model.fnexecution.v1.ProvisionApi;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactSource;
+import org.apache.beam.runners.fnexecution.control.RemoteBundle;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+
+/**
+ * A manager of resources related to the SDK Harness, capable of providing RemoteBundles to runner
+ * operators.
+ *
+ * In order to provide a simple interface to runner operators, the SdkHarnessManager is
+ * responsible for owning and managing the lifetimes of resources such as RPC servers and remote
+ * environments. It is responsible for both instantiation and cleanup of these resources.  Since all
+ * managed resources are owned by the SdkHarnessManager, it is responsible for cleaning them up when
+ * its close function is called.
+ */
+public interface SdkHarnessManager extends AutoCloseable {
+  /**
+   * Get a new {@link RemoteBundle bundle} for processing the data in an executable stage.
+   *
+   * If necessary, this blocks while provisioning the remote resources necessary to support
+   * bundle processing.
+   */
+   RemoteBundle getBundle(
 
 Review comment:
   OK, I commented on the wrong thread initially. This is the file that contains the fix.




Issue Time Tracking
---

Worklog Id: (was: 92318)
Time Spent: 8h 40m  (was: 8.5h)

> Add abstractions to manage Environment Instance lifecycles.
> ---
>
> Key: BEAM-3327
> URL: https://issues.apache.org/jira/browse/BEAM-3327
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Axel Magnuson
>Priority: Major
>  Labels: portability
>  Time Spent: 8h 40m
>  Remaining Estimate: 0h
>
> This permits remote stage execution for arbitrary environments





[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=92316&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92316
 ]

ASF GitHub Bot logged work on BEAM-3327:


Author: ASF GitHub Bot
Created on: 19/Apr/18 00:39
Start Date: 19/Apr/18 00:39
Worklog Time Spent: 10m 
  Work Description: axelmagn commented on a change in pull request #5152: 
[BEAM-3327] Harness Manager Interfaces
URL: https://github.com/apache/beam/pull/5152#discussion_r182607809
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/manager/package-info.java
 ##
 @@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Resource management classes for function execution.
 
 Review comment:
   Okay, let me know what you think of it now. I've tried to pull out all of the constituent parts.




Issue Time Tracking
---

Worklog Id: (was: 92316)
Time Spent: 8h 20m  (was: 8h 10m)

> Add abstractions to manage Environment Instance lifecycles.
> ---
>
> Key: BEAM-3327
> URL: https://issues.apache.org/jira/browse/BEAM-3327
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Axel Magnuson
>Priority: Major
>  Labels: portability
>  Time Spent: 8h 20m
>  Remaining Estimate: 0h
>
> This permits remote stage execution for arbitrary environments





Build failed in Jenkins: beam_PostCommit_Java_GradleBuild #114

2018-04-18 Thread Apache Jenkins Server
See 


Changes:

[coheigea] Change equals() + toUpper/LowerCase to equalsIgnoreCase

--
[...truncated 19.10 MB...]
Apr 19, 2018 12:29:39 AM 
org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator addStep
INFO: Adding SpannerIO.Write/Write mutations to Cloud Spanner/Sample 
keys/GroupByKey as step s16
Apr 19, 2018 12:29:39 AM 
org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator addStep
INFO: Adding SpannerIO.Write/Write mutations to Cloud Spanner/Sample 
keys/Combine.GroupedValues as step s17
Apr 19, 2018 12:29:39 AM 
org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator addStep
INFO: Adding SpannerIO.Write/Write mutations to Cloud Spanner/Keys sample 
as view/GBKaSVForData/ParDo(GroupByKeyHashAndSortByKeyAndWindow) as step s18
Apr 19, 2018 12:29:39 AM 
org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator addStep
INFO: Adding SpannerIO.Write/Write mutations to Cloud Spanner/Keys sample 
as view/GBKaSVForData/BatchViewOverrides.GroupByKeyAndSortValuesOnly as step s19
Apr 19, 2018 12:29:39 AM 
org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator addStep
INFO: Adding SpannerIO.Write/Write mutations to Cloud Spanner/Keys sample 
as view/ParMultiDo(ToIsmRecordForMapLike) as step s20
Apr 19, 2018 12:29:39 AM 
org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator addStep
INFO: Adding SpannerIO.Write/Write mutations to Cloud Spanner/Keys sample 
as view/GBKaSVForSize as step s21
Apr 19, 2018 12:29:39 AM 
org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator addStep
INFO: Adding SpannerIO.Write/Write mutations to Cloud Spanner/Keys sample 
as view/ParDo(ToIsmMetadataRecordForSize) as step s22
Apr 19, 2018 12:29:39 AM 
org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator addStep
INFO: Adding SpannerIO.Write/Write mutations to Cloud Spanner/Keys sample 
as view/GBKaSVForKeys as step s23
Apr 19, 2018 12:29:39 AM 
org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator addStep
INFO: Adding SpannerIO.Write/Write mutations to Cloud Spanner/Keys sample 
as view/ParDo(ToIsmMetadataRecordForKey) as step s24
Apr 19, 2018 12:29:39 AM 
org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator addStep
INFO: Adding SpannerIO.Write/Write mutations to Cloud Spanner/Keys sample 
as view/Flatten.PCollections as step s25
Apr 19, 2018 12:29:39 AM 
org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator addStep
INFO: Adding SpannerIO.Write/Write mutations to Cloud Spanner/Keys sample 
as view/CreateDataflowView as step s26
Apr 19, 2018 12:29:39 AM 
org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator addStep
INFO: Adding SpannerIO.Write/Write mutations to Cloud Spanner/Partition 
input as step s27
Apr 19, 2018 12:29:39 AM 
org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator addStep
INFO: Adding SpannerIO.Write/Write mutations to Cloud Spanner/Group by 
partition as step s28
Apr 19, 2018 12:29:39 AM 
org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator addStep
INFO: Adding SpannerIO.Write/Write mutations to Cloud Spanner/Batch 
mutations together as step s29
Apr 19, 2018 12:29:39 AM 
org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator addStep
INFO: Adding SpannerIO.Write/Write mutations to Cloud Spanner/Write 
mutations to Spanner as step s30
Apr 19, 2018 12:29:39 AM org.apache.beam.runners.dataflow.DataflowRunner run
INFO: Staging pipeline description to 
gs://temp-storage-for-end-to-end-tests/spannerwriteit0testwrite-jenkins-0419002924-49ff12df/output/results/staging/
Apr 19, 2018 12:29:39 AM org.apache.beam.runners.dataflow.util.PackageUtil 
tryStagePackage
INFO: Uploading <80362 bytes, hash oRmlGOR6Nc428tE8IMABjA> to 
gs://temp-storage-for-end-to-end-tests/spannerwriteit0testwrite-jenkins-0419002924-49ff12df/output/results/staging/pipeline-oRmlGOR6Nc428tE8IMABjA.pb

org.apache.beam.sdk.io.gcp.spanner.SpannerWriteIT > testWrite STANDARD_OUT
Dataflow SDK version: 2.5.0-SNAPSHOT

org.apache.beam.sdk.io.gcp.spanner.SpannerWriteIT > testWrite STANDARD_ERROR
Apr 19, 2018 12:29:42 AM org.apache.beam.runners.dataflow.DataflowRunner run
INFO: To access the Dataflow monitoring console, please navigate to 
https://console.cloud.google.com/dataflow/jobsDetail/locations/us-central1/jobs/2018-04-18_17_29_40-423058059425268355?project=apache-beam-testing

org.apache.beam.sdk.io.gcp.spanner.SpannerWriteIT > testWrite STANDARD_OUT
Submitted job: 2018-04-18_17_29_40-423058059425268355

org.apache.beam.sdk.io.gcp.spanner.SpannerWriteIT > testWrite STANDARD_ERROR
Apr 19, 2018 12:29:42 AM org.apache.beam.runners.d

[jira] [Created] (BEAM-4134) Fix : Potential process leak

2018-04-18 Thread Reza ardeshir rokni (JIRA)
Reza ardeshir rokni created BEAM-4134:
-

 Summary: Fix : Potential process leak 
 Key: BEAM-4134
 URL: https://issues.apache.org/jira/browse/BEAM-4134
 Project: Beam
  Issue Type: Bug
  Components: examples-java
Affects Versions: 2.5.0
Reporter: Reza ardeshir rokni
Assignee: Reuven Lax


Need to check for a resource leak, as reported by [~reuvenlax].

[https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/subprocess/kernel/SubProcessKernel.java#L154]
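
One common shape for this kind of fix is to make sure a child `Process` is always terminated when the caller stops waiting for it, whether waiting finishes normally, times out, or throws. The sketch below assumes nothing about what `SubProcessKernel` does at the linked line; `ProcessRunner.runWithTimeout` is a hypothetical helper, not code from that file, and `Redirect.DISCARD` requires Java 9 or later.

```java
import java.io.IOException;
import java.util.concurrent.TimeUnit;

public final class ProcessRunner {
  /**
   * Starts the process and waits up to the timeout. If the child is still alive when this
   * method exits (normally or via an exception), forcible termination is requested.
   * Returns the exit code, or -1 if the process did not finish in time.
   */
  public static int runWithTimeout(ProcessBuilder builder, long timeout, TimeUnit unit)
      throws IOException, InterruptedException {
    // Merge stderr into stdout and discard both so a full pipe buffer cannot block the child.
    builder.redirectErrorStream(true);
    builder.redirectOutput(ProcessBuilder.Redirect.DISCARD);
    Process process = builder.start();
    try {
      if (process.waitFor(timeout, unit)) {
        return process.exitValue();
      }
      return -1;
    } finally {
      // Runs on every exit path, including InterruptedException from waitFor.
      if (process.isAlive()) {
        process.destroyForcibly();
      }
    }
  }

  public static void main(String[] args) throws Exception {
    String java = System.getProperty("java.home") + "/bin/java";
    int exit = runWithTimeout(new ProcessBuilder(java, "-version"), 30, TimeUnit.SECONDS);
    System.out.println("exit code: " + exit);
  }
}
```

Putting the cleanup in `finally` rather than after the `waitFor` call is what closes the leak: an early return or an exception can no longer skip it.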

 

 





[jira] [Updated] (BEAM-4132) Element type inference doesn't work for multi-output DoFns

2018-04-18 Thread Chuan Yu Foo (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chuan Yu Foo updated BEAM-4132:
---
Description: 
TLDR: if you have a multi-output DoFn, then the non-main PCollections will 
incorrectly have their element types set to None. This affects type checking 
for pipelines involving these PCollections.

Minimal example:
{code}
import apache_beam as beam

class TripleDoFn(beam.DoFn):
  def process(self, elem):
    yield elem
    if elem % 2 == 0:
      yield beam.pvalue.TaggedOutput('ten_times', elem * 10)
    if elem % 3 == 0:
      yield beam.pvalue.TaggedOutput('hundred_times', elem * 100)

@beam.typehints.with_input_types(int)
@beam.typehints.with_output_types(int)
class MultiplyBy(beam.DoFn):
  def __init__(self, multiplier):
    self._multiplier = multiplier

  def process(self, elem):
    yield elem * self._multiplier

def main():
  with beam.Pipeline() as p:
    x, a, b = (
        p
        | 'Create' >> beam.Create([1, 2, 3])
        | 'TripleDo' >> beam.ParDo(TripleDoFn()).with_outputs(
            'ten_times', 'hundred_times', main='main_output'))

    _ = a | 'MultiplyBy2' >> beam.ParDo(MultiplyBy(2))

if __name__ == '__main__':
  main()
{code}
Running this yields the following error:
{noformat}
apache_beam.typehints.decorators.TypeCheckError: Type hint violation for 
'MultiplyBy2': requires  but got None for elem
{noformat}
Replacing {{a}} with {{b}} yields the same error. Replacing {{a}} with {{x}} 
instead yields the following error:
{noformat}
apache_beam.typehints.decorators.TypeCheckError: Type hint violation for 
'MultiplyBy2': requires  but got Union[TaggedOutput, int] for elem
{noformat}
I would expect Beam to correctly infer that {{a}} and {{b}} have element types 
of {{int}} rather than {{None}}, and I would also expect Beam to correctly 
figure out that the element types of {{x}} are compatible with {{int}}.

  was:
TLDR: if you have a multi-output DoFn, then the non-main PCollections will 
incorrectly have their element types set to None. This affects type checking 
for pipelines involving these PCollections.

Minimal example:
{code:python}
import apache_beam as beam

class TripleDoFn(beam.DoFn):
  def process(self, elem):
    yield elem
    if elem % 2 == 0:
      yield beam.pvalue.TaggedOutput('ten_times', elem * 10)
    if elem % 3 == 0:
      yield beam.pvalue.TaggedOutput('hundred_times', elem * 100)

@beam.typehints.with_input_types(int)
@beam.typehints.with_output_types(int)
class MultiplyBy(beam.DoFn):
  def __init__(self, multiplier):
    self._multiplier = multiplier

  def process(self, elem):
    yield elem * self._multiplier

def main():
  with beam.Pipeline() as p:
    x, a, b = (
        p
        | 'Create' >> beam.Create([1, 2, 3])
        | 'TripleDo' >> beam.ParDo(TripleDoFn()).with_outputs(
            'ten_times', 'hundred_times', main='main_output'))

    _ = a | 'MultiplyBy2' >> beam.ParDo(MultiplyBy(2))

if __name__ == '__main__':
  main()
{code}

Running this yields the following error:
{noformat}
apache_beam.typehints.decorators.TypeCheckError: Type hint violation for 
'MultiplyBy2': requires  but got None for elem
{noformat}

Replacing {{a}} with {{b}} as follows yields the same error. Replacing {{a}} 
with {{x}} instead yields the following error:

{noformat}
apache_beam.typehints.decorators.TypeCheckError: Type hint violation for 
'MultiplyBy2': requires  but got Union[TaggedOutput, int] for elem
{noformat}

I would expect Beam to correctly infer that {{a}} and {{b}} have element types 
of {{int}} rather than {{None}}, and I would also expect Beam to correctly 
figure out that the element types of {{x}} are compatible with {{int}}.


> Element type inference doesn't work for multi-output DoFns
> --
>
> Key: BEAM-4132
> URL: https://issues.apache.org/jira/browse/BEAM-4132
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.4.0
>Reporter: Chuan Yu Foo
>Assignee: Ahmet Altay
>Priority: Major
>
> TLDR: if you have a multi-output DoFn, then the non-main PCollections will 
> incorrectly have their element types set to None. This affects type checking 
> for pipelines involving these PCollections.
> Minimal example:
> {code}
> import apache_beam as beam
> class TripleDoFn(beam.DoFn):
>   def process(self, elem):
>     yield elem
>     if elem % 2 == 0:
>       yield beam.pvalue.TaggedOutput('ten_times', elem * 10)
>     if elem % 3 == 0:
>       yield beam.pvalue.TaggedOutput('hundred_times', elem * 100)
>
> @beam.typehints.with_input_types(int)
> @beam.typehints.with_output_types(int)
> class MultiplyBy(beam.DoFn):
>   def __init__(self, multiplier):
>     self._multiplier = multiplier
>   def process(self, elem):
>     yield elem * self._multiplier
>
> def

[jira] [Work logged] (BEAM-3268) getPerDestinationOutputFilenames() is getting processed before write is finished on dataflow runner

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3268?focusedWorklogId=92313&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92313
 ]

ASF GitHub Bot logged work on BEAM-3268:


Author: ASF GitHub Bot
Created on: 19/Apr/18 00:20
Start Date: 19/Apr/18 00:20
Worklog Time Spent: 10m 
  Work Description: jkff commented on issue #5159: [BEAM-3268] Reshuffle 
filenames before returning them from WriteFilesResult
URL: https://github.com/apache/beam/pull/5159#issuecomment-382569310
 
 
   retest this please




Issue Time Tracking
---

Worklog Id: (was: 92313)
Time Spent: 1.5h  (was: 1h 20m)

> getPerDestinationOutputFilenames() is getting processed before write is 
> finished on dataflow runner
> ---
>
> Key: BEAM-3268
> URL: https://issues.apache.org/jira/browse/BEAM-3268
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Affects Versions: 2.3.0
>Reporter: Kamil Szewczyk
>Assignee: Eugene Kirpichov
>Priority: Major
> Attachments: comparison.png
>
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> While running filebased-io-test we found the Dataflow runner misbehaving. We run 
> the tests as a single pipeline, and without a Reshuffle between writing and 
> reading, the Dataflow jobs fail because the runner tries to access 
> files that have not been created yet.
> The picture shows the difference in how the write is executed. On 
> the left is the working example with Reshuffling added; on the right is 
> the example without it.
> !comparison.png|thumbnail!
> Steps to reproduce: substitute your-bucket-name with your valid bucket.
> {code:java}
> mvn -e -Pio-it verify -pl sdks/java/io/file-based-io-tests 
> -DintegrationTestPipelineOptions='["--runner=dataflow", 
> "--filenamePrefix=gs://your-bucket-name/TEXTIO_IT"]' -Pdataflow-runner
> {code}
> Then look at the Cloud Console; the job should fail.
> Now add Reshuffling to 
> sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
>  as in the example.
> {code:java}
> .getPerDestinationOutputFilenames().apply(Values.create())
> .apply(Reshuffle.viaRandomKey());
> PCollection consolidatedHashcode = testFilenames
> {code}
> and rerun the previously used Maven command to see it working in the console.





[jira] [Work logged] (BEAM-3268) getPerDestinationOutputFilenames() is getting processed before write is finished on dataflow runner

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3268?focusedWorklogId=92312&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92312
 ]

ASF GitHub Bot logged work on BEAM-3268:


Author: ASF GitHub Bot
Created on: 19/Apr/18 00:18
Start Date: 19/Apr/18 00:18
Worklog Time Spent: 10m 
  Work Description: jkff commented on issue #5159: [BEAM-3268] Reshuffle 
filenames before returning them from WriteFilesResult
URL: https://github.com/apache/beam/pull/5159#issuecomment-382569310
 
 
   retest this please




Issue Time Tracking
---

Worklog Id: (was: 92312)
Time Spent: 1h 20m  (was: 1h 10m)

> getPerDestinationOutputFilenames() is getting processed before write is 
> finished on dataflow runner
> ---
>
> Key: BEAM-3268
> URL: https://issues.apache.org/jira/browse/BEAM-3268
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Affects Versions: 2.3.0
>Reporter: Kamil Szewczyk
>Assignee: Eugene Kirpichov
>Priority: Major
> Attachments: comparison.png
>
>  Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> While running filebased-io-test we found the Dataflow runner misbehaving. We run 
> the tests as a single pipeline, and without a Reshuffle between writing and 
> reading, the Dataflow jobs fail because the runner tries to access 
> files that have not been created yet.
> The picture shows the difference in how the write is executed. On 
> the left is the working example with Reshuffling added; on the right is 
> the example without it.
> !comparison.png|thumbnail!
> Steps to reproduce: substitute your-bucket-name with your valid bucket.
> {code:java}
> mvn -e -Pio-it verify -pl sdks/java/io/file-based-io-tests 
> -DintegrationTestPipelineOptions='["--runner=dataflow", 
> "--filenamePrefix=gs://your-bucket-name/TEXTIO_IT"]' -Pdataflow-runner
> {code}
> Then look at the Cloud Console; the job should fail.
> Now add Reshuffling to 
> sdks/java/io/file-based-io-tests/src/test/java/org/apache/beam/sdk/io/text/TextIOIT.java
>  as in the example.
> {code:java}
> .getPerDestinationOutputFilenames().apply(Values.create())
> .apply(Reshuffle.viaRandomKey());
> PCollection consolidatedHashcode = testFilenames
> {code}
> and rerun the previously used Maven command to see it working in the console.





[jira] [Work logged] (BEAM-4018) Add a ByteKeyRangeTracker based on RestrictionTracker for SDF

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4018?focusedWorklogId=92311&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92311
 ]

ASF GitHub Bot logged work on BEAM-4018:


Author: ASF GitHub Bot
Created on: 19/Apr/18 00:17
Start Date: 19/Apr/18 00:17
Worklog Time Spent: 10m 
  Work Description: jkff commented on issue #5175: [BEAM-4018] Add a 
ByteKeyRangeTracker based on RestrictionTracker for SDF
URL: https://github.com/apache/beam/pull/5175#issuecomment-382569035
 
 
   @chamikaramj Could you do the first round?




Issue Time Tracking
---

Worklog Id: (was: 92311)
Time Spent: 0.5h  (was: 20m)

> Add a ByteKeyRangeTracker based on RestrictionTracker for SDF
> -
>
> Key: BEAM-4018
> URL: https://issues.apache.org/jira/browse/BEAM-4018
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-core
>Reporter: Ismaël Mejía
>Assignee: Ismaël Mejía
>Priority: Minor
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> We can have a RestrictionTracker for ByteKey ranges as part of the core SDK 
> so it can be reused by future SDF-based IOs such as Bigtable and HBase, among others.
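
To make the idea concrete, here is a minimal, self-contained sketch of the claim logic such a tracker needs. Keys are compared as unsigned bytes. Each claimed key must be strictly greater than the previous one. A claim at or past the end of the range returns false so the caller stops processing. The class and method names are hypothetical; this is not Beam's `RestrictionTracker` API or the implementation in the linked PR.

```java
import java.util.Arrays;

/** Hypothetical tracker over the half-open key range [start, end); an empty end means unbounded. */
public final class SimpleByteKeyRangeTracker {
  private final byte[] start;
  private final byte[] end;
  private byte[] lastClaimed = null;

  public SimpleByteKeyRangeTracker(byte[] start, byte[] end) {
    this.start = start.clone();
    this.end = end.clone();
  }

  /** Lexicographic comparison treating each byte as unsigned (0..255); shorter prefix sorts first. */
  static int compareUnsigned(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  }

  /**
   * Attempts to claim a key. Returns false once the key reaches the end of the range, which
   * signals the caller to stop. Keys must be claimed in strictly increasing order.
   */
  public boolean tryClaim(byte[] key) {
    if (compareUnsigned(key, start) < 0) {
      throw new IllegalArgumentException("Key precedes range start: " + Arrays.toString(key));
    }
    if (lastClaimed != null && compareUnsigned(key, lastClaimed) <= 0) {
      throw new IllegalArgumentException("Keys must be claimed in increasing order");
    }
    if (end.length > 0 && compareUnsigned(key, end) >= 0) {
      return false;
    }
    lastClaimed = key.clone();
    return true;
  }
}
```

The unsigned comparison matters: with Java's signed `byte`, a key such as `0x80` would sort before `0x10`, and the end-of-range check would silently accept keys past the end.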





[jira] [Created] (BEAM-4133) Properly handle HTTP 308 errors from GCS

2018-04-18 Thread Chamikara Jayalath (JIRA)
Chamikara Jayalath created BEAM-4133:


 Summary: Properly handle HTTP 308 errors from GCS
 Key: BEAM-4133
 URL: https://issues.apache.org/jira/browse/BEAM-4133
 Project: Beam
  Issue Type: Improvement
  Components: io-java-gcp
Reporter: Chamikara Jayalath


We currently retry these errors, but we could do better by partially retrying 
as suggested by the GCS spec: 
[https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload]

We can hit this error in various places where we connect to GCS (for example, 
file-based source/sink, jar staging for DataflowRunner).
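
The partial-retry flow in the linked resumable-upload guide works like this. On a `308` response, the `Range` header (for example `bytes=0-42`) reports which bytes GCS has already persisted, or is absent if nothing has been persisted yet. The client then resends only the remaining bytes, starting at the next offset. The sketch below only computes the offsets and the `Content-Range` header. The class name is hypothetical, and it does not reflect how Beam's GCS utilities actually issue requests.

```java
/** Hypothetical helper for resuming a GCS resumable upload after an HTTP 308 response. */
public final class ResumableUploadOffsets {
  private static final String PREFIX = "bytes=0-";

  /**
   * Returns the offset of the first byte that still needs to be sent, given the Range header
   * of a 308 response (e.g. "bytes=0-42"). A missing header means no bytes were persisted.
   */
  public static long nextOffset(String rangeHeader) {
    if (rangeHeader == null || rangeHeader.isEmpty()) {
      return 0L;
    }
    if (!rangeHeader.startsWith(PREFIX)) {
      throw new IllegalArgumentException("Unexpected Range header: " + rangeHeader);
    }
    long lastPersisted = Long.parseLong(rangeHeader.substring(PREFIX.length()).trim());
    return lastPersisted + 1;
  }

  /** Builds the Content-Range header for resending bytes [offset, totalSize). */
  public static String contentRange(long offset, long totalSize) {
    if (offset >= totalSize) {
      // Everything is already persisted; the caller should finalize instead of resending.
      throw new IllegalArgumentException("Nothing left to send");
    }
    return "bytes " + offset + "-" + (totalSize - 1) + "/" + totalSize;
  }

  public static void main(String[] args) {
    long offset = nextOffset("bytes=0-42");
    System.out.println(offset + " " + contentRange(offset, 100));  // prints "43 bytes 43-99/100"
  }
}
```

Resuming from the reported offset avoids resending data that GCS already holds. This is the improvement over retrying the whole upload.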





[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=92309&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92309
 ]

ASF GitHub Bot logged work on BEAM-3327:


Author: ASF GitHub Bot
Created on: 19/Apr/18 00:08
Start Date: 19/Apr/18 00:08
Worklog Time Spent: 10m 
  Work Description: axelmagn commented on a change in pull request #5152: 
[BEAM-3327] Harness Manager Interfaces
URL: https://github.com/apache/beam/pull/5152#discussion_r182603824
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/manager/package-info.java
 ##
 @@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Resource management classes for function execution.
 
 Review comment:
   While the sdk harness manager will only expose a `RemoteBundle` to the 
operator, it is actually responsible for setting up and shutting down 
`RemoteEnvironment` and all `GrpcFnServer` instances according to their 
respective lifetimes. Because these classes span the other local packages, it 
made sense to me to start a new package for manager classes that aggregate 
"resources" from those various domains.




Issue Time Tracking
---

Worklog Id: (was: 92309)
Time Spent: 8h  (was: 7h 50m)

> Add abstractions to manage Environment Instance lifecycles.
> ---
>
> Key: BEAM-3327
> URL: https://issues.apache.org/jira/browse/BEAM-3327
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Axel Magnuson
>Priority: Major
>  Labels: portability
>  Time Spent: 8h
>  Remaining Estimate: 0h
>
> This permits remote stage execution for arbitrary environments



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3327) Add abstractions to manage Environment Instance lifecycles.

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3327?focusedWorklogId=92310&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92310
 ]

ASF GitHub Bot logged work on BEAM-3327:


Author: ASF GitHub Bot
Created on: 19/Apr/18 00:08
Start Date: 19/Apr/18 00:08
Worklog Time Spent: 10m 
  Work Description: axelmagn commented on a change in pull request #5152: 
[BEAM-3327] Harness Manager Interfaces
URL: https://github.com/apache/beam/pull/5152#discussion_r182603926
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/manager/SdkHarnessManager.java
 ##
 @@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.manager;
+
+import org.apache.beam.model.fnexecution.v1.ProvisionApi;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactSource;
+import org.apache.beam.runners.fnexecution.control.RemoteBundle;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+
+/**
+ * A manager of resources related to the SDK Harness, capable of providing RemoteBundles to runner
+ * operators.
+ *
+ * In order to provide a simple interface to runner operators, the SdkHarnessManager is
+ * responsible for owning and managing the lifetimes of resources such as RPC servers and remote
+ * environments. It is responsible for both instantiation and cleanup of these resources.  Since all
+ * managed resources are owned by the SdkHarnessManager, it is responsible for cleaning them up when
+ * its close function is called.
+ */
+public interface SdkHarnessManager extends AutoCloseable {
+  /**
+   * Get a new {@link RemoteBundle bundle} for processing the data in an executable stage.
+   *
+   * If necessary, this blocks while provisioning the remote resources necessary to support
+   * bundle processing.
+   */
+   RemoteBundle getBundle(
 
 Review comment:
   That's a good point.  I'll decouple the registration of operator resources 
from bundle retrieval.
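   One possible shape for decoupling registration from bundle retrieval is sketched below. It uses hypothetical types (`BundleSource`, `StageRegistration`, `Bundle`) rather than the interfaces that eventually landed in Beam. An operator registers its stage once, then draws any number of bundles from the returned handle without repeating the setup.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical bundle handle; real code would also expose data and state channels. */
interface Bundle extends AutoCloseable {
  String stageId();

  @Override
  void close();
}

/** Hypothetical handle returned by registration; retrieving a bundle does not re-register. */
interface StageRegistration {
  Bundle newBundle();
}

/** Hypothetical manager API with registration separated from bundle retrieval. */
interface BundleSource {
  StageRegistration register(String stageId);
}

/** Toy implementation that counts registrations to show they happen once per stage. */
class CountingBundleSource implements BundleSource {
  final AtomicInteger registrations = new AtomicInteger();

  @Override
  public StageRegistration register(String stageId) {
    // Expensive provisioning would happen here, exactly once per registered stage.
    registrations.incrementAndGet();
    // Each bundle from this registration reuses the stage's already-provisioned resources.
    return () -> new Bundle() {
      @Override
      public String stageId() {
        return stageId;
      }

      @Override
      public void close() {}
    };
  }
}
```

   With this split, costly provisioning such as environments and RPC servers is tied to `register()`. `newBundle()` can then stay cheap enough to call once per bundle.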




Issue Time Tracking
---

Worklog Id: (was: 92310)
Time Spent: 8h 10m  (was: 8h)

> Add abstractions to manage Environment Instance lifecycles.
> ---
>
> Key: BEAM-3327
> URL: https://issues.apache.org/jira/browse/BEAM-3327
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Thomas Groh
>Assignee: Axel Magnuson
>Priority: Major
>  Labels: portability
>  Time Spent: 8h 10m
>  Remaining Estimate: 0h
>
> This permits remote stage execution for arbitrary environments





[jira] [Created] (BEAM-4132) Element type inference doesn't work for multi-output DoFns

2018-04-18 Thread Chuan Yu Foo (JIRA)
Chuan Yu Foo created BEAM-4132:
--

 Summary: Element type inference doesn't work for multi-output DoFns
 Key: BEAM-4132
 URL: https://issues.apache.org/jira/browse/BEAM-4132
 Project: Beam
  Issue Type: Bug
  Components: sdk-py-core
Affects Versions: 2.4.0
Reporter: Chuan Yu Foo
Assignee: Ahmet Altay


TL;DR: if you have a multi-output DoFn, then the non-main PCollections 
incorrectly have their element types set to None. This affects type checking 
for pipelines involving these PCollections.

Minimal example:
{code:python}
import apache_beam as beam

class TripleDoFn(beam.DoFn):
  def process(self, elem):
    yield elem
    if elem % 2 == 0:
      yield beam.pvalue.TaggedOutput('ten_times', elem * 10)
    if elem % 3 == 0:
      yield beam.pvalue.TaggedOutput('hundred_times', elem * 100)

@beam.typehints.with_input_types(int)
@beam.typehints.with_output_types(int)
class MultiplyBy(beam.DoFn):
  def __init__(self, multiplier):
    self._multiplier = multiplier

  def process(self, elem):
    # process() must return an iterable of outputs, so yield the product.
    yield elem * self._multiplier

def main():
  with beam.Pipeline() as p:
    x, a, b = (
        p
        | 'Create' >> beam.Create([1, 2, 3])
        | 'TripleDo' >> beam.ParDo(TripleDoFn()).with_outputs(
            'ten_times', 'hundred_times', main='main_output'))

    _ = a | 'MultiplyBy2' >> beam.ParDo(MultiplyBy(2))

if __name__ == '__main__':
  main()
{code}

Running this yields the following error:
{noformat}
apache_beam.typehints.decorators.TypeCheckError: Type hint violation for 
'MultiplyBy2': requires  but got None for elem
{noformat}

Replacing {{a}} with {{b}} yields the same error. Replacing {{a}} 
with {{x}} instead yields the following error:

{noformat}
apache_beam.typehints.decorators.TypeCheckError: Type hint violation for 
'MultiplyBy2': requires  but got Union[TaggedOutput, int] for elem
{noformat}

I would expect Beam to correctly infer that {{a}} and {{b}} have element types 
of {{int}} rather than {{None}}, and I would also expect Beam to correctly 
figure out that the element types of {{x}} are compatible with {{int}}.





[jira] [Work logged] (BEAM-4093) Support Python ValidatesRunner test against TestDataflowRunner in streaming

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4093?focusedWorklogId=92308&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92308
 ]

ASF GitHub Bot logged work on BEAM-4093:


Author: ASF GitHub Bot
Created on: 18/Apr/18 23:58
Start Date: 18/Apr/18 23:58
Worklog Time Spent: 10m 
  Work Description: markflyhigh commented on issue #5147: [BEAM-4093] 
Support Python ValidatesRunner test in streaming
URL: https://github.com/apache/beam/pull/5147#issuecomment-382565690
 
 
   I don't think they are related, but I didn't see those failures in PostCommit. 
I'll rebase the branch and run the tests again.




Issue Time Tracking
---

Worklog Id: (was: 92308)
Time Spent: 1h 10m  (was: 1h)

> Support Python ValidatesRunner test against TestDataflowRunner in streaming
> ---
>
> Key: BEAM-4093
> URL: https://issues.apache.org/jira/browse/BEAM-4093
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core, testing
>Reporter: Mark Liu
>Assignee: Mark Liu
>Priority: Major
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-4071) Portable Runner Job API shim

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4071?focusedWorklogId=92303&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92303
 ]

ASF GitHub Bot logged work on BEAM-4071:


Author: ASF GitHub Bot
Created on: 18/Apr/18 23:53
Start Date: 18/Apr/18 23:53
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5150:  
[BEAM-4071] Add Portable Runner Job API shim
URL: https://github.com/apache/beam/pull/5150#discussion_r182601780
 
 

 ##
 File path: 
runners/reference/java/src/main/java/org/apache/beam/runners/reference/CloseableResource.java
 ##
 @@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.reference;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import javax.annotation.Nullable;
+
+/**
+ * An {@link AutoCloseable} that wraps a resource that needs to be cleaned up but does not implement
+ * {@link AutoCloseable} itself.
+ *
+ * Recipients of a {@link CloseableResource} are in general responsible for cleanup. Ownership
+ * can be transferred from one context to another via {@link #transfer()}. Transferring relinquishes
+ * ownership from the original resource. This allows resources to be safely constructed and
+ * transferred within a try-with-resources block. For example:
+ *
+ * try (CloseableResource resource = CloseableResource.of(...)) {
 
 Review comment:
   
   
   > **jkff** wrote:
   > {@code
   > ...
   > }
   > could help you avoid the ampersands.
   
   
   I didn't realize `@code` worked with multiline text. Done.
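   The pattern jkff suggests can be shown with a small example. Wrapping a multi-line snippet in `<pre>{@code ...}</pre>` lets Javadoc render generics such as `List<String>` literally, without `&lt;`/`&gt;` escapes, as long as the braces inside the snippet are balanced. The class and method names below are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Illustrative holder class (hypothetical, not part of Beam).
 *
 * <p>Example usage:
 *
 * <pre>{@code
 * List<String> names = JavadocCodeExample.names();
 * for (String name : names) {
 *   System.out.println(name);
 * }
 * }</pre>
 */
class JavadocCodeExample {
  /** Returns a fresh mutable list so the Javadoc example above has something to iterate. */
  static List<String> names() {
    List<String> names = new ArrayList<>();
    names.add("example");
    return names;
  }
}
```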




Issue Time Tracking
---

Worklog Id: (was: 92303)
Time Spent: 10h 20m  (was: 10h 10m)

> Portable Runner Job API shim
> 
>
> Key: BEAM-4071
> URL: https://issues.apache.org/jira/browse/BEAM-4071
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Minor
>  Time Spent: 10h 20m
>  Remaining Estimate: 0h
>
> There needs to be a way to execute Java-SDK pipelines against a portable job 
> server. The job server itself is expected to be started up out-of-band. The 
> "PortableRunner" should take an option indicating the Job API endpoint and 
> defer other runner configurations to the backend itself.





[jira] [Work logged] (BEAM-4071) Portable Runner Job API shim

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4071?focusedWorklogId=92305&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92305
 ]

ASF GitHub Bot logged work on BEAM-4071:


Author: ASF GitHub Bot
Created on: 18/Apr/18 23:53
Start Date: 18/Apr/18 23:53
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5150:  
[BEAM-4071] Add Portable Runner Job API shim
URL: https://github.com/apache/beam/pull/5150#discussion_r182601783
 
 

 ##
 File path: 
runners/reference/java/src/main/java/org/apache/beam/runners/reference/testing/InMemoryArtifactService.java
 ##
 @@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.reference.testing;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.io.BaseEncoding;
+import com.google.protobuf.ByteString;
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
+import java.security.MessageDigest;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import javax.annotation.concurrent.GuardedBy;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ArtifactMetadata;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.CommitManifestRequest;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.CommitManifestResponse;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.PutArtifactRequest;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.PutArtifactResponse;
+import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO: Implement artifact retrieval.
+/** A StagingService for tests. */
+public class InMemoryArtifactService extends ArtifactStagingServiceImplBase {
+
+  private static final Logger LOG = LoggerFactory.getLogger(InMemoryArtifactService.class);
+
+  private final Object artifactLock = new Object();
+
+  @GuardedBy("artifactLock")
+  private final Map artifacts = Maps.newHashMap();
+
+  private final boolean keepArtifacts;
+
+  @GuardedBy("artifactLock")
+  private boolean committed = false;
+
+  /**
+   * Constructs an {@link InMemoryArtifactService}. If {@code keepArtifacts} is true, all artifacts
+   * are kept in memory as {@link ByteString ByteStrings}. Doing so is currently a waste of space
 
 Review comment:
   
   
   > **jkff** wrote:
   > Hm then can you just remove this parameter? If setting it to true 
currently accomplishes no useful purpose (it's fine to keep if you plan to very 
soon follow up this PR with something that makes it have a useful purpose)
   
   
   I've thought about it a bit and realized that we do not want to actually 
serve artifacts here. That functionality is better handled by a ValidatesRunner 
integration test. Removed.
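   A test-only staging service that never serves artifacts back does not need to keep their bytes. The following is a minimal sketch under that assumption, using a hypothetical `DigestOnlyArtifactStore` together with the JDK's `MessageDigest` and `Base64`. The PR imports Guava's `BaseEncoding` instead, and whether it records digests this way at all is an assumption. In the sketch, each artifact's chunks are folded into an MD5 digest, and only the name-to-digest map is kept until commit.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical test-only store that keeps artifact digests but never the artifact bytes. */
class DigestOnlyArtifactStore {
  private final Map<String, String> digestsByName = new HashMap<>();
  private boolean committed = false;

  /** Consumes all chunks of one artifact and remembers only its Base64-encoded MD5 digest. */
  synchronized void put(String name, Iterable<byte[]> chunks) throws NoSuchAlgorithmException {
    if (committed) {
      throw new IllegalStateException("Manifest already committed");
    }
    MessageDigest md5 = MessageDigest.getInstance("MD5");
    for (byte[] chunk : chunks) {
      // Chunk bytes are hashed and then dropped; nothing else retains them.
      md5.update(chunk);
    }
    digestsByName.put(name, Base64.getEncoder().encodeToString(md5.digest()));
  }

  /** Freezes the store and returns a copy of the recorded manifest. */
  synchronized Map<String, String> commit() {
    committed = true;
    return new HashMap<>(digestsByName);
  }
}
```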




Issue Time Tracking
---

Worklog Id: (was: 92305)
Time Spent: 10.5h  (was: 10h 20m)

> Portable Runner Job API shim
> 
>
> Key: BEAM-4071
> URL: https://issues.apache.org/jira/browse/BEAM-4071
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Minor
>  Time Spent: 10.5h
>  Remaining Estimate: 0h
>
> There needs to be a way to execute Java-SDK pipelines against a portable job 
> server. The job server itself is expected to be started up out-of-band. The 
> "PortableRunner" should take an option indicating the Job API endpoint

[jira] [Work logged] (BEAM-4071) Portable Runner Job API shim

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4071?focusedWorklogId=92304&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92304
 ]

ASF GitHub Bot logged work on BEAM-4071:


Author: ASF GitHub Bot
Created on: 18/Apr/18 23:53
Start Date: 18/Apr/18 23:53
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5150:  
[BEAM-4071] Add Portable Runner Job API shim
URL: https://github.com/apache/beam/pull/5150#discussion_r182601778
 
 

 ##
 File path: 
runners/reference/java/src/main/java/org/apache/beam/runners/reference/CloseableResource.java
 ##
 @@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.reference;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import javax.annotation.Nullable;
+
+/**
+ * An {@link AutoCloseable} that wraps a resource that needs to be cleaned up but does not implement
+ * {@link AutoCloseable} itself.
+ *
+ * Recipients of a {@link CloseableResource} are in general responsible for cleanup. Ownership
+ * can be transferred from one context to another via {@link #transfer()}. Transferring relinquishes
+ * ownership from the original resource. This allows resources to be safely constructed and
+ * transferred within a try-with-resources block. For example:
+ *
+ * try (CloseableResource resource = CloseableResource.of(...)) {
+ *   // Do something with resource.
+ *   ...
+ *   // Then transfer ownership to some consumer.
+ *   resourceConsumer(resource.release());
 
 Review comment:
   
   
   > **jkff** wrote:
   > Did you mean .transfer()?
   
   
   Done.
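   The transfer semantics described in the Javadoc under review can be sketched as a small standalone class. This is a minimal illustration that assumes a hypothetical `OwnedResource<T>` with a `Cleaner<T>` callback; it is not the `CloseableResource` from the PR. Calling `transfer()` returns a new owner and marks the original as released. The original's `close()` at the end of a try-with-resources block then does nothing, and the consumer becomes responsible for cleanup.

```java
import java.util.Objects;

/** Hypothetical cleanup callback that, unlike Runnable, may throw. */
@FunctionalInterface
interface Cleaner<T> {
  void clean(T resource) throws Exception;
}

/** Hypothetical AutoCloseable wrapper whose ownership can be handed off. */
final class OwnedResource<T> implements AutoCloseable {
  private final T resource;
  private final Cleaner<T> cleaner;
  private boolean owned = true;

  private OwnedResource(T resource, Cleaner<T> cleaner) {
    this.resource = Objects.requireNonNull(resource);
    this.cleaner = Objects.requireNonNull(cleaner);
  }

  static <T> OwnedResource<T> of(T resource, Cleaner<T> cleaner) {
    return new OwnedResource<>(resource, cleaner);
  }

  /** Returns the wrapped resource; only valid while this wrapper still owns it. */
  T get() {
    if (!owned) {
      throw new IllegalStateException("Resource has been transferred");
    }
    return resource;
  }

  /** Moves ownership to a new wrapper; this wrapper will no longer clean up. */
  OwnedResource<T> transfer() {
    if (!owned) {
      throw new IllegalStateException("Resource has already been transferred");
    }
    owned = false;
    return new OwnedResource<>(resource, cleaner);
  }

  @Override
  public void close() throws Exception {
    // Only the current owner runs the cleanup; released wrappers do nothing.
    if (owned) {
      owned = false;
      cleaner.clean(resource);
    }
  }
}
```

   Failing on a second `transfer()` is a design choice in this sketch. It turns an accidental double hand-off into an immediate error rather than a double cleanup.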




Issue Time Tracking
---

Worklog Id: (was: 92304)
Time Spent: 10h 20m  (was: 10h 10m)

> Portable Runner Job API shim
> 
>
> Key: BEAM-4071
> URL: https://issues.apache.org/jira/browse/BEAM-4071
> Project: Beam
>  Issue Type: New Feature
>  Components: runner-core
>Reporter: Ben Sidhom
>Assignee: Ben Sidhom
>Priority: Minor
>  Time Spent: 10h 20m
>  Remaining Estimate: 0h
>
> There needs to be a way to execute Java-SDK pipelines against a portable job 
> server. The job server itself is expected to be started up out-of-band. The 
> "PortableRunner" should take an option indicating the Job API endpoint and 
> defer other runner configurations to the backend itself.





[jira] [Work logged] (BEAM-4071) Portable Runner Job API shim

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4071?focusedWorklogId=92307&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92307
 ]

ASF GitHub Bot logged work on BEAM-4071:


Author: ASF GitHub Bot
Created on: 18/Apr/18 23:53
Start Date: 18/Apr/18 23:53
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5150:  
[BEAM-4071] Add Portable Runner Job API shim
URL: https://github.com/apache/beam/pull/5150#discussion_r182601789
 
 

 ##
 File path: 
runners/reference/java/src/main/java/org/apache/beam/runners/reference/PortableRunner.java
 ##
 @@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.reference;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+import com.google.protobuf.ByteString;
+import io.grpc.ManagedChannel;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc.JobServiceBlockingStub;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.runners.core.construction.ArtifactServiceStager;
+import org.apache.beam.runners.core.construction.ArtifactServiceStager.FileToStage;
+import org.apache.beam.runners.core.construction.JavaReadViaImpulse;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@link PipelineRunner} that runs a {@link Pipeline} against a {@code JobService}. */
+public class PortableRunner extends PipelineRunner {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PortableRunner.class);
+
+  /** Provided pipeline options. */
 
 Review comment:
   
   
   > **jkff** wrote:
   > Hm okay. I looked at existing PipelineRunner implementations and was 
unable to figure out whether they do or don't intend to support running 
multiple pipelines with the same PipelineRunner. We can deal with this later if 
necessary.
   
   
   I verified with @tgroh that they do not. Unfortunately, `PipelineOptions` 
are effectively global flags, so users can't meaningfully instantiate different 
options anyway.




Issue Time Tracking
---

Worklog Id: (was: 92307)
Time Spent: 10h 40m  (was: 10.5h)

> Portable Runner Job API shim
> 
>
> Key: BEAM-4071
> URL: https://issues.apache.org/jira/browse/BEAM-4071
> Project: Beam
>  Issue Type: New

[jira] [Work logged] (BEAM-4071) Portable Runner Job API shim

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-4071?focusedWorklogId=92306&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92306
 ]

ASF GitHub Bot logged work on BEAM-4071:


Author: ASF GitHub Bot
Created on: 18/Apr/18 23:53
Start Date: 18/Apr/18 23:53
Worklog Time Spent: 10m 
  Work Description: bsidhom commented on a change in pull request #5150:  
[BEAM-4071] Add Portable Runner Job API shim
URL: https://github.com/apache/beam/pull/5150#discussion_r182601788
 
 

 ##
 File path: 
runners/reference/java/src/main/java/org/apache/beam/runners/reference/PortableRunner.java
 ##
 @@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.reference;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+import com.google.protobuf.ByteString;
+import io.grpc.ManagedChannel;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
+import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc;
+import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc.JobServiceBlockingStub;
+import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.runners.core.construction.ArtifactServiceStager;
+import org.apache.beam.runners.core.construction.ArtifactServiceStager.FileToStage;
+import org.apache.beam.runners.core.construction.JavaReadViaImpulse;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.PipelineRunner;
+import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.options.PortablePipelineOptions;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@link PipelineRunner} that runs a {@link Pipeline} against a {@code JobService}. */
+public class PortableRunner extends PipelineRunner {
+
+  private static final Logger LOG = LoggerFactory.getLogger(PortableRunner.class);
+
+  /** Provided pipeline options. */
+  private final PipelineOptions options;
+  /** Job API endpoint. */
+  private final String endpoint;
+  /** Files to stage to artifact staging service. They will ultimately be added to the classpath. */
+  private final Collection filesToStage;
+  /** Channel factory used to create communication channel with job and staging services. */
+  private final ManagedChannelFactory channelFactory;
+
+  /**
+   * Constructs a runner from the provided options.
+   *
+   * @param options Properties which configure the runner.
+   * @return The newly created runner.
+   */
+  public static PortableRunner fromOptions(PipelineOptions options) {
+    if (true) {
+      throw new UnsupportedOperationException();
+    }
+    return createInternal(options, new DirectoryZipper(), getChannelFactory(options));
+  }
+
+  @VisibleForTesting
+  static PortableRunner createInternal(
+      PipelineOptions options, DirectoryZipper zipper, ManagedChannelFactory channelFactory) {
+    PortablePipelineOptions portableOptions =
+        PipelineOptionsValidator.validate(PortablePipelineOptions.class, options);
+
+    String end

[jira] [Work logged] (BEAM-3746) Count.globally should override getIncompatibleGlobalWindowErrorMessage to tell the user the usage that is currently only in javadoc

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3746?focusedWorklogId=92300&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92300
 ]

ASF GitHub Bot logged work on BEAM-3746:


Author: ASF GitHub Bot
Created on: 18/Apr/18 23:38
Start Date: 18/Apr/18 23:38
Worklog Time Spent: 10m 
  Work Description: huygaa11 commented on issue #5176: [BEAM-3746] Added 
error message for using Count.globally() on non-global windows
URL: https://github.com/apache/beam/pull/5176#issuecomment-382562432
 
 
   @XuMingmin 




Issue Time Tracking
---

Worklog Id: (was: 92300)
Time Spent: 0.5h  (was: 20m)

> Count.globally should override getIncompatibleGlobalWindowErrorMessage to 
> tell the user the usage that is currently only in javadoc
> ---
>
> Key: BEAM-3746
> URL: https://issues.apache.org/jira/browse/BEAM-3746
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Kenneth Knowles
>Assignee: Batkhuyag Batsaikhan
>Priority: Major
>  Labels: beginner, newbie, starter
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> https://beam.apache.org/documentation/sdks/javadoc/2.3.0/org/apache/beam/sdk/transforms/Count.html#globally--
> "Note: if the input collection uses a windowing strategy other than 
> GlobalWindows, use Combine.globally(Count.combineFn()).withoutDefaults() 
> instead."
> But the actual crash a user gets is:
> "java.lang.IllegalStateException: Default values are not supported in 
> Combine.globally() if the output PCollection is not windowed by 
> GlobalWindows. Instead, use Combine.globally().withoutDefaults() to output an 
> empty PCollection if the input PCollection is empty, or 
> Combine.globally().asSingletonView() to get the default output of the 
> CombineFn if the input PCollection is empty."
> There is a method that exists solely to make this actually useful, so we 
> should use it!
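The pattern the issue asks for can be shown in a minimal, Beam-independent sketch using hypothetical classes (`GlobalCombine`, `CountGlobally`). The base class builds its generic `IllegalStateException` message from an overridable hook. The count-specific subclass overrides that hook so the user sees the `Combine.globally(Count.combineFn()).withoutDefaults()` advice quoted above. Beam's real hook lives on its combine-function classes, and its exact signature is not reproduced here.

```java
/** Hypothetical base transform that validates its windowing when applied. */
abstract class GlobalCombine {
  /** Generic hint used unless a subclass knows something more specific. */
  protected String incompatibleGlobalWindowErrorMessage() {
    return "Default values are not supported in Combine.globally() if the output "
        + "PCollection is not windowed by GlobalWindows. Instead, use "
        + "Combine.globally().withoutDefaults() or Combine.globally().asSingletonView().";
  }

  /** Throws with the (possibly overridden) message when a default value cannot be produced. */
  final void validate(boolean globallyWindowed, boolean insertDefault) {
    if (insertDefault && !globallyWindowed) {
      throw new IllegalStateException(incompatibleGlobalWindowErrorMessage());
    }
  }
}

/** Hypothetical count transform that surfaces the usage currently documented only in Javadoc. */
class CountGlobally extends GlobalCombine {
  @Override
  protected String incompatibleGlobalWindowErrorMessage() {
    return "Count.globally() requires GlobalWindows. For other windowing strategies, use "
        + "Combine.globally(Count.combineFn()).withoutDefaults() instead.";
  }
}
```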





[jira] [Work logged] (BEAM-3746) Count.globally should override getIncompatibleGlobalWindowErrorMessage to tell the user the usage that is currently only in javadoc

2018-04-18 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3746?focusedWorklogId=92299&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-92299
 ]

ASF GitHub Bot logged work on BEAM-3746:


Author: ASF GitHub Bot
Created on: 18/Apr/18 23:37
Start Date: 18/Apr/18 23:37
Worklog Time Spent: 10m 
  Work Description: huygaa11 commented on issue #5176: [BEAM-3746] Added 
error message for using Count.globally() on non-global windows
URL: https://github.com/apache/beam/pull/5176#issuecomment-382562355
 
 
   @kennknowles Could you merge it in? Thanks.




Issue Time Tracking
---

Worklog Id: (was: 92299)
Time Spent: 20m  (was: 10m)

> Count.globally should override getIncompatibleGlobalWindowErrorMessage to 
> tell the user the usage that is currently only in javadoc
> ---
>
> Key: BEAM-3746
> URL: https://issues.apache.org/jira/browse/BEAM-3746
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-core
>Reporter: Kenneth Knowles
>Assignee: Batkhuyag Batsaikhan
>Priority: Major
>  Labels: beginner, newbie, starter
>  Time Spent: 20m
>  Remaining Estimate: 0h
>




