[BEAM-151] Break out Dataflow transport creation to another file

Keeping Dataflow client creation in the shared Transport class would
prevent moving DataflowPipelineOptions into a Dataflow runner maven module.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d0db477a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d0db477a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d0db477a

Branch: refs/heads/master
Commit: d0db477a0ce436728f71f0f4aec0b0098eac66be
Parents: c8cb55a
Author: Luke Cwik <[email protected]>
Authored: Mon Mar 28 13:09:33 2016 -0700
Committer: Luke Cwik <[email protected]>
Committed: Thu Apr 7 11:19:49 2016 -0700

----------------------------------------------------------------------
 .../options/DataflowPipelineDebugOptions.java  |   5 +-
 .../sdk/runners/DataflowPipelineRunner.java    |   6 +-
 .../dataflow/sdk/util/DataflowTransport.java   | 111 +++++++++++++++++++
 .../cloud/dataflow/sdk/util/Transport.java     |  46 --------
 .../sdk/options/GoogleApiDebugOptionsTest.java |  93 ++++++++--------
 5 files changed, 163 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d0db477a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
index b55fa17..6231bd4 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
@@ -20,11 +20,11 @@ package com.google.cloud.dataflow.sdk.options;
 import com.google.api.services.dataflow.Dataflow;
 import com.google.cloud.dataflow.sdk.annotations.Experimental;
 import com.google.cloud.dataflow.sdk.util.DataflowPathValidator;
+import com.google.cloud.dataflow.sdk.util.DataflowTransport;
 import com.google.cloud.dataflow.sdk.util.GcsStager;
 import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
 import com.google.cloud.dataflow.sdk.util.PathValidator;
 import com.google.cloud.dataflow.sdk.util.Stager;
-import com.google.cloud.dataflow.sdk.util.Transport;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 
@@ -159,7 +159,8 @@ public interface DataflowPipelineDebugOptions extends PipelineOptions {
   public static class DataflowClientFactory implements DefaultValueFactory<Dataflow> {
     @Override
     public Dataflow create(PipelineOptions options) {
-      return Transport.newDataflowClient(options.as(DataflowPipelineOptions.class)).build();
+      return DataflowTransport.newDataflowClient(
+          options.as(DataflowPipelineOptions.class)).build();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d0db477a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
index 5f43cc3..50ca36f 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
@@ -99,6 +99,7 @@ import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
 import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
 import com.google.cloud.dataflow.sdk.util.CoderUtils;
 import com.google.cloud.dataflow.sdk.util.DataflowReleaseInfo;
+import com.google.cloud.dataflow.sdk.util.DataflowTransport;
 import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
 import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
 import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
@@ -107,7 +108,6 @@ import com.google.cloud.dataflow.sdk.util.PathValidator;
 import com.google.cloud.dataflow.sdk.util.PropertyNames;
 import com.google.cloud.dataflow.sdk.util.Reshuffle;
 import com.google.cloud.dataflow.sdk.util.SystemDoFnInternal;
-import com.google.cloud.dataflow.sdk.util.Transport;
 import com.google.cloud.dataflow.sdk.util.ValueWithRecordId;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.util.WindowedValue.FullWindowedValueCoder;
@@ -444,7 +444,7 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
       throw new RuntimeException("Should not specify the debuggee");
     }
 
-    Clouddebugger debuggerClient = Transport.newClouddebuggerClient(options).build();
+    Clouddebugger debuggerClient = DataflowTransport.newClouddebuggerClient(options).build();
 
     Debuggee debuggee = registerDebuggee(debuggerClient, uniquifier);
     options.setDebuggee(debuggee);
@@ -600,7 +600,7 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
         // regularly and need not be retried automatically.
         DataflowPipelineJob dataflowPipelineJob =
             new DataflowPipelineJob(options.getProject(), jobResult.getId(),
-                Transport.newRawDataflowClient(options).build(), aggregatorTransforms);
+                DataflowTransport.newRawDataflowClient(options).build(), aggregatorTransforms);
 
         // If the service returned client request id, the SDK needs to compare it
         // with the original id generated in the request, if they are not the same

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d0db477a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java
new file mode 100644
index 0000000..8de358c
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java
@@ -0,0 +1,111 @@
+/*******************************************************************************
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ ******************************************************************************/
+
+package com.google.cloud.dataflow.sdk.util;
+
+import static com.google.cloud.dataflow.sdk.util.Transport.getJsonFactory;
+import static com.google.cloud.dataflow.sdk.util.Transport.getTransport;
+
+import com.google.api.client.auth.oauth2.Credential;
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.services.clouddebugger.v2.Clouddebugger;
+import com.google.api.services.dataflow.Dataflow;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
+import com.google.common.collect.ImmutableList;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+/**
+ * Helpers for cloud communication.
+ */
+public class DataflowTransport {
+
+
+  private static class ApiComponents {
+    public String rootUrl;
+    public String servicePath;
+
+    public ApiComponents(String root, String path) {
+      this.rootUrl = root;
+      this.servicePath = path;
+    }
+  }
+
+  private static ApiComponents apiComponentsFromUrl(String urlString) {
+    try {
+      URL url = new URL(urlString);
+      String rootUrl = url.getProtocol() + "://" + url.getHost()
+          + (url.getPort() > 0 ? ":" + url.getPort() : "");
+      return new ApiComponents(rootUrl, url.getPath());
+    } catch (MalformedURLException e) {
+      throw new RuntimeException("Invalid URL: " + urlString);
+    }
+  }
+
+  /**
+   * Returns a Google Cloud Dataflow client builder.
+   */
+  public static Dataflow.Builder newDataflowClient(DataflowPipelineOptions options) {
+    String servicePath = options.getDataflowEndpoint();
+    ApiComponents components;
+    if (servicePath.contains("://")) {
+      components = apiComponentsFromUrl(servicePath);
+    } else {
+      components = new ApiComponents(options.getApiRootUrl(), servicePath);
+    }
+
+    return new Dataflow.Builder(getTransport(),
+        getJsonFactory(),
+        chainHttpRequestInitializer(
+            options.getGcpCredential(),
+            // Do not log 404. It clutters the output and is possibly even required by the caller.
+            new RetryHttpRequestInitializer(ImmutableList.of(404))))
+        .setApplicationName(options.getAppName())
+        .setRootUrl(components.rootUrl)
+        .setServicePath(components.servicePath)
+        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+  }
+
+  public static Clouddebugger.Builder newClouddebuggerClient(DataflowPipelineOptions options) {
+    return new Clouddebugger.Builder(getTransport(),
+        getJsonFactory(),
+        chainHttpRequestInitializer(options.getGcpCredential(), new RetryHttpRequestInitializer()))
+        .setApplicationName(options.getAppName())
+        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+  }
+
+  /**
+   * Returns a Dataflow client that does not automatically retry failed
+   * requests.
+   */
+  public static Dataflow.Builder
+      newRawDataflowClient(DataflowPipelineOptions options) {
+    return newDataflowClient(options)
+        .setHttpRequestInitializer(options.getGcpCredential())
+        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+  }
+
+  private static HttpRequestInitializer chainHttpRequestInitializer(
+      Credential credential, HttpRequestInitializer httpRequestInitializer) {
+    if (credential == null) {
+      return httpRequestInitializer;
+    } else {
+      return new ChainingHttpRequestInitializer(credential, httpRequestInitializer);
+    }
+  }
+}
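
For reference, a minimal usage sketch (not part of the diff above) of how a
caller can build clients through the new DataflowTransport class, mirroring
the DataflowClientFactory change earlier in this commit. The app name and
project ID are illustrative, and TestCredential (from the SDK's test
utilities) stands in for a real credential:

    import com.google.api.services.dataflow.Dataflow;
    import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.util.DataflowTransport;
    import com.google.cloud.dataflow.sdk.util.TestCredential;

    public class DataflowTransportExample {
      public static void main(String[] args) throws Exception {
        DataflowPipelineOptions options =
            PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options.setAppName("DataflowTransportExample");  // illustrative
        options.setProject("my-project-id");             // illustrative
        options.setGcpCredential(new TestCredential());  // test stand-in

        // Same call DataflowClientFactory now makes; requests are retried
        // automatically, and 404 responses are not logged.
        Dataflow client = DataflowTransport.newDataflowClient(options).build();

        // The raw variant does not retry failed requests automatically.
        Dataflow raw = DataflowTransport.newRawDataflowClient(options).build();

        System.out.println(client.getBaseUrl());
      }
    }

Because DataflowTransport still delegates to Transport.getTransport() and
Transport.getJsonFactory(), the underlying HTTP transport and JSON factory
remain shared across all clients; only the Dataflow-specific builders move.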

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d0db477a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java
index 187d164..5888822 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java
@@ -24,13 +24,10 @@ import com.google.api.client.http.HttpTransport;
 import com.google.api.client.json.JsonFactory;
 import com.google.api.client.json.jackson2.JacksonFactory;
 import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.clouddebugger.v2.Clouddebugger;
-import com.google.api.services.dataflow.Dataflow;
 import com.google.api.services.pubsub.Pubsub;
 import com.google.api.services.storage.Storage;
 import com.google.cloud.dataflow.sdk.options.BigQueryOptions;
 import com.google.cloud.dataflow.sdk.options.DataflowPipelineDebugOptions;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
 import com.google.cloud.dataflow.sdk.options.GcsOptions;
 import com.google.cloud.dataflow.sdk.options.PubsubOptions;
 import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
@@ -128,49 +125,6 @@ public class Transport {
   }
 
   /**
-   * Returns a Google Cloud Dataflow client builder.
-   */
-  public static Dataflow.Builder newDataflowClient(DataflowPipelineOptions options) {
-    String servicePath = options.getDataflowEndpoint();
-    ApiComponents components;
-    if (servicePath.contains("://")) {
-      components = apiComponentsFromUrl(servicePath);
-    } else {
-      components = new ApiComponents(options.getApiRootUrl(), servicePath);
-    }
-
-    return new Dataflow.Builder(getTransport(),
-        getJsonFactory(),
-        chainHttpRequestInitializer(
-            options.getGcpCredential(),
-            // Do not log 404. It clutters the output and is possibly even required by the caller.
-            new RetryHttpRequestInitializer(ImmutableList.of(404))))
-        .setApplicationName(options.getAppName())
-        .setRootUrl(components.rootUrl)
-        .setServicePath(components.servicePath)
-        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
-  }
-
-  public static Clouddebugger.Builder newClouddebuggerClient(DataflowPipelineOptions options) {
-    return new Clouddebugger.Builder(getTransport(),
-        getJsonFactory(),
-        chainHttpRequestInitializer(options.getGcpCredential(), new RetryHttpRequestInitializer()))
-        .setApplicationName(options.getAppName())
-        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
-  }
-
-  /**
-   * Returns a Dataflow client that does not automatically retry failed
-   * requests.
-   */
-  public static Dataflow.Builder
-      newRawDataflowClient(DataflowPipelineOptions options) {
-    return newDataflowClient(options)
-        .setHttpRequestInitializer(options.getGcpCredential())
-        .setGoogleClientRequestInitializer(options.getGoogleApiTrace());
-  }
-
-  /**
    * Returns a Cloud Storage client builder.
    *
    * <p>Note: this client's endpoint is <b>not</b> modified by the
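
The test migration below replaces Dataflow requests with Storage requests.
As a rough sketch (not part of the diff) of the tracing pattern those tests
exercise, using only calls that appear in this commit; bucket, object, and
destination names are made up:

    import com.google.api.services.storage.Storage;
    import com.google.cloud.dataflow.sdk.options.GcsOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.util.TestCredential;
    import com.google.cloud.dataflow.sdk.util.Transport;

    public class GoogleApiTracerExample {
      public static void main(String[] args) throws Exception {
        // Route all Storage Objects.Get calls to a trace destination.
        String[] flags =
            new String[] {"--googleApiTrace={\"Objects.Get\":\"MyTraceDestination\"}"};
        GcsOptions options = PipelineOptionsFactory.fromArgs(flags).as(GcsOptions.class);
        options.setGcpCredential(new TestCredential());

        // A matching request picks up a "$trace" query parameter.
        Storage.Objects.Get get = Transport.newStorageClient(options)
            .build().objects().get("my-bucket", "my-object");
        System.out.println(get.get("$trace"));   // MyTraceDestination

        // A non-matching request is left untouched.
        Storage.Objects.List list = Transport.newStorageClient(options)
            .build().objects().list("my-bucket");
        System.out.println(list.get("$trace"));  // null
      }
    }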
"testJobId"); + Storage.Objects.List request = + Transport.newStorageClient(options).build().objects().list("testProjectId"); assertNull(request.get("$trace")); } @Test public void testWithMultipleTraces() throws Exception { - String[] args = new String[] { - "--googleApiTrace={\"Projects.Jobs.Create\":\"CreateTraceDestination\"," - + "\"Projects.Jobs.Get\":\"GetTraceDestination\"}"}; - DataflowPipelineOptions options = - PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class); + String[] args = new String[] {STORAGE_GET_AND_LIST_TRACE}; + GcsOptions options = PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class); options.setGcpCredential(new TestCredential()); assertNotNull(options.getGoogleApiTrace()); - Get getRequest = - options.getDataflowClient().projects().jobs().get("testProjectId", "testJobId"); + Storage.Objects.Get getRequest = + Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId"); assertEquals("GetTraceDestination", getRequest.get("$trace")); - Create createRequest = - options.getDataflowClient().projects().jobs().create("testProjectId", null); - assertEquals("CreateTraceDestination", createRequest.get("$trace")); + Storage.Objects.List listRequest = + Transport.newStorageClient(options).build().objects().list("testProjectId"); + assertEquals("ListTraceDestination", listRequest.get("$trace")); } @Test - public void testMatchingAllDataflowCalls() throws Exception { - String[] args = new String[] {"--googleApiTrace={\"Dataflow\":\"TraceDestination\"}"}; - DataflowPipelineOptions options = - PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class); + public void testMatchingAllCalls() throws Exception { + String[] args = new String[] {STORAGE_TRACE}; + GcsOptions options = + PipelineOptionsFactory.fromArgs(args).as(GcsOptions.class); options.setGcpCredential(new TestCredential()); assertNotNull(options.getGoogleApiTrace()); - Get getRequest = - options.getDataflowClient().projects().jobs().get("testProjectId", "testJobId"); + Storage.Objects.Get getRequest = + Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId"); assertEquals("TraceDestination", getRequest.get("$trace")); - Create createRequest = - options.getDataflowClient().projects().jobs().create("testProjectId", null); - assertEquals("TraceDestination", createRequest.get("$trace")); + Storage.Objects.List listRequest = + Transport.newStorageClient(options).build().objects().list("testProjectId"); + assertEquals("TraceDestination", listRequest.get("$trace")); } @Test public void testMatchingAgainstClient() throws Exception { - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class); options.setGcpCredential(new TestCredential()); options.setGoogleApiTrace(new GoogleApiTracer().addTraceFor( - Transport.newDataflowClient(options).build(), "TraceDestination")); + Transport.newStorageClient(options).build(), "TraceDestination")); - Get getRequest = - options.getDataflowClient().projects().jobs().get("testProjectId", "testJobId"); + Storage.Objects.Get getRequest = + Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId"); assertEquals("TraceDestination", getRequest.get("$trace")); - Delete deleteRequest = Transport.newBigQueryClient(options).build().datasets() - .delete("testProjectId", "testDatasetId"); + Delete deleteRequest = 
Transport.newBigQueryClient(options.as(BigQueryOptions.class)) + .build().datasets().delete("testProjectId", "testDatasetId"); assertNull(deleteRequest.get("$trace")); } @Test public void testMatchingAgainstRequestType() throws Exception { - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + GcsOptions options = PipelineOptionsFactory.as(GcsOptions.class); options.setGcpCredential(new TestCredential()); options.setGoogleApiTrace(new GoogleApiTracer().addTraceFor( - Transport.newDataflowClient(options).build().projects().jobs() - .get("aProjectId", "aJobId"), "TraceDestination")); + Transport.newStorageClient(options).build().objects() + .get("aProjectId", "aObjectId"), "TraceDestination")); - Get getRequest = - options.getDataflowClient().projects().jobs().get("testProjectId", "testJobId"); + Storage.Objects.Get getRequest = + Transport.newStorageClient(options).build().objects().get("testBucketId", "testObjectId"); assertEquals("TraceDestination", getRequest.get("$trace")); - Create createRequest = - options.getDataflowClient().projects().jobs().create("testProjectId", null); - assertNull(createRequest.get("$trace")); + Storage.Objects.List listRequest = + Transport.newStorageClient(options).build().objects().list("testProjectId"); + assertNull(listRequest.get("$trace")); } @Test
