This is an automated email from the ASF dual-hosted git repository. mikhail pushed a commit to branch release-2.17.0 in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.17.0 by this push: new 4aed759 [BEAM-8815] Skip manifest when no artifacts are staged new 9171e0f Merge pull request #10213 from tweise/release-2.17.0 4aed759 is described below commit 4aed759cda92a14528a68ffacd9dfc1dbe4c340f Author: Thomas Weise <t...@apache.org> AuthorDate: Sat Nov 23 21:22:51 2019 -0800 [BEAM-8815] Skip manifest when no artifacts are staged --- .../artifact/AbstractArtifactRetrievalService.java | 20 ++++++----- .../artifact/AbstractArtifactStagingService.java | 40 +++++++++++++--------- .../BeamFileSystemArtifactServicesTest.java | 18 ++++++++++ 3 files changed, 53 insertions(+), 25 deletions(-) diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/AbstractArtifactRetrievalService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/AbstractArtifactRetrievalService.java index 93ae657..72af9e8 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/AbstractArtifactRetrievalService.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/AbstractArtifactRetrievalService.java @@ -95,15 +95,19 @@ public abstract class AbstractArtifactRetrievalService LOG.info("GetManifest for {}", token); try { - ArtifactApi.ProxyManifest proxyManifest = getManifestProxy(token); + final ArtifactApi.Manifest manifest; + if (AbstractArtifactStagingService.NO_ARTIFACTS_STAGED_TOKEN.equals(token)) { + manifest = ArtifactApi.Manifest.newBuilder().build(); + } else { + ArtifactApi.ProxyManifest proxyManifest = getManifestProxy(token); + LOG.info( + "GetManifest for {} -> {} artifacts", + token, + proxyManifest.getManifest().getArtifactCount()); + manifest = proxyManifest.getManifest(); + } ArtifactApi.GetManifestResponse response = - ArtifactApi.GetManifestResponse.newBuilder() - .setManifest(proxyManifest.getManifest()) - .build(); - LOG.info( - "GetManifest for {} -> {} artifacts", - token, - proxyManifest.getManifest().getArtifactCount()); + ArtifactApi.GetManifestResponse.newBuilder().setManifest(manifest).build(); responseObserver.onNext(response); responseObserver.onCompleted(); } catch (Exception e) { diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/AbstractArtifactStagingService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/AbstractArtifactStagingService.java index 25f09a3..86e79a5 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/AbstractArtifactStagingService.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/AbstractArtifactStagingService.java @@ -50,6 +50,8 @@ import org.slf4j.LoggerFactory; public abstract class AbstractArtifactStagingService extends ArtifactStagingServiceImplBase implements FnService { + public static final String NO_ARTIFACTS_STAGED_TOKEN = "__no_artifacts_staged__"; + private static final Logger LOG = LoggerFactory.getLogger(AbstractArtifactStagingService.class); private static final Charset CHARSET = StandardCharsets.UTF_8; @@ -77,25 +79,29 @@ public abstract class AbstractArtifactStagingService extends ArtifactStagingServ public void commitManifest( CommitManifestRequest request, StreamObserver<CommitManifestResponse> responseObserver) { try { - String stagingSessionToken = request.getStagingSessionToken(); - ProxyManifest.Builder proxyManifestBuilder = - ProxyManifest.newBuilder().setManifest(request.getManifest()); - for (ArtifactMetadata artifactMetadata : request.getManifest().getArtifactList()) { - proxyManifestBuilder.addLocation( - Location.newBuilder() - .setName(artifactMetadata.getName()) - .setUri(getArtifactUri(stagingSessionToken, encodedFileName(artifactMetadata))) - .build()); - } - try (WritableByteChannel manifestWritableByteChannel = openManifest(stagingSessionToken)) { - manifestWritableByteChannel.write( - CHARSET.encode(JsonFormat.printer().print(proxyManifestBuilder.build()))); + final String retrievalToken; + if (request.getManifest().getArtifactCount() > 0) { + String stagingSessionToken = request.getStagingSessionToken(); + ProxyManifest.Builder proxyManifestBuilder = + ProxyManifest.newBuilder().setManifest(request.getManifest()); + for (ArtifactMetadata artifactMetadata : request.getManifest().getArtifactList()) { + proxyManifestBuilder.addLocation( + Location.newBuilder() + .setName(artifactMetadata.getName()) + .setUri(getArtifactUri(stagingSessionToken, encodedFileName(artifactMetadata))) + .build()); + } + try (WritableByteChannel manifestWritableByteChannel = openManifest(stagingSessionToken)) { + manifestWritableByteChannel.write( + CHARSET.encode(JsonFormat.printer().print(proxyManifestBuilder.build()))); + } + retrievalToken = getRetrievalToken(stagingSessionToken); + // TODO: Validate integrity of staged files. + } else { + retrievalToken = NO_ARTIFACTS_STAGED_TOKEN; } - // TODO: Validate integrity of staged files. responseObserver.onNext( - CommitManifestResponse.newBuilder() - .setRetrievalToken(getRetrievalToken(stagingSessionToken)) - .build()); + CommitManifestResponse.newBuilder().setRetrievalToken(retrievalToken).build()); responseObserver.onCompleted(); } catch (Exception e) { // TODO: Cleanup all the artifacts. diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactServicesTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactServicesTest.java index 40807ca..9585530 100644 --- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactServicesTest.java +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactServicesTest.java @@ -206,6 +206,24 @@ public class BeamFileSystemArtifactServicesTest { } @Test + public void noArtifactsTest() throws Exception { + String stagingSession = "123"; + String stagingSessionToken = + BeamFileSystemArtifactStagingService.generateStagingSessionToken( + stagingSession, stagingDir.toUri().getPath()); + String stagingToken = commitManifest(stagingSessionToken, Collections.emptyList()); + Assert.assertEquals(AbstractArtifactStagingService.NO_ARTIFACTS_STAGED_TOKEN, stagingToken); + Assert.assertFalse( + Files.exists(Paths.get(stagingDir.toAbsolutePath().toString(), stagingSession))); + + GetManifestResponse retrievedManifest = + retrievalBlockingStub.getManifest( + GetManifestRequest.newBuilder().setRetrievalToken(stagingToken).build()); + Assert.assertEquals( + "Manifest with 0 artifacts", 0, retrievedManifest.getManifest().getArtifactCount()); + } + + @Test public void putArtifactsSingleSmallFileTest() throws Exception { String fileName = "file1"; String stagingSession = "123";