Coder.structuralValue(T) should never throw. In the worst case, encoding to a byte array should never fail due to IO.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f8a10ffd Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f8a10ffd Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f8a10ffd Branch: refs/heads/master Commit: f8a10ffd7af93705d8011d5287eb2225e540a1fd Parents: 552ddb4 Author: Thomas Groh <tg...@google.com> Authored: Thu Apr 20 20:00:07 2017 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Fri Apr 21 15:00:07 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/runners/dataflow/internal/IsmFormat.java | 2 +- .../src/main/java/org/apache/beam/sdk/coders/Coder.java | 2 +- .../java/org/apache/beam/sdk/coders/DelegateCoder.java | 10 ++++++++-- .../src/main/java/org/apache/beam/sdk/coders/KvCoder.java | 2 +- .../java/org/apache/beam/sdk/coders/NullableCoder.java | 2 +- .../java/org/apache/beam/sdk/coders/StandardCoder.java | 2 +- .../org/apache/beam/sdk/coders/StringDelegateCoder.java | 2 +- .../org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java | 2 +- 8 files changed, 15 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/f8a10ffd/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java index 6daddc6..33c27f8 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/IsmFormat.java @@ -403,7 +403,7 @@ public class IsmFormat { } @Override - public 
Object structuralValue(IsmRecord<V> record) throws Exception { + public Object structuralValue(IsmRecord<V> record) { checkNotNull(record); checkState(record.getKeyComponents().size() == keyComponentCoders.size(), "Expected the number of key component coders %s " http://git-wip-us.apache.org/repos/asf/beam/blob/f8a10ffd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java index 39efaf2..779961e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/Coder.java @@ -198,7 +198,7 @@ public interface Coder<T> extends Serializable { * * <p>See also {@link #consistentWithEquals()}. */ - Object structuralValue(T value) throws Exception; + Object structuralValue(T value); /** * Returns whether {@link #registerByteSizeObserver} cheap enough to http://git-wip-us.apache.org/repos/asf/beam/blob/f8a10ffd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java index 1762243..7e1154a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/DelegateCoder.java @@ -107,8 +107,14 @@ public final class DelegateCoder<T, IntermediateT> extends CustomCoder<T> { * coder. 
*/ @Override - public Object structuralValue(T value) throws Exception { - return coder.structuralValue(toFn.apply(value)); + public Object structuralValue(T value) { + try { + IntermediateT intermediate = toFn.apply(value); + return coder.structuralValue(intermediate); + } catch (Exception exn) { + throw new IllegalArgumentException( + "Unable to encode element '" + value + "' with coder '" + this + "'.", exn); + } } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/f8a10ffd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java index 3c61bf6..fcb906c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/KvCoder.java @@ -114,7 +114,7 @@ public class KvCoder<K, V> extends StandardCoder<KV<K, V>> { } @Override - public Object structuralValue(KV<K, V> kv) throws Exception { + public Object structuralValue(KV<K, V> kv) { if (consistentWithEquals()) { return kv; } else { http://git-wip-us.apache.org/repos/asf/beam/blob/f8a10ffd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java index d1e1370..c92470a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java @@ -124,7 +124,7 @@ public class NullableCoder<T> extends StandardCoder<T> { } @Override - public Object structuralValue(@Nullable T value) throws Exception { + public Object structuralValue(@Nullable T value) { if (value 
== null) { return Optional.absent(); } http://git-wip-us.apache.org/repos/asf/beam/blob/f8a10ffd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java index d41694f..c67fe97 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StandardCoder.java @@ -241,7 +241,7 @@ public abstract class StandardCoder<T> implements Coder<T> { } @Override - public Object structuralValue(T value) throws Exception { + public Object structuralValue(T value) { if (value != null && consistentWithEquals()) { return value; } else { http://git-wip-us.apache.org/repos/asf/beam/blob/f8a10ffd/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java index ad7e28c..d4b4ae8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/StringDelegateCoder.java @@ -118,7 +118,7 @@ public final class StringDelegateCoder<T> extends CustomCoder<T> { } @Override - public Object structuralValue(T value) throws Exception { + public Object structuralValue(T value) { return delegateCoder.structuralValue(value); } http://git-wip-us.apache.org/repos/asf/beam/blob/f8a10ffd/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java index 2043a4c..25ef7df 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java @@ -102,7 +102,7 @@ public class KafkaRecordCoder<K, V> extends StandardCoder<KafkaRecord<K, V>> { @SuppressWarnings("unchecked") @Override - public Object structuralValue(KafkaRecord<K, V> value) throws Exception { + public Object structuralValue(KafkaRecord<K, V> value) { if (consistentWithEquals()) { return value; } else {