Repository: beam Updated Branches: refs/heads/master 34d25f406 -> ff6bb3530
Inline TypedPValue This has exactly one implementation, and this is not expected to change. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ef27abdc Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ef27abdc Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ef27abdc Branch: refs/heads/master Commit: ef27abdca3010ba12fe0208925535762fde16d7c Parents: 17f0843 Author: Thomas Groh <tg...@google.com> Authored: Wed May 3 13:42:49 2017 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Wed May 3 18:18:57 2017 -0700 ---------------------------------------------------------------------- .../core/construction/ForwardingPTransform.java | 6 +- .../beam/runners/core/SplittableParDo.java | 3 +- .../direct/ParDoMultiOverrideFactory.java | 3 +- .../dataflow/DataflowPipelineTranslator.java | 5 +- .../apache/beam/sdk/transforms/PTransform.java | 6 +- .../org/apache/beam/sdk/transforms/ParDo.java | 3 +- .../org/apache/beam/sdk/values/PCollection.java | 145 ++++++++++++- .../org/apache/beam/sdk/values/PValueBase.java | 2 +- .../org/apache/beam/sdk/values/TypedPValue.java | 208 ------------------- 9 files changed, 150 insertions(+), 231 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ef27abdc/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java index 2f427ad..ca25ba7 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java @@ -22,9 +22,9 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; -import org.apache.beam.sdk.values.TypedPValue; /** * A base class for implementing {@link PTransform} overrides, which behave identically to the @@ -51,8 +51,8 @@ public abstract class ForwardingPTransform<InputT extends PInput, OutputT extend } @Override - public <T> Coder<T> getDefaultOutputCoder(InputT input, @SuppressWarnings("unused") - TypedPValue<T> output) throws CannotProvideCoderException { + public <T> Coder<T> getDefaultOutputCoder(InputT input, PCollection<T> output) + throws CannotProvideCoderException { return delegate().getDefaultOutputCoder(input, output); } http://git-wip-us.apache.org/repos/asf/beam/blob/ef27abdc/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java index 94f5f35..ed065a6 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java @@ -55,7 +55,6 @@ import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; -import org.apache.beam.sdk.values.TypedPValue; import org.joda.time.Instant; /** @@ -273,7 +272,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> public <T> Coder<T> getDefaultOutputCoder( PCollection<? extends KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>> input, - TypedPValue<T> output) + PCollection<T> output) throws CannotProvideCoderException { // Similar logic to ParDo.MultiOutput.getDefaultOutputCoder. @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/beam/blob/ef27abdc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java index 322c995..b10d669 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java @@ -51,7 +51,6 @@ import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; -import org.apache.beam.sdk.values.TypedPValue; /** * A {@link PTransformOverrideFactory} that provides overrides for applications of a {@link ParDo} @@ -184,7 +183,7 @@ class ParDoMultiOverrideFactory<InputT, OutputT> @Override public <T> Coder<T> getDefaultOutputCoder( - PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>> input, TypedPValue<T> output) + PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>> input, PCollection<T> output) throws CannotProvideCoderException { return underlyingParDo.getDefaultOutputCoder(originalInput, output); } http://git-wip-us.apache.org/repos/asf/beam/blob/ef27abdc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index 05edd28..69b4ecd 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -97,7 +97,6 @@ import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.TypedPValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -599,8 +598,8 @@ public class DataflowPipelineTranslator { @Override public long addOutput(PValue value) { Coder<?> coder; - if (value instanceof TypedPValue) { - coder = ((TypedPValue<?>) value).getCoder(); + if (value instanceof PCollection) { + coder = ((PCollection<?>) value).getCoder(); if (value instanceof PCollection) { // Wrap the PCollection element Coder inside a WindowedValueCoder. coder = WindowedValue.getFullCoder( http://git-wip-us.apache.org/repos/asf/beam/blob/ef27abdc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java index 4f651f2..15abd98 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java @@ -29,11 +29,11 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.display.DisplayData.Builder; import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.util.NameUtils; +import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.TypedPValue; /** * A {@code PTransform<InputT, OutputT>} is an operation that takes an @@ -122,7 +122,7 @@ import org.apache.beam.sdk.values.TypedPValue; * not known at run-time (e.g., due to Java's "erasure" of generic * types) or there was no default Coder registered, then the Coder * should be specified manually by calling - * {@link org.apache.beam.sdk.values.TypedPValue#setCoder} + * {@link PCollection#setCoder} * on the output PCollection. The Coder of every output * PCollection must be determined one way or another * before that output is used as an input to another PTransform, or @@ -306,7 +306,7 @@ public abstract class PTransform<InputT extends PInput, OutputT extends POutput> * @throws CannotProvideCoderException if none can be inferred. */ public <T> Coder<T> getDefaultOutputCoder( - InputT input, @SuppressWarnings("unused") TypedPValue<T> output) + InputT input, @SuppressWarnings("unused") PCollection<T> output) throws CannotProvideCoderException { @SuppressWarnings("unchecked") Coder<T> defaultOutputCoder = (Coder<T>) getDefaultOutputCoder(input); http://git-wip-us.apache.org/repos/asf/beam/blob/ef27abdc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java index 6137a7b..73d78c9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java @@ -53,7 +53,6 @@ import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.TypeDescriptor; -import org.apache.beam.sdk.values.TypedPValue; /** * {@link ParDo} is the core element-wise transform in Apache Beam, invoking a user-specified @@ -763,7 +762,7 @@ public class ParDo { @Override public <T> Coder<T> getDefaultOutputCoder( - PCollection<? extends InputT> input, TypedPValue<T> output) + PCollection<? extends InputT> input, PCollection<T> output) throws CannotProvideCoderException { @SuppressWarnings("unchecked") Coder<InputT> inputCoder = ((PCollection<InputT>) input).getCoder(); http://git-wip-us.apache.org/repos/asf/beam/blob/ef27abdc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java index 67520ce..034f0de 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java @@ -17,14 +17,22 @@ */ package org.apache.beam.sdk.values; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import javax.annotation.Nullable; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.CannotProvideCoderException.ReasonCode; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.io.BoundedSource.BoundedReader; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; @@ -62,7 +70,119 @@ import org.apache.beam.sdk.util.WindowingStrategy; * * @param <T> the type of the elements of this {@link PCollection} */ -public class PCollection<T> extends TypedPValue<T> { +public class PCollection<T> extends PValueBase implements PValue { + + /** + * The {@link Coder} used by this {@link PCollection} to encode and decode the values stored in + * it, or null if not specified nor inferred yet. + */ + private CoderOrFailure<T> coderOrFailure = + new CoderOrFailure<>(null, "No Coder was specified, and Coder Inference did not occur"); + private TypeDescriptor<T> typeDescriptor; + + @Override + public void finishSpecifyingOutput( + PInput input, PTransform<?, ?> transform) { + this.coderOrFailure = inferCoderOrFail(input, transform, getPipeline().getCoderRegistry()); + super.finishSpecifyingOutput(input, transform); + } + + /** + * After building, finalizes this {@link PValue} to make it ready for + * running. Automatically invoked whenever the {@link PValue} is "used" + * (e.g., when apply() is called on it) and when the Pipeline is + * run (useful if this is a {@link PValue} with no consumers). + */ + @Override + public void finishSpecifying(PInput input, PTransform<?, ?> transform) { + if (isFinishedSpecifying()) { + return; + } + this.coderOrFailure = inferCoderOrFail(input, transform, getPipeline().getCoderRegistry()); + // Ensure that this TypedPValue has a coder by inferring the coder if none exists; If not, + // this will throw an exception. + getCoder(); + super.finishSpecifying(input, transform); + } + + /** + * Returns a {@link TypeDescriptor TypeDescriptor<T>} with some reflective information + * about {@code T}, if possible. May return {@code null} if no information + * is available. Subclasses may override this to enable better + * {@code Coder} inference. + */ + public TypeDescriptor<T> getTypeDescriptor() { + return typeDescriptor; + } + + /** + * If the coder is not explicitly set, this sets the coder for this {@link PCollection} to the + * best coder that can be inferred based upon the known {@link TypeDescriptor}. By default, this + * is null, but can and should be improved by subclasses. + */ + @SuppressWarnings({"unchecked", "rawtypes"}) + private CoderOrFailure<T> inferCoderOrFail( + PInput input, PTransform<?, ?> transform, CoderRegistry registry) { + // First option for a coder: use the Coder set on this PValue. + if (coderOrFailure.coder != null) { + return coderOrFailure; + } + + // Second option for a coder: use the default Coder from the producing PTransform. + CannotProvideCoderException inputCoderException; + try { + return new CoderOrFailure<>( + ((PTransform) transform).getDefaultOutputCoder(input, this), null); + } catch (CannotProvideCoderException exc) { + inputCoderException = exc; + } + + // Third option for a coder: Look in the coder registry. + TypeDescriptor<T> token = getTypeDescriptor(); + CannotProvideCoderException inferFromTokenException = null; + if (token != null) { + try { + return new CoderOrFailure<>(registry.getDefaultCoder(token), null); + } catch (CannotProvideCoderException exc) { + inferFromTokenException = exc; + // Attempt to detect when the token came from a TupleTag used for a ParDo output, + // and provide a better error message if so. Unfortunately, this information is not + // directly available from the TypeDescriptor, so infer based on the type of the PTransform + // and the error message itself. + if (transform instanceof ParDo.MultiOutput + && exc.getReason() == ReasonCode.TYPE_ERASURE) { + inferFromTokenException = new CannotProvideCoderException(exc.getMessage() + + " If this error occurs for an output of the producing ParDo, verify that the " + + "TupleTag for this output is constructed with proper type information (see " + + "TupleTag Javadoc) or explicitly set the Coder to use if this is not possible."); + } + } + } + + // Build up the error message and list of causes. + StringBuilder messageBuilder = new StringBuilder() + .append("Unable to return a default Coder for ").append(this) + .append(". Correct one of the following root causes:"); + + // No exception, but give the user a message about .setCoder() has not been called. + messageBuilder.append("\n No Coder has been manually specified; ") + .append(" you may do so using .setCoder()."); + + if (inferFromTokenException != null) { + messageBuilder + .append("\n Inferring a Coder from the CoderRegistry failed: ") + .append(inferFromTokenException.getMessage()); + } + + if (inputCoderException != null) { + messageBuilder + .append("\n Using the default output Coder from the producing PTransform failed: ") + .append(inputCoderException.getMessage()); + } + + // Build and throw the exception. + return new CoderOrFailure<>(null, messageBuilder.toString()); + } /** * The enumeration of cases for whether a {@link PCollection} is bounded. @@ -126,9 +246,9 @@ public class PCollection<T> extends TypedPValue<T> { * @throws IllegalStateException if the {@link Coder} hasn't been set, and * couldn't be inferred. */ - @Override public Coder<T> getCoder() { - return super.getCoder(); + checkState(coderOrFailure.coder != null, coderOrFailure.failure); + return coderOrFailure.coder; } /** @@ -139,9 +259,11 @@ public class PCollection<T> extends TypedPValue<T> { * been finalized and may no longer be set. * Once {@link #apply} has been called, this will be the case. */ - @Override public PCollection<T> setCoder(Coder<T> coder) { - super.setCoder(coder); + checkState( + !isFinishedSpecifying(), "cannot change the Coder of %s once it's been used", this); + checkArgument(coder != null, "Cannot setCoder(null)"); + this.coderOrFailure = new CoderOrFailure<>(coder, null); return this; } @@ -202,9 +324,8 @@ public class PCollection<T> extends TypedPValue<T> { * {@link PCollectionTuple}, {@link PCollectionList}, or {@code PTransform<?, PCollection<T>>}, * etc., to provide more detailed reflective information. */ - @Override public PCollection<T> setTypeDescriptor(TypeDescriptor<T> typeDescriptor) { - super.setTypeDescriptor(typeDescriptor); + this.typeDescriptor = typeDescriptor; return this; } @@ -241,4 +362,14 @@ public class PCollection<T> extends TypedPValue<T> { .setWindowingStrategyInternal(windowingStrategy) .setIsBoundedInternal(isBounded); } + + private static class CoderOrFailure<T> { + @Nullable private final Coder<T> coder; + @Nullable private final String failure; + + public CoderOrFailure(@Nullable Coder<T> coder, @Nullable String failure) { + this.coder = coder; + this.failure = failure; + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/ef27abdc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java index 9f151ec..7ab5808 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java @@ -130,7 +130,7 @@ public abstract class PValueBase implements PValue { * * <p>For internal use only. */ - public boolean isFinishedSpecifyingInternal() { + boolean isFinishedSpecifying() { return finishedSpecifying; } http://git-wip-us.apache.org/repos/asf/beam/blob/ef27abdc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java deleted file mode 100644 index f473776..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.values; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; - -import javax.annotation.Nullable; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.CannotProvideCoderException; -import org.apache.beam.sdk.coders.CannotProvideCoderException.ReasonCode; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; - -/** - * A {@link TypedPValue TypedPValue<T>} is the abstract base class of things that - * store some number of values of type {@code T}. - * - * <p>Because we know the type {@code T}, this is the layer of the inheritance hierarchy where - * we store a coder for objects of type {@code T}. - * - * @param <T> the type of the values stored in this {@link TypedPValue} - */ -public abstract class TypedPValue<T> extends PValueBase implements PValue { - - /** - * Returns the {@link Coder} used by this {@link TypedPValue} to encode and decode - * the values stored in it. - * - * @throws IllegalStateException if the {@link Coder} hasn't been set, and - * couldn't be inferred. - */ - public Coder<T> getCoder() { - checkState(coderOrFailure.coder != null, coderOrFailure.failure); - return coderOrFailure.coder; - } - - /** - * Sets the {@link Coder} used by this {@link TypedPValue} to encode and decode the - * values stored in it. Returns {@code this}. - * - * @throws IllegalStateException if this {@link TypedPValue} has already - * been finalized and is no longer settable, e.g., by having - * {@code apply()} called on it - */ - public TypedPValue<T> setCoder(Coder<T> coder) { - checkState( - !isFinishedSpecifyingInternal(), "cannot change the Coder of %s once it's been used", this); - checkArgument(coder != null, "Cannot setCoder(null)"); - this.coderOrFailure = new CoderOrFailure<>(coder, null); - return this; - } - - @Override - public void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform) { - this.coderOrFailure = inferCoderOrFail(input, transform, getPipeline().getCoderRegistry()); - } - - /** - * After building, finalizes this {@link PValue} to make it ready for - * running. Automatically invoked whenever the {@link PValue} is "used" - * (e.g., when apply() is called on it) and when the Pipeline is - * run (useful if this is a {@link PValue} with no consumers). - */ - @Override - public void finishSpecifying(PInput input, PTransform<?, ?> transform) { - if (isFinishedSpecifyingInternal()) { - return; - } - this.coderOrFailure = inferCoderOrFail(input, transform, getPipeline().getCoderRegistry()); - // Ensure that this TypedPValue has a coder by inferring the coder if none exists; If not, - // this will throw an exception. - getCoder(); - super.finishSpecifying(input, transform); - } - - ///////////////////////////////////////////////////////////////////////////// - // Internal details below here. - - /** - * The {@link Coder} used by this {@link TypedPValue} to encode and decode the values stored in - * it, or null if not specified nor inferred yet. - */ - private CoderOrFailure<T> coderOrFailure = - new CoderOrFailure<>(null, "No Coder was specified, and Coder Inference did not occur"); - - protected TypedPValue(Pipeline p) { - super(p); - } - - private TypeDescriptor<T> typeDescriptor; - - /** - * Returns a {@link TypeDescriptor TypeDescriptor<T>} with some reflective information - * about {@code T}, if possible. May return {@code null} if no information - * is available. Subclasses may override this to enable better - * {@code Coder} inference. - */ - public TypeDescriptor<T> getTypeDescriptor() { - return typeDescriptor; - } - - /** - * Sets the {@link TypeDescriptor TypeDescriptor<T>} associated with this class. Better - * reflective type information will lead to better {@link Coder} - * inference. - */ - public TypedPValue<T> setTypeDescriptor(TypeDescriptor<T> typeDescriptor) { - this.typeDescriptor = typeDescriptor; - return this; - } - - /** - * If the coder is not explicitly set, this sets the coder for this {@link TypedPValue} to the - * best coder that can be inferred based upon the known {@link TypeDescriptor}. By default, this - * is null, but can and should be improved by subclasses. - */ - @SuppressWarnings({"unchecked", "rawtypes"}) - private CoderOrFailure<T> inferCoderOrFail( - PInput input, PTransform<?, ?> transform, CoderRegistry registry) { - // First option for a coder: use the Coder set on this PValue. - if (coderOrFailure.coder != null) { - return coderOrFailure; - } - - // Second option for a coder: use the default Coder from the producing PTransform. - CannotProvideCoderException inputCoderException; - try { - return new CoderOrFailure<>( - ((PTransform) transform).getDefaultOutputCoder(input, this), null); - } catch (CannotProvideCoderException exc) { - inputCoderException = exc; - } - - // Third option for a coder: Look in the coder registry. - TypeDescriptor<T> token = getTypeDescriptor(); - CannotProvideCoderException inferFromTokenException = null; - if (token != null) { - try { - return new CoderOrFailure<>(registry.getDefaultCoder(token), null); - } catch (CannotProvideCoderException exc) { - inferFromTokenException = exc; - // Attempt to detect when the token came from a TupleTag used for a ParDo output, - // and provide a better error message if so. Unfortunately, this information is not - // directly available from the TypeDescriptor, so infer based on the type of the PTransform - // and the error message itself. - if (transform instanceof ParDo.MultiOutput - && exc.getReason() == ReasonCode.TYPE_ERASURE) { - inferFromTokenException = new CannotProvideCoderException(exc.getMessage() - + " If this error occurs for an output of the producing ParDo, verify that the " - + "TupleTag for this output is constructed with proper type information (see " - + "TupleTag Javadoc) or explicitly set the Coder to use if this is not possible."); - } - } - } - - // Build up the error message and list of causes. - StringBuilder messageBuilder = new StringBuilder() - .append("Unable to return a default Coder for ").append(this) - .append(". Correct one of the following root causes:"); - - // No exception, but give the user a message about .setCoder() has not been called. - messageBuilder.append("\n No Coder has been manually specified; ") - .append(" you may do so using .setCoder()."); - - if (inferFromTokenException != null) { - messageBuilder - .append("\n Inferring a Coder from the CoderRegistry failed: ") - .append(inferFromTokenException.getMessage()); - } - - if (inputCoderException != null) { - messageBuilder - .append("\n Using the default output Coder from the producing PTransform failed: ") - .append(inputCoderException.getMessage()); - } - - // Build and throw the exception. - return new CoderOrFailure<>(null, messageBuilder.toString()); - } - - private static class CoderOrFailure<T> { - @Nullable private final Coder<T> coder; - @Nullable private final String failure; - - public CoderOrFailure(@Nullable Coder<T> coder, @Nullable String failure) { - this.coder = coder; - this.failure = failure; - } - } -}