[ 
https://issues.apache.org/jira/browse/BEAM-8372?focusedWorklogId=327108&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-327108
 ]

ASF GitHub Bot logged work on BEAM-8372:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Oct/19 22:44
            Start Date: 11/Oct/19 22:44
    Worklog Time Spent: 10m 
      Work Description: ibzib commented on pull request #9775: [BEAM-8372] Job server submitting UberJars directly to Flink Runner.
URL: https://github.com/apache/beam/pull/9775#discussion_r334194808
 
 

 ##########
 File path: sdks/python/apache_beam/runners/portability/artifact_service.py
 ##########
 @@ -77,47 +87,120 @@ def PutArtifact(self, request_iterator, context=None):
         metadata = request.metadata.metadata
         retrieval_token = self.retrieval_token(
             request.metadata.staging_session_token)
-        self._mkdir(retrieval_token)
-        temp_path = filesystems.FileSystems.join(
-            self._root,
-            retrieval_token,
-            '%x.tmp' % random.getrandbits(128))
-        fout = filesystems.FileSystems.create(temp_path)
+        artifact_path = self._artifact_path(retrieval_token, metadata.name)
+        temp_path = self._temp_path(artifact_path)
+        fout = self._open(temp_path, 'w')
         hasher = hashlib.sha256()
       else:
         hasher.update(request.data.data)
         fout.write(request.data.data)
     fout.close()
     data_hash = hasher.hexdigest()
     if metadata.sha256 and metadata.sha256 != data_hash:
-      filesystems.FileSystems.delete([temp_path])
+      self._delete(temp_path)
       raise ValueError('Bad metadata hash: %s vs %s' % (
-          metadata.metadata.sha256, data_hash))
-    filesystems.FileSystems.rename(
-        [temp_path], [self._artifact_path(retrieval_token, metadata.name)])
+          metadata.sha256, data_hash))
+    self._rename(temp_path, artifact_path)
     return beam_artifact_api_pb2.PutArtifactResponse()
 
   def CommitManifest(self, request, context=None):
     retrieval_token = self.retrieval_token(request.staging_session_token)
-    with filesystems.FileSystems.create(
-        self._manifest_path(retrieval_token)) as fout:
-      fout.write(request.manifest.SerializeToString())
+    proxy_manifest = beam_artifact_api_pb2.ProxyManifest(
+        manifest=request.manifest,
+        location=[
+            beam_artifact_api_pb2.ProxyManifest.Location(
+                name=metadata.name,
+                uri=self._artifact_path(retrieval_token, metadata.name))
+            for metadata in request.manifest.artifact])
+    with self._open(self._manifest_path(retrieval_token), 'w') as fout:
+      fout.write(json_format.MessageToJson(proxy_manifest).encode('utf-8'))
     return beam_artifact_api_pb2.CommitManifestResponse(
         retrieval_token=retrieval_token)
 
   def GetManifest(self, request, context=None):
-    with filesystems.FileSystems.open(
-        self._manifest_path(request.retrieval_token)) as fin:
-      return beam_artifact_api_pb2.GetManifestResponse(
-          manifest=beam_artifact_api_pb2.Manifest.FromString(
-              fin.read()))
+    return beam_artifact_api_pb2.GetManifestResponse(
+        manifest=self._get_manifest_proxy(request.retrieval_token).manifest)
 
   def GetArtifact(self, request, context=None):
-    with filesystems.FileSystems.open(
-        self._artifact_path(request.retrieval_token, request.name)) as fin:
-      # This value is not emitted, but lets us yield a single empty
-      # chunk on an empty file.
-      chunk = True
-      while chunk:
-        chunk = fin.read(self._chunk_size)
-        yield beam_artifact_api_pb2.ArtifactChunk(data=chunk)
+    for artifact in self._get_manifest_proxy(request.retrieval_token).location:
+      if artifact.name == request.name:
+        with self._open(artifact.uri, 'r') as fin:
+          # This value is not emitted, but lets us yield a single empty
+          # chunk on an empty file.
+          chunk = True
+          while chunk:
+            chunk = fin.read(self._chunk_size)
+            yield beam_artifact_api_pb2.ArtifactChunk(data=chunk)
+        break
+    else:
+      raise ValueError('Unknown artifact: %s' % request.name)
+
+
+class ZipFileArtifactService(AbstractArtifactService):
 
 Review comment:
   Can you add a docstring?
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 327108)
    Time Spent: 50m  (was: 40m)

> Allow submission of Flink UberJar directly to flink cluster.
> ------------------------------------------------------------
>
>                 Key: BEAM-8372
>                 URL: https://issues.apache.org/jira/browse/BEAM-8372
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-py-core
>            Reporter: Robert Bradshaw
>            Assignee: Robert Bradshaw
>            Priority: Major
>          Time Spent: 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to