This is an automated email from the ASF dual-hosted git repository. robertwb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 137ed1f [BEAM-6979] Add support for double well known coder to Java and add tests to Python & Java. new ce020e9 Merge pull request #8205 [BEAM-6979] Double well known coder to Java and tests. 137ed1f is described below commit 137ed1f82726bb002e8f64870b349af94faac699 Author: Luke Cwik <lc...@google.com> AuthorDate: Tue Apr 2 15:11:19 2019 -0700 [BEAM-6979] Add support for double well known coder to Java and add tests to Python & Java. --- .../beam/model/fnexecution/v1/standard_coders.yaml | 16 ++++++++++++++++ .../runners/core/construction/ModelCoderRegistrar.java | 3 +++ .../beam/runners/core/construction/ModelCoders.java | 5 ++++- .../runners/core/construction/CoderTranslationTest.java | 2 ++ .../beam/runners/core/construction/CommonCoderTest.java | 9 +++++++++ sdks/python/apache_beam/coders/standard_coders_test.py | 16 ++++++++++++++-- .../python/apache_beam/testing/data/standard_coders.yaml | 16 ++++++++++++++++ 7 files changed, 64 insertions(+), 3 deletions(-) diff --git a/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml b/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml index f7ede2b..2629cb5 100644 --- a/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml +++ b/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml @@ -207,3 +207,19 @@ examples: pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0}, windows: [{end: 1454293425000, span: 3600000}, {end: -9223372036854410, span: 365}] } + +--- + +coder: + urn: "beam:coder:double:v1" +examples: + "\0\0\0\0\0\0\0\0": "0" + "\u0080\0\0\0\0\0\0\0": "-0" + "\u003f\u00b9\u0099\u0099\u0099\u0099\u0099\u009a": "0.1" + "\u00bf\u00b9\u0099\u0099\u0099\u0099\u0099\u009a": "-0.1" + "\0\0\0\0\0\0\0\u0001": "4.9e-324" + "\0\u0001\0\0\0\0\0\0": "1.390671161567e-309" + "\u007f\u00ef\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff": "1.7976931348623157e308" + "\u007f\u00f0\0\0\0\0\0\0": "Infinity" + "\u00ff\u00f0\0\0\0\0\0\0": "-Infinity" + "\u007f\u00f8\0\0\0\0\0\0": "NaN" diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java index 8843125..8e5d30c 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Set; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.DoubleCoder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.LengthPrefixCoder; @@ -54,6 +55,7 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar { .put(LengthPrefixCoder.class, ModelCoders.LENGTH_PREFIX_CODER_URN) .put(GlobalWindow.Coder.class, ModelCoders.GLOBAL_WINDOW_CODER_URN) .put(FullWindowedValueCoder.class, ModelCoders.WINDOWED_VALUE_CODER_URN) + .put(DoubleCoder.class, ModelCoders.DOUBLE_CODER_URN) .build(); public static final Set<String> WELL_KNOWN_CODER_URNS = BEAM_MODEL_CODER_URNS.values(); @@ -70,6 +72,7 @@ public class ModelCoderRegistrar implements CoderTranslatorRegistrar { .put(Timer.Coder.class, CoderTranslators.timer()) .put(LengthPrefixCoder.class, CoderTranslators.lengthPrefix()) .put(FullWindowedValueCoder.class, CoderTranslators.fullWindowedValue()) + .put(DoubleCoder.class, CoderTranslators.atomic(DoubleCoder.class)) .build(); static { diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java index 3c6dfba..720bd6c 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java @@ -37,6 +37,8 @@ public class ModelCoders { // coders? public static final String INT64_CODER_URN = getUrn(StandardCoders.Enum.VARINT); + public static final String DOUBLE_CODER_URN = getUrn(StandardCoders.Enum.DOUBLE); + public static final String ITERABLE_CODER_URN = getUrn(StandardCoders.Enum.ITERABLE); public static final String TIMER_CODER_URN = getUrn(StandardCoders.Enum.TIMER); public static final String KV_CODER_URN = getUrn(StandardCoders.Enum.KV); @@ -61,7 +63,8 @@ public class ModelCoders { LENGTH_PREFIX_CODER_URN, GLOBAL_WINDOW_CODER_URN, INTERVAL_WINDOW_CODER_URN, - WINDOWED_VALUE_CODER_URN); + WINDOWED_VALUE_CODER_URN, + DOUBLE_CODER_URN); public static Set<String> urns() { return MODEL_CODER_URNS; diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java index f96c977..0ec7588 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java @@ -35,6 +35,7 @@ import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.DoubleCoder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.LengthPrefixCoder; @@ -70,6 +71,7 @@ public class CoderTranslationTest { .add( FullWindowedValueCoder.of( IterableCoder.of(VarLongCoder.of()), IntervalWindowCoder.of())) + .add(DoubleCoder.of()) .build(); /** diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java index 080e7b7..3e129aa 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java @@ -51,6 +51,7 @@ import org.apache.beam.sdk.coders.ByteCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.Context; import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.DoubleCoder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.VarLongCoder; @@ -90,6 +91,7 @@ public class CommonCoderTest { .put(getUrn(StandardCoders.Enum.ITERABLE), IterableCoder.class) .put(getUrn(StandardCoders.Enum.TIMER), Timer.Coder.class) .put(getUrn(StandardCoders.Enum.GLOBAL_WINDOW), GlobalWindow.Coder.class) + .put(getUrn(StandardCoders.Enum.DOUBLE), DoubleCoder.class) .put( getUrn(StandardCoders.Enum.WINDOWED_VALUE), WindowedValue.FullWindowedValueCoder.class) @@ -270,6 +272,8 @@ public class CommonCoderTest { (int) paneInfoMap.get("index"), (int) paneInfoMap.get("on_time_index")); return WindowedValue.of(windowValue, timestamp, windows, paneInfo); + } else if (s.equals(getUrn(StandardCoders.Enum.DOUBLE))) { + return Double.parseDouble((String) value); } else { throw new IllegalStateException("Unknown coder URN: " + coderSpec.getUrn()); } @@ -298,6 +302,8 @@ public class CommonCoderTest { } else if (s.equals(getUrn(StandardCoders.Enum.WINDOWED_VALUE))) { return WindowedValue.FullWindowedValueCoder.of( components.get(0), (Coder<BoundedWindow>) components.get(1)); + } else if (s.equals(getUrn(StandardCoders.Enum.DOUBLE))) { + return DoubleCoder.of(); } else { throw new IllegalStateException("Unknown coder URN: " + coder.getUrn()); } @@ -357,6 +363,9 @@ public class CommonCoderTest { } else if (s.equals(getUrn(StandardCoders.Enum.WINDOWED_VALUE))) { assertEquals(expectedValue, actualValue); + } else if (s.equals(getUrn(StandardCoders.Enum.DOUBLE))) { + + assertEquals(expectedValue, actualValue); } else { throw new IllegalStateException("Unknown coder URN: " + coder.getUrn()); } diff --git a/sdks/python/apache_beam/coders/standard_coders_test.py b/sdks/python/apache_beam/coders/standard_coders_test.py index cb9d43b..36f1d89 100644 --- a/sdks/python/apache_beam/coders/standard_coders_test.py +++ b/sdks/python/apache_beam/coders/standard_coders_test.py @@ -65,6 +65,7 @@ class StandardCodersTest(unittest.TestCase): 'beam:coder:windowed_value:v1': lambda v, w: coders.WindowedValueCoder(v, w), 'beam:coder:timer:v1': coders._TimerCoder, + 'beam:coder:double:v1': coders.FloatCoder, } _urn_to_json_value_parser = { @@ -87,6 +88,7 @@ class StandardCodersTest(unittest.TestCase): lambda x, payload_parser: dict( payload=payload_parser(x['payload']), timestamp=Timestamp(micros=x['timestamp'])), + 'beam:coder:double:v1': lambda x: float(x), } def test_standard_coders(self): @@ -95,6 +97,16 @@ class StandardCodersTest(unittest.TestCase): self._run_standard_coder(name, spec) def _run_standard_coder(self, name, spec): + def assert_equal(actual, expected): + """Handle nan values which self.assertEqual fails on.""" + import math + if (isinstance(actual, float) + and isinstance(expected, float) + and math.isnan(actual) + and math.isnan(expected)): + return + self.assertEqual(actual, expected) + coder = self.parse_coder(spec['coder']) parse_value = self.json_value_parser(spec['coder']) nested_list = [spec['nested']] if 'nested' in spec else [True, False] @@ -108,8 +120,8 @@ class StandardCodersTest(unittest.TestCase): self.to_fix[spec['index'], expected_encoded] = actual_encoded else: self.assertEqual(expected_encoded, actual_encoded) - self.assertEqual(decode_nested(coder, expected_encoded, nested), - value) + decoded = decode_nested(coder, expected_encoded, nested) + assert_equal(decoded, value) else: # Only verify decoding for a non-deterministic coder self.assertEqual(decode_nested(coder, expected_encoded, nested), diff --git a/sdks/python/apache_beam/testing/data/standard_coders.yaml b/sdks/python/apache_beam/testing/data/standard_coders.yaml index 494e749..9eb4195 100644 --- a/sdks/python/apache_beam/testing/data/standard_coders.yaml +++ b/sdks/python/apache_beam/testing/data/standard_coders.yaml @@ -193,3 +193,19 @@ examples: pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0}, windows: [{end: 1454293425000, span: 3600000}, {end: -9223372036854410, span: 365}] } + +--- + +coder: + urn: "beam:coder:double:v1" +examples: + "\0\0\0\0\0\0\0\0": "0" + "\u0080\0\0\0\0\0\0\0": "-0" + "\u003f\u00b9\u0099\u0099\u0099\u0099\u0099\u009a": "0.1" + "\u00bf\u00b9\u0099\u0099\u0099\u0099\u0099\u009a": "-0.1" + "\0\0\0\0\0\0\0\u0001": "4.9e-324" + "\0\u0001\0\0\0\0\0\0": "1.390671161567e-309" + "\u007f\u00ef\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff": "1.7976931348623157e308" + "\u007f\u00f0\0\0\0\0\0\0": "Infinity" + "\u00ff\u00f0\0\0\0\0\0\0": "-Infinity" + "\u007f\u00f8\0\0\0\0\0\0": "NaN"