http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/987350b7/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java index 7998fc7..d9f53af 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java @@ -17,30 +17,51 @@ */ package org.apache.beam.sdk.io; +import static org.apache.beam.sdk.io.BigQueryIO.fromJsonString; +import static org.apache.beam.sdk.io.BigQueryIO.toJsonString; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.when; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.TableRowJsonCoder; +import org.apache.beam.sdk.io.BigQueryIO.BigQueryQuerySource; +import org.apache.beam.sdk.io.BigQueryIO.BigQueryTableSource; +import org.apache.beam.sdk.io.BigQueryIO.PassThroughThenCleanup; +import org.apache.beam.sdk.io.BigQueryIO.PassThroughThenCleanup.CleanupOperation; import org.apache.beam.sdk.io.BigQueryIO.Status; +import org.apache.beam.sdk.io.BigQueryIO.TransformingSource; import org.apache.beam.sdk.io.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.options.BigQueryOptions; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.ExpectedLogs; +import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; +import org.apache.beam.sdk.testing.SourceTestUtils; +import org.apache.beam.sdk.testing.SourceTestUtils.ExpectedSplitOutcome; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.BigQueryServices; +import org.apache.beam.sdk.util.BigQueryServices.DatasetService; +import org.apache.beam.sdk.util.BigQueryServices.JobService; import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.IOChannelFactory; +import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.values.PCollection; import com.google.api.client.util.Data; import com.google.api.services.bigquery.model.ErrorProto; @@ -48,14 +69,21 @@ import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobConfigurationExtract; import com.google.api.services.bigquery.model.JobConfigurationLoad; import com.google.api.services.bigquery.model.JobConfigurationQuery; +import com.google.api.services.bigquery.model.JobReference; +import com.google.api.services.bigquery.model.JobStatistics; +import com.google.api.services.bigquery.model.JobStatistics2; +import com.google.api.services.bigquery.model.JobStatistics4; import com.google.api.services.bigquery.model.JobStatus; +import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import org.hamcrest.CoreMatchers; import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Before; @@ -67,18 +95,24 @@ import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import java.io.File; import java.io.FileFilter; import java.io.IOException; +import java.io.Serializable; +import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; + +import javax.annotation.Nullable; /** * Tests for BigQueryIO. */ @RunWith(JUnit4.class) -public class BigQueryIOTest { +public class BigQueryIOTest implements Serializable { // Status.UNKNOWN maps to null private static final Map<Status, Job> JOB_STATUS_MAP = ImmutableMap.of( @@ -87,111 +121,192 @@ public class BigQueryIOTest { private static class FakeBigQueryServices implements BigQueryServices { - private Object[] startJobReturns; - private Object[] pollJobStatusReturns; + private String[] jsonTableRowReturns = new String[0]; + private JobService jobService; + private DatasetService datasetService; - /** - * Sets the return values for the mock {@link JobService#startLoadJob}. - * - * <p>Throws if the {@link Object} is a {@link Exception}, returns otherwise. - */ - private FakeBigQueryServices startLoadJobReturns(Object... startLoadJobReturns) { - this.startJobReturns = startLoadJobReturns; + public FakeBigQueryServices withJobService(JobService jobService) { + this.jobService = jobService; return this; } - /** - * Sets the return values for the mock {@link JobService#pollJobStatus}. - * - * <p>Throws if the {@link Object} is a {@link Exception}, returns otherwise. - */ - private FakeBigQueryServices pollJobStatusReturns(Object... pollJobStatusReturns) { - this.pollJobStatusReturns = pollJobStatusReturns; + public FakeBigQueryServices withDatasetService(DatasetService datasetService) { + this.datasetService = datasetService; + return this; + } + + public FakeBigQueryServices readerReturns(String... jsonTableRowReturns) { + this.jsonTableRowReturns = jsonTableRowReturns; return this; } @Override public JobService getJobService(BigQueryOptions bqOptions) { - return new FakeLoadService(startJobReturns, pollJobStatusReturns); + return jobService; } - private static class FakeLoadService implements BigQueryServices.JobService { + @Override + public DatasetService getDatasetService(BigQueryOptions bqOptions) { + return datasetService; + } + + @Override + public BigQueryJsonReader getReaderFromTable( + BigQueryOptions bqOptions, TableReference tableRef) { + return new FakeBigQueryReader(jsonTableRowReturns); + } + + @Override + public BigQueryJsonReader getReaderFromQuery( + BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten) { + return new FakeBigQueryReader(jsonTableRowReturns); + } - private Object[] startJobReturns; - private Object[] pollJobStatusReturns; - private int startLoadJobCallsCount; - private int pollJobStatusCallsCount; + private static class FakeBigQueryReader implements BigQueryJsonReader { + private static final int UNSTARTED = -1; + private static final int CLOSED = Integer.MAX_VALUE; - public FakeLoadService(Object[] startLoadJobReturns, Object[] pollJobStatusReturns) { - this.startJobReturns = startLoadJobReturns; - this.pollJobStatusReturns = pollJobStatusReturns; - this.startLoadJobCallsCount = 0; - this.pollJobStatusCallsCount = 0; + private String[] jsonTableRowReturns; + private int currIndex; + + FakeBigQueryReader(String[] jsonTableRowReturns) { + this.jsonTableRowReturns = jsonTableRowReturns; + this.currIndex = UNSTARTED; } @Override - public void startLoadJob(String jobId, JobConfigurationLoad loadConfig) - throws InterruptedException, IOException { - startJob(); + public boolean start() throws IOException { + assertEquals(UNSTARTED, currIndex); + currIndex = 0; + return currIndex < jsonTableRowReturns.length; } @Override - public void startExtractJob(String jobId, JobConfigurationExtract extractConfig) - throws InterruptedException, IOException { - startJob(); + public boolean advance() throws IOException { + return ++currIndex < jsonTableRowReturns.length; } @Override - public void startQueryJob(String jobId, JobConfigurationQuery query, boolean dryRun) - throws IOException, InterruptedException { - startJob(); + public TableRow getCurrent() throws NoSuchElementException { + if (currIndex >= jsonTableRowReturns.length) { + throw new NoSuchElementException(); + } + return fromJsonString(jsonTableRowReturns[currIndex], TableRow.class); } @Override - public Job pollJob(String projectId, String jobId, int maxAttemps) - throws InterruptedException { - if (pollJobStatusCallsCount < pollJobStatusReturns.length) { - Object ret = pollJobStatusReturns[pollJobStatusCallsCount++]; - if (ret instanceof Status) { - return JOB_STATUS_MAP.get(ret); - } else if (ret instanceof InterruptedException) { - throw (InterruptedException) ret; - } else { - throw new RuntimeException("Unexpected return type: " + ret.getClass()); - } + public void close() throws IOException { + currIndex = CLOSED; + } + } + } + + private static class FakeJobService implements JobService, Serializable { + + private Object[] startJobReturns; + private Object[] pollJobReturns; + // Both counts will be reset back to zeros after serialization. + // This is a work around for DoFn's verifyUnmodified check. + private transient int startJobCallsCount; + private transient int pollJobStatusCallsCount; + + public FakeJobService() { + this.startJobReturns = new Object[0]; + this.pollJobReturns = new Object[0]; + this.startJobCallsCount = 0; + this.pollJobStatusCallsCount = 0; + } + + /** + * Sets the return values to mock {@link JobService#startLoadJob}, + * {@link JobService#startExtractJob} and {@link JobService#startQueryJob}. + * + * <p>Throws if the {@link Object} is a {@link Exception}, returns otherwise. + */ + public FakeJobService startJobReturns(Object... startJobReturns) { + this.startJobReturns = startJobReturns; + return this; + } + + /** + * Sets the return values to mock {@link JobService#pollJob}. + * + * <p>Throws if the {@link Object} is a {@link Exception}, returns otherwise. + */ + public FakeJobService pollJobReturns(Object... pollJobReturns) { + this.pollJobReturns = pollJobReturns; + return this; + } + + @Override + public void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig) + throws InterruptedException, IOException { + startJob(); + } + + @Override + public void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig) + throws InterruptedException, IOException { + startJob(); + } + + @Override + public void startQueryJob(JobReference jobRef, JobConfigurationQuery query) + throws IOException, InterruptedException { + startJob(); + } + + @Override + public Job pollJob(JobReference jobRef, int maxAttempts) + throws InterruptedException { + if (pollJobStatusCallsCount < pollJobReturns.length) { + Object ret = pollJobReturns[pollJobStatusCallsCount++]; + if (ret instanceof Job) { + return (Job) ret; + } else if (ret instanceof Status) { + return JOB_STATUS_MAP.get(ret); + } else if (ret instanceof InterruptedException) { + throw (InterruptedException) ret; } else { - throw new RuntimeException( - "Exceeded expected number of calls: " + pollJobStatusReturns.length); + throw new RuntimeException("Unexpected return type: " + ret.getClass()); } + } else { + throw new RuntimeException( + "Exceeded expected number of calls: " + pollJobReturns.length); } + } - private void startJob() throws IOException, InterruptedException { - if (startLoadJobCallsCount < startJobReturns.length) { - Object ret = startJobReturns[startLoadJobCallsCount++]; - if (ret instanceof IOException) { - throw (IOException) ret; - } else if (ret instanceof InterruptedException) { - throw (InterruptedException) ret; - } else { - return; - } + private void startJob() throws IOException, InterruptedException { + if (startJobCallsCount < startJobReturns.length) { + Object ret = startJobReturns[startJobCallsCount++]; + if (ret instanceof IOException) { + throw (IOException) ret; + } else if (ret instanceof InterruptedException) { + throw (InterruptedException) ret; } else { - throw new RuntimeException( - "Exceeded expected number of calls: " + startJobReturns.length); + return; } + } else { + throw new RuntimeException( + "Exceeded expected number of calls: " + startJobReturns.length); } } + + @Override + public JobStatistics dryRunQuery(String projectId, String query) + throws InterruptedException, IOException { + throw new UnsupportedOperationException(); + } } - @Rule - public ExpectedException thrown = ExpectedException.none(); - @Rule public ExpectedLogs logged = ExpectedLogs.none(BigQueryIO.class); - @Rule - public TemporaryFolder testFolder = new TemporaryFolder(); - @Mock - public BigQueryServices.JobService mockBqLoadService; + @Rule public transient ExpectedException thrown = ExpectedException.none(); + @Rule public transient ExpectedLogs logged = ExpectedLogs.none(BigQueryIO.class); + @Rule public transient TemporaryFolder testFolder = new TemporaryFolder(); + @Mock public transient BigQueryServices.JobService mockJobService; + @Mock private transient IOChannelFactory mockIOChannelFactory; + @Mock private transient DatasetService mockDatasetService; - private BigQueryOptions bqOptions; + private transient BigQueryOptions bqOptions; private void checkReadTableObject( BigQueryIO.Read.Bound bound, String project, String dataset, String table) { @@ -205,16 +320,16 @@ public class BigQueryIOTest { private void checkReadTableObjectWithValidate( BigQueryIO.Read.Bound bound, String project, String dataset, String table, boolean validate) { - assertEquals(project, bound.table.getProjectId()); - assertEquals(dataset, bound.table.getDatasetId()); - assertEquals(table, bound.table.getTableId()); + assertEquals(project, bound.getTable().getProjectId()); + assertEquals(dataset, bound.getTable().getDatasetId()); + assertEquals(table, bound.getTable().getTableId()); assertNull(bound.query); assertEquals(validate, bound.getValidate()); } private void checkReadQueryObjectWithValidate( BigQueryIO.Read.Bound bound, String query, boolean validate) { - assertNull(bound.table); + assertNull(bound.getTable()); assertEquals(query, bound.query); assertEquals(validate, bound.getValidate()); } @@ -241,10 +356,10 @@ public class BigQueryIOTest { } @Before - public void setUp() { + public void setUp() throws IOException { bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class); bqOptions.setProject("defaultProject"); - bqOptions.setTempLocation(testFolder.getRoot().getAbsolutePath() + "/BigQueryIOTest/"); + bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); MockitoAnnotations.initMocks(this); } @@ -315,11 +430,7 @@ public class BigQueryIOTest { thrown.expectMessage( Matchers.either(Matchers.containsString("Unable to confirm BigQuery dataset presence")) .or(Matchers.containsString("BigQuery dataset not found for table"))); - try { - p.apply(BigQueryIO.Read.named("ReadMyTable").from(tableRef)); - } finally { - Assert.assertEquals("someproject", tableRef.getProjectId()); - } + p.apply(BigQueryIO.Read.named("ReadMyTable").from(tableRef)); } @Test @@ -364,10 +475,40 @@ public class BigQueryIOTest { } @Test + public void testReadFromTable() { + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() + .withJobService(new FakeJobService() + .startJobReturns("done", "done") + .pollJobReturns(Status.UNKNOWN)) + .readerReturns( + toJsonString(new TableRow().set("name", "a").set("number", 1)), + toJsonString(new TableRow().set("name", "b").set("number", 2)), + toJsonString(new TableRow().set("name", "c").set("number", 3))); + + Pipeline p = TestPipeline.create(bqOptions); + PCollection<String> output = p + .apply(BigQueryIO.Read.from("foo.com:project:somedataset.sometable") + .withTestServices(fakeBqServices) + .withoutValidation()) + .apply(ParDo.of(new DoFn<TableRow, String>() { + @Override + public void processElement(ProcessContext c) throws Exception { + c.output((String) c.element().get("name")); + } + })); + + PAssert.that(output) + .containsInAnyOrder(ImmutableList.of("a", "b", "c")); + + p.run(); + } + + @Test public void testCustomSink() throws Exception { FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() - .startLoadJobReturns("done", "done", "done") - .pollJobStatusReturns(Status.FAILED, Status.FAILED, Status.SUCCEEDED); + .withJobService(new FakeJobService() + .startJobReturns("done", "done", "done") + .pollJobReturns(Status.FAILED, Status.FAILED, Status.SUCCEEDED)); Pipeline p = TestPipeline.create(bqOptions); p.apply(Create.of( @@ -398,8 +539,9 @@ public class BigQueryIOTest { @Test public void testCustomSinkUnknown() throws Exception { FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() - .startLoadJobReturns("done", "done") - .pollJobStatusReturns(Status.FAILED, Status.UNKNOWN); + .withJobService(new FakeJobService() + .startJobReturns("done", "done") + .pollJobReturns(Status.FAILED, Status.UNKNOWN)); Pipeline p = TestPipeline.create(bqOptions); p.apply(Create.of( @@ -717,4 +859,240 @@ public class BigQueryIOTest { .apply(Create.<TableRow>of()) .apply(BigQueryIO.Write.named("name")); } + + @Test + public void testBigQueryTableSourceThroughJsonAPI() throws Exception { + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() + .withJobService(mockJobService) + .readerReturns( + toJsonString(new TableRow().set("name", "a").set("number", "1")), + toJsonString(new TableRow().set("name", "b").set("number", "2")), + toJsonString(new TableRow().set("name", "c").set("number", "3"))); + + String jobIdToken = "testJobIdToken"; + String jsonTable = toJsonString(BigQueryIO.parseTableSpec("project.data_set.table_name")); + String extractDestinationDir = "mock://tempLocation"; + BoundedSource<TableRow> bqSource = + BigQueryTableSource.create(jobIdToken, jsonTable, extractDestinationDir, fakeBqServices); + + List<TableRow> expected = ImmutableList.of( + new TableRow().set("name", "a").set("number", "1"), + new TableRow().set("name", "b").set("number", "2"), + new TableRow().set("name", "c").set("number", "3")); + + PipelineOptions options = PipelineOptionsFactory.create(); + Assert.assertThat( + SourceTestUtils.readFromSource(bqSource, options), + CoreMatchers.is(expected)); + SourceTestUtils.assertSplitAtFractionBehavior( + bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options); + } + + @Test + public void testBigQueryTableSourceInitSplit() throws Exception { + Job extractJob = new Job(); + JobStatistics jobStats = new JobStatistics(); + JobStatistics4 extractStats = new JobStatistics4(); + extractStats.setDestinationUriFileCounts(ImmutableList.of(1L)); + jobStats.setExtract(extractStats); + extractJob.setStatus(new JobStatus()) + .setStatistics(jobStats); + + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() + .withJobService(mockJobService) + .withDatasetService(mockDatasetService) + .readerReturns( + toJsonString(new TableRow().set("name", "a").set("number", "1")), + toJsonString(new TableRow().set("name", "b").set("number", "2")), + toJsonString(new TableRow().set("name", "c").set("number", "3"))); + + String jobIdToken = "testJobIdToken"; + String jsonTable = toJsonString(BigQueryIO.parseTableSpec("project.data_set.table_name")); + String extractDestinationDir = "mock://tempLocation"; + BoundedSource<TableRow> bqSource = + BigQueryTableSource.create(jobIdToken, jsonTable, extractDestinationDir, fakeBqServices); + + List<TableRow> expected = ImmutableList.of( + new TableRow().set("name", "a").set("number", "1"), + new TableRow().set("name", "b").set("number", "2"), + new TableRow().set("name", "c").set("number", "3")); + + when(mockJobService.pollJob(Mockito.<JobReference>any(), Mockito.anyInt())) + .thenReturn(extractJob); + PipelineOptions options = PipelineOptionsFactory.create(); + options.setTempLocation("mock://tempLocation"); + + IOChannelUtils.setIOFactory("mock", mockIOChannelFactory); + when(mockIOChannelFactory.resolve(anyString(), anyString())) + .thenReturn("mock://tempLocation/output"); + when(mockDatasetService.getTable(anyString(), anyString(), anyString())) + .thenReturn(new Table().setSchema(new TableSchema())); + + Assert.assertThat( + SourceTestUtils.readFromSource(bqSource, options), + CoreMatchers.is(expected)); + SourceTestUtils.assertSplitAtFractionBehavior( + bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options); + + List<? extends BoundedSource<TableRow>> sources = bqSource.splitIntoBundles(100, options); + assertEquals(1, sources.size()); + BoundedSource<TableRow> actual = sources.get(0); + assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class)); + + Mockito.verify(mockJobService) + .startExtractJob(Mockito.<JobReference>any(), Mockito.<JobConfigurationExtract>any()); + } + + @Test + public void testBigQueryQuerySourceInitSplit() throws Exception { + TableReference dryRunTable = new TableReference(); + + Job queryJob = new Job(); + JobStatistics queryJobStats = new JobStatistics(); + JobStatistics2 queryStats = new JobStatistics2(); + queryStats.setReferencedTables(ImmutableList.of(dryRunTable)); + queryJobStats.setQuery(queryStats); + queryJob.setStatus(new JobStatus()) + .setStatistics(queryJobStats); + + Job extractJob = new Job(); + JobStatistics extractJobStats = new JobStatistics(); + JobStatistics4 extractStats = new JobStatistics4(); + extractStats.setDestinationUriFileCounts(ImmutableList.of(1L)); + extractJobStats.setExtract(extractStats); + extractJob.setStatus(new JobStatus()) + .setStatistics(extractJobStats); + + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() + .withJobService(mockJobService) + .withDatasetService(mockDatasetService) + .readerReturns( + toJsonString(new TableRow().set("name", "a").set("number", "1")), + toJsonString(new TableRow().set("name", "b").set("number", "2")), + toJsonString(new TableRow().set("name", "c").set("number", "3"))); + + String jobIdToken = "testJobIdToken"; + String extractDestinationDir = "mock://tempLocation"; + TableReference destinationTable = BigQueryIO.parseTableSpec("project.data_set.table_name"); + String jsonDestinationTable = toJsonString(destinationTable); + BoundedSource<TableRow> bqSource = BigQueryQuerySource.create( + jobIdToken, "query", jsonDestinationTable, true /* flattenResults */, + extractDestinationDir, fakeBqServices); + + List<TableRow> expected = ImmutableList.of( + new TableRow().set("name", "a").set("number", "1"), + new TableRow().set("name", "b").set("number", "2"), + new TableRow().set("name", "c").set("number", "3")); + + PipelineOptions options = PipelineOptionsFactory.create(); + options.setTempLocation(extractDestinationDir); + + TableReference queryTable = new TableReference() + .setProjectId("testProejct") + .setDatasetId("testDataset") + .setTableId("testTable"); + when(mockJobService.dryRunQuery(anyString(), anyString())) + .thenReturn(new JobStatistics().setQuery( + new JobStatistics2() + .setTotalBytesProcessed(100L) + .setReferencedTables(ImmutableList.of(queryTable)))); + when(mockDatasetService.getTable( + eq(queryTable.getProjectId()), eq(queryTable.getDatasetId()), eq(queryTable.getTableId()))) + .thenReturn(new Table().setSchema(new TableSchema())); + when(mockDatasetService.getTable( + eq(destinationTable.getProjectId()), + eq(destinationTable.getDatasetId()), + eq(destinationTable.getTableId()))) + .thenReturn(new Table().setSchema(new TableSchema())); + IOChannelUtils.setIOFactory("mock", mockIOChannelFactory); + when(mockIOChannelFactory.resolve(anyString(), anyString())) + .thenReturn("mock://tempLocation/output"); + when(mockJobService.pollJob(Mockito.<JobReference>any(), Mockito.anyInt())) + .thenReturn(extractJob); + + Assert.assertThat( + SourceTestUtils.readFromSource(bqSource, options), + CoreMatchers.is(expected)); + SourceTestUtils.assertSplitAtFractionBehavior( + bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options); + + List<? extends BoundedSource<TableRow>> sources = bqSource.splitIntoBundles(100, options); + assertEquals(1, sources.size()); + BoundedSource<TableRow> actual = sources.get(0); + assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class)); + + Mockito.verify(mockJobService) + .startQueryJob( + Mockito.<JobReference>any(), Mockito.<JobConfigurationQuery>any()); + Mockito.verify(mockJobService) + .startExtractJob(Mockito.<JobReference>any(), Mockito.<JobConfigurationExtract>any()); + Mockito.verify(mockDatasetService) + .createDataset(anyString(), anyString(), anyString(), anyString()); + } + + @Test + public void testTransformingSource() throws Exception { + int numElements = 10000; + @SuppressWarnings("deprecation") + BoundedSource<Long> longSource = CountingSource.upTo(numElements); + SerializableFunction<Long, String> toStringFn = + new SerializableFunction<Long, String>() { + @Override + public String apply(Long input) { + return input.toString(); + }}; + BoundedSource<String> stringSource = new TransformingSource<>( + longSource, toStringFn, StringUtf8Coder.of()); + + List<String> expected = Lists.newArrayList(); + for (int i = 0; i < numElements; i++) { + expected.add(String.valueOf(i)); + } + + PipelineOptions options = PipelineOptionsFactory.create(); + Assert.assertThat( + SourceTestUtils.readFromSource(stringSource, options), + CoreMatchers.is(expected)); + SourceTestUtils.assertSplitAtFractionBehavior( + stringSource, 100, 0.3, ExpectedSplitOutcome.MUST_SUCCEED_AND_BE_CONSISTENT, options); + + SourceTestUtils.assertSourcesEqualReferenceSource( + stringSource, stringSource.splitIntoBundles(100, options), options); + } + + @Test + @Category(RunnableOnService.class) + public void testPassThroughThenCleanup() throws Exception { + Pipeline p = TestPipeline.create(); + + PCollection<Integer> output = p + .apply(Create.of(1, 2, 3)) + .apply(new PassThroughThenCleanup<Integer>(new CleanupOperation() { + @Override + void cleanup(PipelineOptions options) throws Exception { + // no-op + }})); + + PAssert.that(output).containsInAnyOrder(1, 2, 3); + + p.run(); + } + + @Test + @Category(RunnableOnService.class) + public void testPassThroughThenCleanupExecuted() throws Exception { + Pipeline p = TestPipeline.create(); + + p.apply(Create.<Integer>of()) + .apply(new PassThroughThenCleanup<Integer>(new CleanupOperation() { + @Override + void cleanup(PipelineOptions options) throws Exception { + throw new RuntimeException("cleanup executed"); + }})); + + thrown.expect(RuntimeException.class); + thrown.expectMessage("cleanup executed"); + + p.run(); + } }
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/987350b7/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java index 238deed..3ec2b37 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java @@ -43,6 +43,7 @@ import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobConfigurationLoad; import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.JobStatus; +import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableReference; import com.google.cloud.hadoop.util.ApiErrorExtractor; import com.google.common.collect.ImmutableList; @@ -117,6 +118,7 @@ public class BigQueryServicesImplTest { BackOff backoff = new AttemptBoundedExponentialBackOff( 5 /* attempts */, 1000 /* initialIntervalMillis */); JobServiceImpl.startJob(testJob, new ApiErrorExtractor(), bigquery, sleeper, backoff); + verify(response, times(1)).getStatusCode(); verify(response, times(1)).getContent(); verify(response, times(1)).getContentType(); @@ -198,8 +200,10 @@ public class BigQueryServicesImplTest { BigQueryServicesImpl.JobServiceImpl jobService = new BigQueryServicesImpl.JobServiceImpl(bigquery); - Job job = - jobService.pollJob("projectId", "jobId", Sleeper.DEFAULT, BackOff.ZERO_BACKOFF); + JobReference jobRef = new JobReference() + .setProjectId("projectId") + .setJobId("jobId"); + Job job = jobService.pollJob(jobRef, Sleeper.DEFAULT, BackOff.ZERO_BACKOFF); assertEquals(testJob, job); verify(response, times(1)).getStatusCode(); @@ -221,8 +225,10 @@ public class BigQueryServicesImplTest { BigQueryServicesImpl.JobServiceImpl jobService = new BigQueryServicesImpl.JobServiceImpl(bigquery); - Job job = - jobService.pollJob("projectId", "jobId", Sleeper.DEFAULT, BackOff.ZERO_BACKOFF); + JobReference jobRef = new JobReference() + .setProjectId("projectId") + .setJobId("jobId"); + Job job = jobService.pollJob(jobRef, Sleeper.DEFAULT, BackOff.ZERO_BACKOFF); assertEquals(testJob, job); verify(response, times(1)).getStatusCode(); @@ -244,8 +250,10 @@ public class BigQueryServicesImplTest { BigQueryServicesImpl.JobServiceImpl jobService = new BigQueryServicesImpl.JobServiceImpl(bigquery); - Job job = - jobService.pollJob("projectId", "jobId", Sleeper.DEFAULT, BackOff.STOP_BACKOFF); + JobReference jobRef = new JobReference() + .setProjectId("projectId") + .setJobId("jobId"); + Job job = jobService.pollJob(jobRef, Sleeper.DEFAULT, BackOff.STOP_BACKOFF); assertEquals(null, job); verify(response, times(1)).getStatusCode(); @@ -253,6 +261,26 @@ public class BigQueryServicesImplTest { verify(response, times(1)).getContentType(); } + @Test + public void testExecuteWithRetries() throws IOException, InterruptedException { + Table testTable = new Table(); + + when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + when(response.getStatusCode()).thenReturn(200); + when(response.getContent()).thenReturn(toStream(testTable)); + + Table table = BigQueryServicesImpl.executeWithRetries( + bigquery.tables().get("projectId", "datasetId", "tableId"), + "Failed to get table.", + Sleeper.DEFAULT, + BackOff.STOP_BACKOFF); + + assertEquals(testTable, table); + verify(response, times(1)).getStatusCode(); + verify(response, times(1)).getContent(); + verify(response, times(1)).getContentType(); + } + /** A helper to wrap a {@link GenericJson} object in a content stream. */ private static InputStream toStream(GenericJson content) throws IOException { return new ByteArrayInputStream(JacksonFactory.getDefaultInstance().toByteArray(content));