Repository: beam Updated Branches: refs/heads/master 47821ad69 -> f29444bf8
[BEAM-1871] Hide internal implementation details of how we create a DefaultBucket for GCP Temp Location. Moved relevant contents of GcpProjectUtil and DefaultBucket into GcpOptions.GcpTempLocationFactory. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c1b35f46 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c1b35f46 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c1b35f46 Branch: refs/heads/master Commit: c1b35f46ddd321b29132606d3633d45ff134ff6c Parents: 47821ad Author: Luke Cwik <lc...@google.com> Authored: Thu Apr 27 13:50:00 2017 -0700 Committer: Lukasz Cwik <lc...@google.com> Committed: Sat Apr 29 09:06:12 2017 -0700 ---------------------------------------------------------------------- .../options/CloudResourceManagerOptions.java | 14 - .../sdk/extensions/gcp/options/GcpOptions.java | 124 ++++++- .../org/apache/beam/sdk/util/DefaultBucket.java | 105 ------ .../apache/beam/sdk/util/GcpProjectUtil.java | 106 ------ .../extensions/gcp/options/GcpOptionsTest.java | 325 ++++++++++++------- .../apache/beam/sdk/util/DefaultBucketTest.java | 112 ------- .../beam/sdk/util/GcpProjectUtilTest.java | 77 ----- 7 files changed, 335 insertions(+), 528 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/c1b35f46/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/CloudResourceManagerOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/CloudResourceManagerOptions.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/CloudResourceManagerOptions.java index 68432cf..87557e5 100644 --- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/CloudResourceManagerOptions.java +++ 
b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/CloudResourceManagerOptions.java @@ -17,14 +17,10 @@ */ package org.apache.beam.sdk.extensions.gcp.options; -import com.fasterxml.jackson.annotation.JsonIgnore; import org.apache.beam.sdk.options.ApplicationNameOptions; -import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.Hidden; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.StreamingOptions; -import org.apache.beam.sdk.util.GcpProjectUtil; /** * Properties needed when using Google CloudResourceManager with the Apache Beam SDK. @@ -33,14 +29,4 @@ import org.apache.beam.sdk.util.GcpProjectUtil; + "https://cloud.google.com/resource-manager/ for details on CloudResourceManager.") public interface CloudResourceManagerOptions extends ApplicationNameOptions, GcpOptions, PipelineOptions, StreamingOptions { - /** - * The GcpProjectUtil instance that should be used to communicate with Google Cloud Storage. 
- */ - @JsonIgnore - @Description("The GcpProjectUtil instance that should be used to communicate" - + " with Google Cloud Resource Manager.") - @Default.InstanceFactory(GcpProjectUtil.GcpProjectUtilFactory.class) - @Hidden - GcpProjectUtil getGcpProjectUtil(); - void setGcpProjectUtil(GcpProjectUtil value); } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b35f46/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java index 09904b6..b2a83e9 100644 --- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java +++ b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptions.java @@ -17,15 +17,24 @@ */ package org.apache.beam.sdk.extensions.gcp.options; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Strings.isNullOrEmpty; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.api.client.util.BackOff; +import com.google.api.client.util.Sleeper; +import com.google.api.services.cloudresourcemanager.CloudResourceManager; +import com.google.api.services.cloudresourcemanager.model.Project; +import com.google.api.services.storage.model.Bucket; import com.google.auth.Credentials; +import com.google.cloud.hadoop.util.ResilientOperation; +import com.google.cloud.hadoop.util.RetryDeterminer; import com.google.common.annotations.VisibleForTesting; import com.google.common.io.Files; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.nio.file.FileAlreadyExistsException; import java.security.GeneralSecurityException; import 
java.util.Locale; import java.util.Map; @@ -38,9 +47,12 @@ import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.util.DefaultBucket; +import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.InstanceBuilder; import org.apache.beam.sdk.util.PathValidator; +import org.apache.beam.sdk.util.Transport; +import org.apache.beam.sdk.util.gcsfs.GcsPath; +import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -207,13 +219,18 @@ public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions { * Returns {@link PipelineOptions#getTempLocation} as the default GCP temp location. */ class GcpTempLocationFactory implements DefaultValueFactory<String> { + private static final FluentBackoff BACKOFF_FACTORY = + FluentBackoff.DEFAULT.withMaxRetries(3).withInitialBackoff(Duration.millis(200)); + static final String DEFAULT_REGION = "us-central1"; + static final Logger LOG = LoggerFactory.getLogger(GcpTempLocationFactory.class); @Override @Nullable public String create(PipelineOptions options) { String tempLocation = options.getTempLocation(); if (isNullOrEmpty(tempLocation)) { - tempLocation = DefaultBucket.tryCreateDefaultBucket(options); + tempLocation = tryCreateDefaultBucket(options, Transport.newCloudResourceManagerClient( + options.as(CloudResourceManagerOptions.class)).build()); options.setTempLocation(tempLocation); } else { try { @@ -227,5 +244,108 @@ public interface GcpOptions extends GoogleApiDebugOptions, PipelineOptions { } return tempLocation; } + + /** + * Creates a default bucket or verifies the existence and proper access control + * of an existing default bucket. Returns the location if successful. 
+ */ + @VisibleForTesting + static String tryCreateDefaultBucket( + PipelineOptions options, CloudResourceManager crmClient) { + GcsOptions gcpOptions = options.as(GcsOptions.class); + + final String projectId = gcpOptions.getProject(); + checkArgument(!isNullOrEmpty(projectId), + "--project is a required option."); + + // Look up the project number, to create a default bucket with a stable + // name with no special characters. + long projectNumber = 0L; + try { + projectNumber = getProjectNumber(projectId, crmClient); + } catch (IOException e) { + throw new RuntimeException("Unable to verify project with ID " + projectId, e); + } + String region = DEFAULT_REGION; + if (!isNullOrEmpty(gcpOptions.getZone())) { + region = getRegionFromZone(gcpOptions.getZone()); + } + final String bucketName = + "dataflow-staging-" + region + "-" + projectNumber; + LOG.info("No staging location provided, attempting to use default bucket: {}", + bucketName); + Bucket bucket = new Bucket() + .setName(bucketName) + .setLocation(region); + // Always try to create the bucket before checking access, so that we do not + // race with other pipelines that may be attempting to do the same thing. + try { + gcpOptions.getGcsUtil().createBucket(projectId, bucket); + } catch (FileAlreadyExistsException e) { + LOG.debug("Bucket '{}'' already exists, verifying access.", bucketName); + } catch (IOException e) { + throw new RuntimeException("Unable create default bucket.", e); + } + + // Once the bucket is expected to exist, verify that it is correctly owned + // by the project executing the job. + try { + long owner = gcpOptions.getGcsUtil().bucketOwner( + GcsPath.fromComponents(bucketName, "")); + checkArgument( + owner == projectNumber, + "Bucket owner does not match the project from --project:" + + " %s vs. 
%s", owner, projectNumber); + } catch (IOException e) { + throw new RuntimeException( + "Unable to determine the owner of the default bucket at gs://" + bucketName, e); + } + return "gs://" + bucketName; + } + + /** + * Returns the project number or throws an exception if the project does not + * exist or has other access exceptions. + */ + private static long getProjectNumber( + String projectId, + CloudResourceManager crmClient) throws IOException { + return getProjectNumber( + projectId, + crmClient, + BACKOFF_FACTORY.backoff(), + Sleeper.DEFAULT); + } + + /** + * Returns the project number or throws an error if the project does not + * exist or has other access errors. + */ + private static long getProjectNumber( + String projectId, + CloudResourceManager crmClient, + BackOff backoff, + Sleeper sleeper) throws IOException { + CloudResourceManager.Projects.Get getProject = + crmClient.projects().get(projectId); + try { + Project project = ResilientOperation.retry( + ResilientOperation.getGoogleRequestCallable(getProject), + backoff, + RetryDeterminer.SOCKET_ERRORS, + IOException.class, + sleeper); + return project.getProjectNumber(); + } catch (Exception e) { + throw new IOException("Unable to get project number", e); + } + } + + @VisibleForTesting + static String getRegionFromZone(String zone) { + String[] zoneParts = zone.split("-"); + checkArgument(zoneParts.length >= 2, "Invalid zone provided: %s", zone); + return zoneParts[0] + "-" + zoneParts[1]; + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b35f46/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/DefaultBucket.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/DefaultBucket.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/DefaultBucket.java deleted file mode 100644 index 6e7298c..0000000 --- 
a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/DefaultBucket.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Strings.isNullOrEmpty; - -import com.google.api.services.storage.model.Bucket; -import com.google.common.annotations.VisibleForTesting; -import java.io.IOException; -import java.nio.file.FileAlreadyExistsException; -import org.apache.beam.sdk.extensions.gcp.options.CloudResourceManagerOptions; -import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.util.gcsfs.GcsPath; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Utility class for handling default GCS buckets. - */ -public class DefaultBucket { - static final Logger LOG = LoggerFactory.getLogger(DefaultBucket.class); - - static final String DEFAULT_REGION = "us-central1"; - - /** - * Creates a default bucket or verifies the existence and proper access control - * of an existing default bucket. Returns the location if successful. 
- */ - public static String tryCreateDefaultBucket(PipelineOptions options) { - GcsOptions gcpOptions = options.as(GcsOptions.class); - - final String projectId = gcpOptions.getProject(); - checkArgument(!isNullOrEmpty(projectId), - "--project is a required option."); - - // Look up the project number, to create a default bucket with a stable - // name with no special characters. - long projectNumber = 0L; - try { - projectNumber = gcpOptions.as(CloudResourceManagerOptions.class) - .getGcpProjectUtil().getProjectNumber(projectId); - } catch (IOException e) { - throw new RuntimeException("Unable to verify project with ID " + projectId, e); - } - String region = DEFAULT_REGION; - if (!isNullOrEmpty(gcpOptions.getZone())) { - region = getRegionFromZone(gcpOptions.getZone()); - } - final String bucketName = - "dataflow-staging-" + region + "-" + projectNumber; - LOG.info("No staging location provided, attempting to use default bucket: {}", - bucketName); - Bucket bucket = new Bucket() - .setName(bucketName) - .setLocation(region); - // Always try to create the bucket before checking access, so that we do not - // race with other pipelines that may be attempting to do the same thing. - try { - gcpOptions.getGcsUtil().createBucket(projectId, bucket); - } catch (FileAlreadyExistsException e) { - LOG.debug("Bucket '{}'' already exists, verifying access.", bucketName); - } catch (IOException e) { - throw new RuntimeException("Unable create default bucket.", e); - } - - // Once the bucket is expected to exist, verify that it is correctly owned - // by the project executing the job. - try { - long owner = gcpOptions.getGcsUtil().bucketOwner( - GcsPath.fromComponents(bucketName, "")); - checkArgument( - owner == projectNumber, - "Bucket owner does not match the project from --project:" - + " %s vs. 
%s", owner, projectNumber); - } catch (IOException e) { - throw new RuntimeException( - "Unable to determine the owner of the default bucket at gs://" + bucketName, e); - } - return "gs://" + bucketName; - } - - @VisibleForTesting - static String getRegionFromZone(String zone) { - String[] zoneParts = zone.split("-"); - checkArgument(zoneParts.length >= 2, "Invalid zone provided: %s", zone); - return zoneParts[0] + "-" + zoneParts[1]; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b35f46/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java b/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java deleted file mode 100644 index 02b402a..0000000 --- a/sdks/java/extensions/gcp-core/src/main/java/org/apache/beam/sdk/util/GcpProjectUtil.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.util; - -import com.google.api.client.util.BackOff; -import com.google.api.client.util.Sleeper; -import com.google.api.services.cloudresourcemanager.CloudResourceManager; -import com.google.api.services.cloudresourcemanager.model.Project; -import com.google.cloud.hadoop.util.ResilientOperation; -import com.google.cloud.hadoop.util.RetryDeterminer; -import com.google.common.annotations.VisibleForTesting; -import java.io.IOException; -import org.apache.beam.sdk.extensions.gcp.options.CloudResourceManagerOptions; -import org.apache.beam.sdk.options.DefaultValueFactory; -import org.apache.beam.sdk.options.PipelineOptions; -import org.joda.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Provides operations on Google Cloud Platform Projects. - */ -public class GcpProjectUtil { - /** - * A {@link DefaultValueFactory} able to create a {@link GcpProjectUtil} using - * any transport flags specified on the {@link PipelineOptions}. - */ - public static class GcpProjectUtilFactory implements DefaultValueFactory<GcpProjectUtil> { - /** - * Returns an instance of {@link GcpProjectUtil} based on the - * {@link PipelineOptions}. - */ - @Override - public GcpProjectUtil create(PipelineOptions options) { - LOG.debug("Creating new GcpProjectUtil"); - CloudResourceManagerOptions crmOptions = options.as(CloudResourceManagerOptions.class); - return new GcpProjectUtil( - Transport.newCloudResourceManagerClient(crmOptions).build()); - } - } - - private static final Logger LOG = LoggerFactory.getLogger(GcpProjectUtil.class); - - private static final FluentBackoff BACKOFF_FACTORY = - FluentBackoff.DEFAULT.withMaxRetries(3).withInitialBackoff(Duration.millis(200)); - - /** Client for the CRM API. */ - private CloudResourceManager crmClient; - - private GcpProjectUtil(CloudResourceManager crmClient) { - this.crmClient = crmClient; - } - - // Use this only for testing purposes. 
- @VisibleForTesting - void setCrmClient(CloudResourceManager crmClient) { - this.crmClient = crmClient; - } - - /** - * Returns the project number or throws an exception if the project does not - * exist or has other access exceptions. - */ - public long getProjectNumber(String projectId) throws IOException { - return getProjectNumber( - projectId, - BACKOFF_FACTORY.backoff(), - Sleeper.DEFAULT); - } - - /** - * Returns the project number or throws an error if the project does not - * exist or has other access errors. - */ - @VisibleForTesting - long getProjectNumber(String projectId, BackOff backoff, Sleeper sleeper) throws IOException { - CloudResourceManager.Projects.Get getProject = - crmClient.projects().get(projectId); - try { - Project project = ResilientOperation.retry( - ResilientOperation.getGoogleRequestCallable(getProject), - backoff, - RetryDeterminer.SOCKET_ERRORS, - IOException.class, - sleeper); - return project.getProjectNumber(); - } catch (Exception e) { - throw new IOException("Unable to get project number", e); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b35f46/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java index d334359..68b3818 100644 --- a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java +++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/options/GcpOptionsTest.java @@ -21,152 +21,253 @@ import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static 
org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; +import com.google.api.services.cloudresourcemanager.CloudResourceManager; +import com.google.api.services.cloudresourcemanager.CloudResourceManager.Projects; +import com.google.api.services.cloudresourcemanager.CloudResourceManager.Projects.Get; +import com.google.api.services.cloudresourcemanager.model.Project; +import com.google.api.services.storage.model.Bucket; import com.google.common.collect.ImmutableMap; import com.google.common.io.Files; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Map; +import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions.DefaultProjectFactory; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions.GcpTempLocationFactory; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.RestoreSystemProperties; +import org.apache.beam.sdk.util.GcsUtil; import org.apache.beam.sdk.util.NoopPathValidator; +import org.apache.beam.sdk.util.gcsfs.GcsPath; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.junit.experimental.runners.Enclosed; import org.junit.rules.ExpectedException; import org.junit.rules.TemporaryFolder; import org.junit.rules.TestRule; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; /** Tests for {@link GcpOptions}. 
*/ -@RunWith(JUnit4.class) +@RunWith(Enclosed.class) public class GcpOptionsTest { - @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties(); - @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); - @Rule public ExpectedException thrown = ExpectedException.none(); - - @Test - public void testGetProjectFromCloudSdkConfigEnv() throws Exception { - Map<String, String> environment = - ImmutableMap.of("CLOUDSDK_CONFIG", tmpFolder.getRoot().getAbsolutePath()); - assertEquals("test-project", - runGetProjectTest(tmpFolder.newFile("properties"), environment)); - } - @Test - public void testGetProjectFromAppDataEnv() throws Exception { - Map<String, String> environment = - ImmutableMap.of("APPDATA", tmpFolder.getRoot().getAbsolutePath()); - System.setProperty("os.name", "windows"); - assertEquals("test-project", - runGetProjectTest(new File(tmpFolder.newFolder("gcloud"), "properties"), - environment)); - } + /** Tests for the majority of methods. */ + @RunWith(JUnit4.class) + public static class Common { + @Rule public TestRule restoreSystemProperties = new RestoreSystemProperties(); + @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); + @Rule public ExpectedException thrown = ExpectedException.none(); - @Test - public void testGetProjectFromUserHomeEnvOld() throws Exception { - Map<String, String> environment = ImmutableMap.of(); - System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath()); - assertEquals("test-project", - runGetProjectTest( - new File(tmpFolder.newFolder(".config", "gcloud"), "properties"), - environment)); - } + @Test + public void testGetProjectFromCloudSdkConfigEnv() throws Exception { + Map<String, String> environment = + ImmutableMap.of("CLOUDSDK_CONFIG", tmpFolder.getRoot().getAbsolutePath()); + assertEquals("test-project", + runGetProjectTest(tmpFolder.newFile("properties"), environment)); + } - @Test - public void testGetProjectFromUserHomeEnv() throws Exception { - Map<String, String> 
environment = ImmutableMap.of(); - System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath()); - assertEquals("test-project", - runGetProjectTest( - new File(tmpFolder.newFolder(".config", "gcloud", "configurations"), "config_default"), - environment)); - } + @Test + public void testGetProjectFromAppDataEnv() throws Exception { + Map<String, String> environment = + ImmutableMap.of("APPDATA", tmpFolder.getRoot().getAbsolutePath()); + System.setProperty("os.name", "windows"); + assertEquals("test-project", + runGetProjectTest(new File(tmpFolder.newFolder("gcloud"), "properties"), + environment)); + } - @Test - public void testGetProjectFromUserHomeOldAndNewPrefersNew() throws Exception { - Map<String, String> environment = ImmutableMap.of(); - System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath()); - makePropertiesFileWithProject(new File(tmpFolder.newFolder(".config", "gcloud"), "properties"), - "old-project"); - assertEquals("test-project", - runGetProjectTest( - new File(tmpFolder.newFolder(".config", "gcloud", "configurations"), "config_default"), - environment)); - } + @Test + public void testGetProjectFromUserHomeEnvOld() throws Exception { + Map<String, String> environment = ImmutableMap.of(); + System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath()); + assertEquals("test-project", + runGetProjectTest( + new File(tmpFolder.newFolder(".config", "gcloud"), "properties"), + environment)); + } - @Test - public void testUnableToGetDefaultProject() throws Exception { - System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath()); - DefaultProjectFactory projectFactory = spy(new DefaultProjectFactory()); - when(projectFactory.getEnvironment()).thenReturn(ImmutableMap.<String, String>of()); - assertNull(projectFactory.create(PipelineOptionsFactory.create())); - } + @Test + public void testGetProjectFromUserHomeEnv() throws Exception { + Map<String, String> environment = ImmutableMap.of(); + 
System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath()); + assertEquals("test-project", runGetProjectTest( + new File(tmpFolder.newFolder(".config", "gcloud", "configurations"), "config_default"), + environment)); + } - @Test - public void testEmptyGcpTempLocation() throws Exception { - GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); - options.setProject(""); - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("--project is a required option"); - options.getGcpTempLocation(); - } + @Test + public void testGetProjectFromUserHomeOldAndNewPrefersNew() throws Exception { + Map<String, String> environment = ImmutableMap.of(); + System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath()); + makePropertiesFileWithProject( + new File(tmpFolder.newFolder(".config", "gcloud"), "properties"), "old-project"); + assertEquals("test-project", runGetProjectTest( + new File(tmpFolder.newFolder(".config", "gcloud", "configurations"), "config_default"), + environment)); + } - @Test - public void testDefaultGcpTempLocation() throws Exception { - GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); - String tempLocation = "gs://bucket"; - options.setTempLocation(tempLocation); - options.as(GcsOptions.class).setPathValidatorClass(NoopPathValidator.class); - assertEquals(tempLocation, options.getGcpTempLocation()); - } + @Test + public void testUnableToGetDefaultProject() throws Exception { + System.setProperty("user.home", tmpFolder.getRoot().getAbsolutePath()); + DefaultProjectFactory projectFactory = spy(new DefaultProjectFactory()); + when(projectFactory.getEnvironment()).thenReturn(ImmutableMap.<String, String>of()); + assertNull(projectFactory.create(PipelineOptionsFactory.create())); + } - @Test - public void testDefaultGcpTempLocationInvalid() throws Exception { - GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); - options.setTempLocation("file://"); - 
thrown.expect(IllegalArgumentException.class); - thrown.expectMessage( - "Error constructing default value for gcpTempLocation: tempLocation is not" - + " a valid GCS path"); - options.getGcpTempLocation(); - } + @Test + public void testEmptyGcpTempLocation() throws Exception { + GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); + options.setGcpCredential(new TestCredential()); + options.setProject(""); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("--project is a required option"); + options.getGcpTempLocation(); + } - @Test - public void testDefaultGcpTempLocationDoesNotExist() { - GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); - String tempLocation = "gs://does/not/exist"; - options.setTempLocation(tempLocation); - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage( - "Error constructing default value for gcpTempLocation: tempLocation is not" - + " a valid GCS path"); - thrown.expectCause( - hasMessage(containsString("Output path does not exist or is not writeable"))); - - options.getGcpTempLocation(); - } + @Test + public void testDefaultGcpTempLocation() throws Exception { + GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); + String tempLocation = "gs://bucket"; + options.setTempLocation(tempLocation); + options.as(GcsOptions.class).setPathValidatorClass(NoopPathValidator.class); + assertEquals(tempLocation, options.getGcpTempLocation()); + } + + @Test + public void testDefaultGcpTempLocationInvalid() throws Exception { + GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); + options.setTempLocation("file://"); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "Error constructing default value for gcpTempLocation: tempLocation is not" + + " a valid GCS path"); + options.getGcpTempLocation(); + } + + @Test + public void testDefaultGcpTempLocationDoesNotExist() { + GcpOptions options = PipelineOptionsFactory.as(GcpOptions.class); 
+ String tempLocation = "gs://does/not/exist"; + options.setTempLocation(tempLocation); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "Error constructing default value for gcpTempLocation: tempLocation is not" + + " a valid GCS path"); + thrown.expectCause( + hasMessage(containsString("Output path does not exist or is not writeable"))); + + options.getGcpTempLocation(); + } - private static void makePropertiesFileWithProject(File path, String projectId) - throws IOException { - String properties = String.format("[core]%n" - + "account = test-acco...@google.com%n" - + "project = %s%n" - + "%n" - + "[dataflow]%n" - + "magic = true%n", projectId); - Files.write(properties, path, StandardCharsets.UTF_8); + private static void makePropertiesFileWithProject(File path, String projectId) + throws IOException { + String properties = String.format("[core]%n" + + "account = test-acco...@google.com%n" + + "project = %s%n" + + "%n" + + "[dataflow]%n" + + "magic = true%n", projectId); + Files.write(properties, path, StandardCharsets.UTF_8); + } + + private static String runGetProjectTest(File path, Map<String, String> environment) + throws Exception { + makePropertiesFileWithProject(path, "test-project"); + DefaultProjectFactory projectFactory = spy(new DefaultProjectFactory()); + when(projectFactory.getEnvironment()).thenReturn(environment); + return projectFactory.create(PipelineOptionsFactory.create()); + } } - private static String runGetProjectTest(File path, Map<String, String> environment) - throws Exception { - makePropertiesFileWithProject(path, "test-project"); - DefaultProjectFactory projectFactory = spy(new DefaultProjectFactory()); - when(projectFactory.getEnvironment()).thenReturn(environment); - return projectFactory.create(PipelineOptionsFactory.create()); + /** Tests related to determining the GCP temp location. 
*/ + @RunWith(JUnit4.class) + public static class GcpTempLocation { + @Rule public ExpectedException thrown = ExpectedException.none(); + @Mock private GcsUtil mockGcsUtil; + @Mock private CloudResourceManager mockCrmClient; + @Mock private Projects mockProjects; + @Mock private Get mockGet; + private Project fakeProject; + private PipelineOptions options; + + @Before + public void setUp() throws Exception { + MockitoAnnotations.initMocks(this); + options = PipelineOptionsFactory.create(); + options.as(GcsOptions.class).setGcsUtil(mockGcsUtil); + options.as(GcpOptions.class).setProject("foo"); + options.as(GcpOptions.class).setZone("us-north1-a"); + when(mockCrmClient.projects()).thenReturn(mockProjects); + when(mockProjects.get(any(String.class))).thenReturn(mockGet); + fakeProject = new Project().setProjectNumber(1L); + } + + @Test + public void testCreateBucket() throws Exception { + doReturn(fakeProject).when(mockGet).execute(); + when(mockGcsUtil.bucketOwner(any(GcsPath.class))).thenReturn(1L); + + String bucket = GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient); + assertEquals("gs://dataflow-staging-us-north1-1", bucket); + } + + @Test + public void testCreateBucketProjectLookupFails() throws Exception { + doThrow(new IOException("badness")).when(mockGet).execute(); + + thrown.expect(RuntimeException.class); + thrown.expectMessage("Unable to verify project"); + GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient); + } + + @Test + public void testCreateBucketCreateBucketFails() throws Exception { + doReturn(fakeProject).when(mockGet).execute(); + doThrow(new IOException("badness")).when( + mockGcsUtil).createBucket(any(String.class), any(Bucket.class)); + + thrown.expect(RuntimeException.class); + thrown.expectMessage("Unable create default bucket"); + GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient); + } + + @Test + public void testCannotGetBucketOwner() throws Exception { + 
doReturn(fakeProject).when(mockGet).execute(); + when(mockGcsUtil.bucketOwner(any(GcsPath.class))) + .thenThrow(new IOException("badness")); + + thrown.expect(RuntimeException.class); + thrown.expectMessage("Unable to determine the owner"); + GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient); + } + + @Test + public void testProjectMismatch() throws Exception { + doReturn(fakeProject).when(mockGet).execute(); + when(mockGcsUtil.bucketOwner(any(GcsPath.class))).thenReturn(5L); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Bucket owner does not match the project"); + GcpTempLocationFactory.tryCreateDefaultBucket(options, mockCrmClient); + } + + @Test + public void regionFromZone() throws Exception { + assertEquals("us-central1", GcpTempLocationFactory.getRegionFromZone("us-central1-a")); + assertEquals("asia-east", GcpTempLocationFactory.getRegionFromZone("asia-east-a")); + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/c1b35f46/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java deleted file mode 100644 index 65cb90b..0000000 --- a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/DefaultBucketTest.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.when; - -import com.google.api.services.storage.model.Bucket; -import java.io.IOException; -import org.apache.beam.sdk.extensions.gcp.options.CloudResourceManagerOptions; -import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; -import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.util.gcsfs.GcsPath; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.MockitoAnnotations; -import org.mockito.MockitoAnnotations.Mock; - -/** Tests for DefaultBucket. 
*/ -@RunWith(JUnit4.class) -public class DefaultBucketTest { - @Rule public ExpectedException thrown = ExpectedException.none(); - - private PipelineOptions options; - @Mock - private GcsUtil gcsUtil; - @Mock - private GcpProjectUtil gcpUtil; - - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - options = PipelineOptionsFactory.create(); - options.as(GcsOptions.class).setGcsUtil(gcsUtil); - options.as(CloudResourceManagerOptions.class).setGcpProjectUtil(gcpUtil); - options.as(GcpOptions.class).setProject("foo"); - options.as(GcpOptions.class).setZone("us-north1-a"); - } - - @Test - public void testCreateBucket() { - String bucket = DefaultBucket.tryCreateDefaultBucket(options); - assertEquals("gs://dataflow-staging-us-north1-0", bucket); - } - - @Test - public void testCreateBucketProjectLookupFails() throws IOException { - when(gcpUtil.getProjectNumber("foo")).thenThrow(new IOException("badness")); - - thrown.expect(RuntimeException.class); - thrown.expectMessage("Unable to verify project"); - DefaultBucket.tryCreateDefaultBucket(options); - } - - @Test - public void testCreateBucketCreateBucketFails() throws IOException { - doThrow(new IOException("badness")).when( - gcsUtil).createBucket(any(String.class), any(Bucket.class)); - - thrown.expect(RuntimeException.class); - thrown.expectMessage("Unable create default bucket"); - DefaultBucket.tryCreateDefaultBucket(options); - } - - @Test - public void testCannotGetBucketOwner() throws IOException { - when(gcsUtil.bucketOwner(any(GcsPath.class))) - .thenThrow(new IOException("badness")); - - thrown.expect(RuntimeException.class); - thrown.expectMessage("Unable to determine the owner"); - DefaultBucket.tryCreateDefaultBucket(options); - } - - @Test - public void testProjectMismatch() throws IOException { - when(gcsUtil.bucketOwner(any(GcsPath.class))).thenReturn(5L); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("Bucket owner does not match the project"); - 
DefaultBucket.tryCreateDefaultBucket(options); - } - - @Test - public void regionFromZone() throws IOException { - assertEquals("us-central1", DefaultBucket.getRegionFromZone("us-central1-a")); - assertEquals("asia-east", DefaultBucket.getRegionFromZone("asia-east-a")); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/c1b35f46/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java deleted file mode 100644 index 253787d..0000000 --- a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/util/GcpProjectUtilTest.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.util; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.when; - -import com.google.api.client.util.BackOff; -import com.google.api.services.cloudresourcemanager.CloudResourceManager; -import com.google.api.services.cloudresourcemanager.model.Project; -import java.net.SocketTimeoutException; -import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; -import org.apache.beam.sdk.extensions.gcp.options.CloudResourceManagerOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.FastNanoClockAndSleeper; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mockito; - -/** Test case for {@link GcpProjectUtil}. */ -@RunWith(JUnit4.class) -public class GcpProjectUtilTest { - @Rule public ExpectedException thrown = ExpectedException.none(); - - private static CloudResourceManagerOptions crmOptionsWithTestCredential() { - CloudResourceManagerOptions pipelineOptions = - PipelineOptionsFactory.as(CloudResourceManagerOptions.class); - pipelineOptions.setGcpCredential(new TestCredential()); - return pipelineOptions; - } - - @Test - public void testGetProjectNumber() throws Exception { - CloudResourceManagerOptions pipelineOptions = crmOptionsWithTestCredential(); - GcpProjectUtil projectUtil = pipelineOptions.getGcpProjectUtil(); - - CloudResourceManager.Projects mockProjects = Mockito.mock( - CloudResourceManager.Projects.class); - CloudResourceManager mockCrm = Mockito.mock(CloudResourceManager.class); - projectUtil.setCrmClient(mockCrm); - - CloudResourceManager.Projects.Get mockProjectsGet = - Mockito.mock(CloudResourceManager.Projects.Get.class); - - BackOff mockBackOff = FluentBackoff.DEFAULT.backoff(); - Project project = new Project(); - project.setProjectNumber(5L); - - 
when(mockCrm.projects()).thenReturn(mockProjects); - when(mockProjects.get(any(String.class))).thenReturn(mockProjectsGet); - when(mockProjectsGet.execute()) - .thenThrow(new SocketTimeoutException("SocketException")) - .thenReturn(project); - - assertEquals(5L, projectUtil.getProjectNumber( - "foo", mockBackOff, new FastNanoClockAndSleeper())); - } -}