Repository: beam Updated Branches: refs/heads/master 6280d497b -> c1a757476
Removes unnecessary calls to ValueProvider.isAccessible Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/97810b4b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/97810b4b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/97810b4b Branch: refs/heads/master Commit: 97810b4b23037fe333af103661bbb15acec96a57 Parents: 6280d49 Author: Eugene Kirpichov <kirpic...@google.com> Authored: Thu Aug 17 19:44:17 2017 -0700 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Tue Aug 29 15:42:10 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/AvroIO.java | 10 +---- .../beam/sdk/io/DefaultFilenamePolicy.java | 25 +++++------ .../org/apache/beam/sdk/io/FileBasedSink.java | 4 +- .../org/apache/beam/sdk/io/FileBasedSource.java | 25 +++-------- .../java/org/apache/beam/sdk/io/TFRecordIO.java | 15 +------ .../java/org/apache/beam/sdk/io/TextIO.java | 14 +----- .../java/org/apache/beam/sdk/io/WriteFiles.java | 6 +-- .../apache/beam/sdk/options/ValueProvider.java | 18 +++++--- .../sdk/transforms/display/DisplayData.java | 8 ++-- .../org/apache/beam/sdk/io/WriteFilesTest.java | 2 +- .../beam/sdk/options/ValueProviderTest.java | 15 +++---- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 47 +++++++++++--------- .../io/gcp/bigquery/BigQueryTableSource.java | 2 - .../apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 35 +++++++-------- .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java | 6 +-- .../io/gcp/pubsub/PubsubUnboundedSource.java | 23 +++------- 16 files changed, 95 insertions(+), 160 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index 910d8e2..9e0422e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -989,19 +989,11 @@ public class AvroIO { public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); resolveDynamicDestinations().populateDisplayData(builder); - - String tempDirectory = null; - if (getTempDirectory() != null) { - tempDirectory = - getTempDirectory().isAccessible() - ? getTempDirectory().get().toString() - : getTempDirectory().toString(); - } builder .addIfNotDefault( DisplayData.item("numShards", getNumShards()).withLabel("Maximum Output Shards"), 0) .addIfNotNull( - DisplayData.item("tempDirectory", tempDirectory) + DisplayData.item("tempDirectory", getTempDirectory()) .withLabel("Directory for temporary files")); } } http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java index 1f438d5..2f22e82 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java @@ -368,26 +368,21 @@ public final class DefaultFilenamePolicy extends FilenamePolicy { @Override public void populateDisplayData(DisplayData.Builder builder) { - String filenamePattern; - if (params.baseFilename.isAccessible()) { - filenamePattern = - String.format("%s%s%s", params.baseFilename.get(), params.shardTemplate, params.suffix); - } else { - filenamePattern = - String.format("%s%s%s", params.baseFilename, params.shardTemplate, params.suffix); - } - - String outputPrefixString = null; - outputPrefixString = + String displayBaseFilename = params.baseFilename.isAccessible() ? params.baseFilename.get().toString() - : params.baseFilename.toString(); - builder.add(DisplayData.item("filenamePattern", filenamePattern).withLabel("Filename Pattern")); - builder.add(DisplayData.item("filePrefix", outputPrefixString).withLabel("Output File Prefix")); - builder.add(DisplayData.item("fileSuffix", params.suffix).withLabel("Output file Suffix")); + : ("(" + params.baseFilename + ")"); + builder.add( + DisplayData.item( + "filenamePattern", + String.format("%s%s%s", displayBaseFilename, params.shardTemplate, params.suffix)) + .withLabel("Filename pattern")); + builder.add( + DisplayData.item("filePrefix", params.baseFilename).withLabel("Output File Prefix")); builder.add( DisplayData.item("shardNameTemplate", params.shardTemplate) .withLabel("Output Shard Name Template")); + builder.add(DisplayData.item("fileSuffix", params.suffix).withLabel("Output file Suffix")); } private static String extractFilename(ResourceId input) { http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index 4e2b61c..d618647 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -747,12 +747,10 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT> @Override public String toString() { - String tempDirectoryStr = - tempDirectory.isAccessible() ? tempDirectory.get().toString() : tempDirectory.toString(); return getClass().getSimpleName() + "{" + "tempDirectory=" - + tempDirectoryStr + + tempDirectory + ", windowedWrites=" + windowedWrites + '}'; http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java index 7f865de..f835fa4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java @@ -211,10 +211,6 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> { // This implementation of method getEstimatedSizeBytes is provided to simplify subclasses. Here // we perform the size estimation of files and file patterns using the interface provided by // FileSystem. - checkState( - fileOrPatternSpec.isAccessible(), - "Cannot estimate size of a FileBasedSource with inaccessible file pattern: {}.", - fileOrPatternSpec); String fileOrPattern = fileOrPatternSpec.get(); if (mode == Mode.FILEPATTERN) { @@ -240,10 +236,9 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> { public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); if (mode == Mode.FILEPATTERN) { - String patternDisplay = getFileOrPatternSpecProvider().isAccessible() - ? getFileOrPatternSpecProvider().get() - : getFileOrPatternSpecProvider().toString(); - builder.add(DisplayData.item("filePattern", patternDisplay).withLabel("File Pattern")); + builder.add( + DisplayData.item("filePattern", getFileOrPatternSpecProvider()) + .withLabel("File Pattern")); } } @@ -254,10 +249,6 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> { // split a FileBasedSource based on a file pattern to FileBasedSources based on full single // files. For files that can be efficiently seeked, we further split FileBasedSources based on // those files to FileBasedSources based on sub ranges of single files. - checkState( - fileOrPatternSpec.isAccessible(), - "Cannot split a FileBasedSource without access to the file or pattern specification: {}.", - fileOrPatternSpec); String fileOrPattern = fileOrPatternSpec.get(); if (mode == Mode.FILEPATTERN) { @@ -326,10 +317,6 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> { public final BoundedReader<T> createReader(PipelineOptions options) throws IOException { // Validate the current source prior to creating a reader for it. this.validate(); - checkState( - fileOrPatternSpec.isAccessible(), - "Cannot create a file reader without access to the file or pattern specification: {}.", - fileOrPatternSpec); String fileOrPattern = fileOrPatternSpec.get(); if (mode == Mode.FILEPATTERN) { @@ -358,13 +345,11 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> { @Override public String toString() { - String fileString = fileOrPatternSpec.isAccessible() - ? fileOrPatternSpec.get() : fileOrPatternSpec.toString(); switch (mode) { case FILEPATTERN: - return fileString; + return fileOrPatternSpec.toString(); case SINGLE_FILE_OR_SUBRANGE: - return fileString + " range " + super.toString(); + return fileOrPatternSpec + " range " + super.toString(); default: throw new IllegalStateException("Unexpected mode: " + mode); } http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java index c75051f..526c50e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java @@ -195,15 +195,12 @@ public class TFRecordIO { @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - - String filepatternDisplay = getFilepattern().isAccessible() - ? getFilepattern().get() : getFilepattern().toString(); builder .add(DisplayData.item("compressionType", getCompressionType().toString()) .withLabel("Compression Type")) .addIfNotDefault(DisplayData.item("validation", getValidate()) .withLabel("Validation Enabled"), true) - .addIfNotNull(DisplayData.item("filePattern", filepatternDisplay) + .addIfNotNull(DisplayData.item("filePattern", getFilepattern()) .withLabel("File Pattern")); } } @@ -360,16 +357,8 @@ public class TFRecordIO { @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - - String outputPrefixString = null; - if (getOutputPrefix().isAccessible()) { - ResourceId dir = getOutputPrefix().get(); - outputPrefixString = dir.toString(); - } else { - outputPrefixString = getOutputPrefix().toString(); - } builder - .add(DisplayData.item("filePrefix", outputPrefixString) + .add(DisplayData.item("filePrefix", getOutputPrefix()) .withLabel("Output File Prefix")) .addIfNotNull(DisplayData.item("fileSuffix", getFilenameSuffix()) .withLabel("Output File Suffix")) http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 612f5c5..cbc17ff 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -399,15 +399,12 @@ public class TextIO { @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - - String filepatternDisplay = getFilepattern().isAccessible() - ? getFilepattern().get() : getFilepattern().toString(); builder .add( DisplayData.item("compressionType", getCompressionType().toString()) .withLabel("Compression Type")) .addIfNotNull( - DisplayData.item("filePattern", filepatternDisplay).withLabel("File Pattern")) + DisplayData.item("filePattern", getFilepattern()).withLabel("File Pattern")) .add( DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString()) .withLabel("Treatment of filepatterns that match no files")) @@ -904,18 +901,11 @@ public class TextIO { super.populateDisplayData(builder); resolveDynamicDestinations().populateDisplayData(builder); - String tempDirectory = null; - if (getTempDirectory() != null) { - tempDirectory = - getTempDirectory().isAccessible() - ? getTempDirectory().get().toString() - : getTempDirectory().toString(); - } builder .addIfNotDefault( DisplayData.item("numShards", getNumShards()).withLabel("Maximum Output Shards"), 0) .addIfNotNull( - DisplayData.item("tempDirectory", tempDirectory) + DisplayData.item("tempDirectory", getTempDirectory()) .withLabel("Directory for temporary files")) .addIfNotNull(DisplayData.item("fileHeader", getHeader()).withLabel("File Header")) .addIfNotNull(DisplayData.item("fileFooter", getFooter()).withLabel("File Footer")) http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java index 85c5652..7878c73 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java @@ -205,10 +205,8 @@ public class WriteFiles<UserT, DestinationT, OutputT> .include("sink", sink); if (getSharding() != null) { builder.include("sharding", getSharding()); - } else if (getNumShards() != null) { - String numShards = getNumShards().isAccessible() - ? getNumShards().get().toString() : getNumShards().toString(); - builder.add(DisplayData.item("numShards", numShards) + } else { + builder.addIfNotNull(DisplayData.item("numShards", getNumShards()) .withLabel("Fixed Number of Shards")); } } http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java index 94187a9..15413e8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ValueProvider.java @@ -101,9 +101,7 @@ public interface ValueProvider<T> extends Serializable { @Override public String toString() { - return MoreObjects.toStringHelper(this) - .add("value", value) - .toString(); + return String.valueOf(value); } } @@ -160,8 +158,12 @@ public interface ValueProvider<T> extends Serializable { @Override public String toString() { + if (isAccessible()) { + return String.valueOf(get()); + } return MoreObjects.toStringHelper(this) .add("value", value) + .add("translator", translator.getClass().getSimpleName()) .toString(); } } @@ -226,7 +228,8 @@ public interface ValueProvider<T> extends Serializable { public T get() { PipelineOptions options = optionsMap.get(optionsId); if (options == null) { - throw new RuntimeException("Not called from a runtime context."); + throw new IllegalStateException( + "Value only available at runtime, but accessed from a non-runtime context: " + this); } try { Method method = klass.getMethod(methodName); @@ -249,8 +252,7 @@ public interface ValueProvider<T> extends Serializable { @Override public boolean isAccessible() { - PipelineOptions options = optionsMap.get(optionsId); - return options != null; + return optionsMap.get(optionsId) != null; } /** @@ -262,10 +264,12 @@ public interface ValueProvider<T> extends Serializable { @Override public String toString() { + if (isAccessible()) { + return String.valueOf(get()); + } return MoreObjects.toStringHelper(this) .add("propertyName", propertyName) .add("default", defaultValue) - .add("value", isAccessible() ? get() : null) .toString(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java index 3c4337b..10ef428 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java @@ -882,12 +882,12 @@ public class DisplayData implements Serializable { return item(key, Type.STRING, null); } Type type = inferType(got); - if (type == null) { - throw new RuntimeException(String.format("Unknown value type: %s", got)); + if (type != null) { + return item(key, type, got); } - return item(key, type, got); } - return item(key, Type.STRING, value.toString()); + // General case: not null and type not inferable. Fall back to toString of the VP itself. + return item(key, Type.STRING, String.valueOf(value)); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java index 1d4ce08..5e0d685 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java @@ -518,7 +518,7 @@ public class WriteFilesTest { DisplayData displayData = DisplayData.from(write); assertThat(displayData, hasDisplayItem("sink", sink.getClass())); assertThat(displayData, includesDisplayDataFor("sink", sink)); - assertThat(displayData, hasDisplayItem("numShards", "1")); + assertThat(displayData, hasDisplayItem("numShards", 1)); } @Test http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java index e596cc1..7bbbf7e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java @@ -88,7 +88,7 @@ public class ValueProviderTest { ValueProvider<String> provider = StaticValueProvider.of("foo"); assertEquals("foo", provider.get()); assertTrue(provider.isAccessible()); - assertEquals("StaticValueProvider{value=foo}", provider.toString()); + assertEquals("foo", provider.toString()); } @Test @@ -97,8 +97,9 @@ public class ValueProviderTest { ValueProvider<String> provider = options.getFoo(); assertFalse(provider.isAccessible()); - expectedException.expect(RuntimeException.class); - expectedException.expectMessage("Not called from a runtime context"); + expectedException.expect(IllegalStateException.class); + expectedException.expectMessage("Value only available at runtime"); + expectedException.expectMessage("foo"); provider.get(); } @@ -108,7 +109,7 @@ public class ValueProviderTest { ValueProvider<String> provider = options.getFoo(); assertEquals("foo", ((RuntimeValueProvider) provider).propertyName()); assertEquals( - "RuntimeValueProvider{propertyName=foo, default=null, value=null}", + "RuntimeValueProvider{propertyName=foo, default=null}", provider.toString()); } @@ -239,9 +240,7 @@ public class ValueProviderTest { }); assertTrue(nvp.isAccessible()); assertEquals("foobar", nvp.get()); - assertEquals( - "NestedValueProvider{value=StaticValueProvider{value=foo}}", - nvp.toString()); + assertEquals("foobar", nvp.toString()); } @Test @@ -266,7 +265,7 @@ public class ValueProviderTest { assertEquals("bar", ((NestedValueProvider) doubleNvp).propertyName()); assertFalse(nvp.isAccessible()); expectedException.expect(RuntimeException.class); - expectedException.expectMessage("Not called from a runtime context"); + expectedException.expectMessage("Value only available at runtime"); nvp.get(); } http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 29828e4..1e0ab30 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -451,8 +451,7 @@ public class BigQueryIO { private BigQuerySourceBase createSource(String jobUuid) { BigQuerySourceBase source; - if (getQuery() == null - || (getQuery().isAccessible() && Strings.isNullOrEmpty(getQuery().get()))) { + if (getQuery() == null) { source = BigQueryTableSource.create(jobUuid, getTableProvider(), getBigQueryServices()); } else { source = @@ -517,26 +516,30 @@ public class BigQueryIO { // Note that a table or query check can fail if the table or dataset are created by // earlier stages of the pipeline or if a query depends on earlier stages of a pipeline. // For these cases the withoutValidation method can be used to disable the check. - if (getValidate() && table != null && table.isAccessible() - && table.get().getProjectId() != null) { - checkState(table.isAccessible(), "Cannot call validate if table is dynamically set."); - // Check for source table presence for early failure notification. - DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions); - BigQueryHelpers.verifyDatasetPresence(datasetService, table.get()); - BigQueryHelpers.verifyTablePresence(datasetService, table.get()); - } else if (getValidate() && getQuery() != null) { - checkState(getQuery().isAccessible(), "Cannot call validate if query is dynamically set."); - JobService jobService = getBigQueryServices().getJobService(bqOptions); - try { - jobService.dryRunQuery( - bqOptions.getProject(), - new JobConfigurationQuery() - .setQuery(getQuery().get()) - .setFlattenResults(getFlattenResults()) - .setUseLegacySql(getUseLegacySql())); - } catch (Exception e) { - throw new IllegalArgumentException( - String.format(QUERY_VALIDATION_FAILURE_ERROR, getQuery().get()), e); + if (getValidate()) { + if (table != null) { + checkState(table.isAccessible(), "Cannot call validate if table is dynamically set."); + } + if (table != null && table.get().getProjectId() != null) { + // Check for source table presence for early failure notification. + DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions); + BigQueryHelpers.verifyDatasetPresence(datasetService, table.get()); + BigQueryHelpers.verifyTablePresence(datasetService, table.get()); + } else if (getQuery() != null) { + checkState( + getQuery().isAccessible(), "Cannot call validate if query is dynamically set."); + JobService jobService = getBigQueryServices().getJobService(bqOptions); + try { + jobService.dryRunQuery( + bqOptions.getProject(), + new JobConfigurationQuery() + .setQuery(getQuery().get()) + .setFlattenResults(getFlattenResults()) + .setUseLegacySql(getUseLegacySql())); + } catch (Exception e) { + throw new IllegalArgumentException( + String.format(QUERY_VALIDATION_FAILURE_ERROR, getQuery().get()), e); + } } } } http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java index 1d45641..52b8259 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java @@ -63,7 +63,6 @@ class BigQueryTableSource extends BigQuerySourceBase { @Override protected TableReference getTableToExtract(BigQueryOptions bqOptions) throws IOException { - checkState(jsonTable.isAccessible()); TableReference tableReference = BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(), TableReference.class); return setDefaultProjectIfAbsent(bqOptions, tableReference); @@ -94,7 +93,6 @@ class BigQueryTableSource extends BigQuerySourceBase { @Override public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException { BigQueryOptions bqOptions = options.as(BigQueryOptions.class); - checkState(jsonTable.isAccessible()); TableReference tableRef = BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(), TableReference.class); return new BigQueryReader(this, bqServices.getReaderFromTable(bqOptions, tableRef)); http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index 46c2df4..e3780b4 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -146,17 +146,11 @@ public class PubsubIO { private static void populateCommonDisplayData(DisplayData.Builder builder, String timestampAttribute, String idAttribute, ValueProvider<PubsubTopic> topic) { builder - .addIfNotNull(DisplayData.item("timestampAttribute", timestampAttribute) - .withLabel("Timestamp Attribute")) - .addIfNotNull(DisplayData.item("idAttribute", idAttribute) - .withLabel("ID Attribute")); - - if (topic != null) { - String topicString = topic.isAccessible() ? topic.get().asPath() - : topic.toString(); - builder.add(DisplayData.item("topic", topicString) - .withLabel("Pubsub Topic")); - } + .addIfNotNull( + DisplayData.item("timestampAttribute", timestampAttribute) + .withLabel("Timestamp Attribute")) + .addIfNotNull(DisplayData.item("idAttribute", idAttribute).withLabel("ID Attribute")) + .addIfNotNull(DisplayData.item("topic", topic).withLabel("Pubsub Topic")); } /** @@ -263,6 +257,11 @@ public class PubsubIO { return subscription; } } + + @Override + public String toString() { + return asPath(); + } } /** @@ -428,6 +427,11 @@ public class PubsubIO { return topic; } } + + @Override + public String toString() { + return asPath(); + } } /** Returns A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream. */ @@ -734,13 +738,8 @@ public class PubsubIO { super.populateDisplayData(builder); populateCommonDisplayData( builder, getTimestampAttribute(), getIdAttribute(), getTopicProvider()); - - if (getSubscriptionProvider() != null) { - String subscriptionString = getSubscriptionProvider().isAccessible() - ? getSubscriptionProvider().get().asPath() : getSubscriptionProvider().toString(); - builder.add(DisplayData.item("subscription", subscriptionString) - .withLabel("Pubsub Subscription")); - } + builder.addIfNotNull(DisplayData.item("subscription", getSubscriptionProvider()) + .withLabel("Pubsub Subscription")); } } http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java index ad38e28..a8f6fa2 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java @@ -295,11 +295,7 @@ public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>, @Override public void populateDisplayData(Builder builder) { super.populateDisplayData(builder); - String topicString = - topic == null ? null - : topic.isAccessible() ? topic.get().getPath() - : topic.toString(); - builder.add(DisplayData.item("topic", topicString)); + builder.add(DisplayData.item("topic", topic)); builder.add(DisplayData.item("transport", pubsubFactory.getKind())); builder.addIfNotNull(DisplayData.item("timestampAttribute", timestampAttribute)); builder.addIfNotNull(DisplayData.item("idAttribute", idAttribute)); http://git-wip-us.apache.org/repos/asf/beam/blob/97810b4b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java index 8da6ff4..bf3a121 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java @@ -1222,21 +1222,12 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub @Override public void populateDisplayData(Builder builder) { super.populateDisplayData(builder); - if (subscription != null) { - String subscriptionString = subscription.isAccessible() - ? subscription.get().getPath() - : subscription.toString(); - builder.add(DisplayData.item("subscription", subscriptionString)); - } - if (topic != null) { - String topicString = topic.isAccessible() - ? topic.get().getPath() - : topic.toString(); - builder.add(DisplayData.item("topic", topicString)); - } - builder.add(DisplayData.item("transport", pubsubFactory.getKind())); - builder.addIfNotNull(DisplayData.item("timestampAttribute", timestampAttribute)); - builder.addIfNotNull(DisplayData.item("idAttribute", idAttribute)); + builder + .addIfNotNull(DisplayData.item("subscription", subscription)) + .addIfNotNull(DisplayData.item("topic", topic)) + .add(DisplayData.item("transport", pubsubFactory.getKind())) + .addIfNotNull(DisplayData.item("timestampAttribute", timestampAttribute)) + .addIfNotNull(DisplayData.item("idAttribute", idAttribute)); } } @@ -1416,8 +1407,6 @@ public class PubsubUnboundedSource extends PTransform<PBegin, PCollection<Pubsub try (PubsubClient pubsubClient = pubsubFactory.newClient( timestampAttribute, idAttribute, options.as(PubsubOptions.class))) { - checkState(project.isAccessible(), "createRandomSubscription must be called at runtime."); - checkState(topic.isAccessible(), "createRandomSubscription must be called at runtime."); SubscriptionPath subscriptionPath = pubsubClient.createRandomSubscription( project.get(), topic.get(), DEAULT_ACK_TIMEOUT_SEC);