Verify in unit test that BigQuery executing project is used.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/70c1e2c7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/70c1e2c7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/70c1e2c7 Branch: refs/heads/master Commit: 70c1e2c7e005336d764ef9b0cf02afeb967feac5 Parents: ffed1d4 Author: Pei He <pe...@google.com> Authored: Mon May 23 14:57:59 2016 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Tue May 24 21:31:05 2016 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/io/BigQueryIOTest.java | 39 +++++++++++++++++--- 1 file changed, 33 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/70c1e2c7/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java index 7c360b9..2865b23 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java @@ -21,6 +21,8 @@ import static org.apache.beam.sdk.io.BigQueryIO.fromJsonString; import static org.apache.beam.sdk.io.BigQueryIO.toJsonString; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static com.google.common.base.Preconditions.checkArgument; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; @@ -64,6 +66,7 @@ import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.values.PCollection; import com.google.api.client.util.Data; +import com.google.api.client.util.Strings; import com.google.api.services.bigquery.model.ErrorProto; import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobConfigurationExtract; @@ -205,6 +208,7 @@ public class BigQueryIOTest implements Serializable { private Object[] startJobReturns; private Object[] pollJobReturns; + private String executingProject; // Both counts will be reset back to zeros after serialization. // This is a work around for DoFn's verifyUnmodified check. private transient int startJobCallsCount; @@ -238,27 +242,42 @@ public class BigQueryIOTest implements Serializable { return this; } + /** + * Verifies executing project. + */ + public FakeJobService verifyExecutingProject(String executingProject) { + this.executingProject = executingProject; + return this; + } + @Override public void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig) throws InterruptedException, IOException { - startJob(); + startJob(jobRef); } @Override public void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig) throws InterruptedException, IOException { - startJob(); + startJob(jobRef); } @Override public void startQueryJob(JobReference jobRef, JobConfigurationQuery query) throws IOException, InterruptedException { - startJob(); + startJob(jobRef); } @Override public Job pollJob(JobReference jobRef, int maxAttempts) throws InterruptedException { + if (!Strings.isNullOrEmpty(executingProject)) { + checkArgument( + jobRef.getProjectId().equals(executingProject), + "Project id: %s is not equal to executing project: %s", + jobRef.getProjectId(), executingProject); + } + if (pollJobStatusCallsCount < pollJobReturns.length) { Object ret = pollJobReturns[pollJobStatusCallsCount++]; if (ret instanceof Job) { @@ -276,7 +295,14 @@ public class BigQueryIOTest implements Serializable { } } - private void startJob() throws IOException, InterruptedException { + private void startJob(JobReference jobRef) throws IOException, InterruptedException { + if (!Strings.isNullOrEmpty(executingProject)) { + checkArgument( + jobRef.getProjectId().equals(executingProject), + "Project id: %s is not equal to executing project: %s", + jobRef.getProjectId(), executingProject); + } + if (startJobCallsCount < startJobReturns.length) { Object ret = startJobReturns[startJobCallsCount++]; if (ret instanceof IOException) { @@ -479,7 +505,8 @@ public class BigQueryIOTest implements Serializable { FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() .withJobService(new FakeJobService() .startJobReturns("done", "done") - .pollJobReturns(Status.UNKNOWN)) + .pollJobReturns(Status.UNKNOWN) + .verifyExecutingProject(bqOptions.getProject())) .readerReturns( toJsonString(new TableRow().set("name", "a").set("number", 1)), toJsonString(new TableRow().set("name", "b").set("number", 2)), @@ -487,7 +514,7 @@ public class BigQueryIOTest implements Serializable { Pipeline p = TestPipeline.create(bqOptions); PCollection<String> output = p - .apply(BigQueryIO.Read.from("foo.com:project:somedataset.sometable") + .apply(BigQueryIO.Read.from("non-executing-project:somedataset.sometable") .withTestServices(fakeBqServices) .withoutValidation()) .apply(ParDo.of(new DoFn<TableRow, String>() {