Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 32d719911 -> 4bf848511


Add support for proto coder


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e1e4b7ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e1e4b7ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e1e4b7ce

Branch: refs/heads/python-sdk
Commit: e1e4b7cef7d0b597cd67fc4039dbf619287b3441
Parents: 32d7199
Author: Vikas Kedigehalli <vika...@google.com>
Authored: Mon Oct 31 12:19:35 2016 -0700
Committer: Robert Bradshaw <rober...@google.com>
Committed: Mon Nov 7 13:16:27 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/coders/coder_impl.py    |  14 +
 sdks/python/apache_beam/coders/coders.py        |  34 ++
 sdks/python/apache_beam/coders/coders_test.py   |  52 +++
 .../apache_beam/coders/coders_test_common.py    |  19 ++
 .../coders/proto2_coder_test_messages_pb2.py    | 318 +++++++++++++++++++
 sdks/python/apache_beam/coders/typecoders.py    |  32 +-
 sdks/python/run_pylint.sh                       |   3 +-
 sdks/python/setup.py                            |   1 +
 8 files changed, 468 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e1e4b7ce/sdks/python/apache_beam/coders/coder_impl.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coder_impl.py 
b/sdks/python/apache_beam/coders/coder_impl.py
index 40fc1fd..be15a7d 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -196,6 +196,20 @@ class DeterministicPickleCoderImpl(CoderImpl):
     return self._pickle_coder.decode(encoded)
 
 
+class ProtoCoderImpl(SimpleCoderImpl):
+
+  def __init__(self, proto_message_type):
+    self.proto_message_type = proto_message_type
+
+  def encode(self, value):
+    return value.SerializeToString()
+
+  def decode(self, encoded):
+    proto_message = self.proto_message_type()
+    proto_message.ParseFromString(encoded)
+    return proto_message
+
+
 UNKNOWN_TYPE = 0xFF
 NONE_TYPE = 0
 INT_TYPE = 1

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e1e4b7ce/sdks/python/apache_beam/coders/coders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders.py 
b/sdks/python/apache_beam/coders/coders.py
index fbbc325..ed4ac92 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -19,6 +19,7 @@
 
 import base64
 import cPickle as pickle
+import google.protobuf
 
 from apache_beam.coders import coder_impl
 
@@ -450,6 +451,39 @@ class Base64PickleCoder(Coder):
     return self
 
 
+class ProtoCoder(FastCoder):
+  """A Coder for Google Protocol Buffers.
+
+  It supports both Protocol Buffers syntax versions 2 and 3. However,
+  the runtime version of the python protobuf library must exactly match the
+  version of the protoc compiler that was used to generate the protobuf
+  messages.
+
+  ProtoCoder is registered in the global CoderRegistry as the default coder for
+  any protobuf Message object.
+
+  """
+
+  def __init__(self, proto_message_type):
+    self.proto_message_type = proto_message_type
+
+  def _create_impl(self):
+    return coder_impl.ProtoCoderImpl(self.proto_message_type)
+
+  def is_deterministic(self):
+    # TODO(vikasrk): A proto message can be deterministic if it does not contain
+    # a Map.
+    return False
+
+  @staticmethod
+  def from_type_hint(typehint, unused_registry):
+    if issubclass(typehint, google.protobuf.message.Message):
+      return ProtoCoder(typehint)
+    else:
+      raise ValueError(('Expected a subclass of 
google.protobuf.message.Message'
+                        ', but got a %s' % typehint))
+
+
 class TupleCoder(FastCoder):
   """Coder of tuple objects."""
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e1e4b7ce/sdks/python/apache_beam/coders/coders_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test.py 
b/sdks/python/apache_beam/coders/coders_test.py
index 3db5338..ba505db 100644
--- a/sdks/python/apache_beam/coders/coders_test.py
+++ b/sdks/python/apache_beam/coders/coders_test.py
@@ -21,6 +21,7 @@ import logging
 import unittest
 
 from apache_beam import coders
+from apache_beam.coders import proto2_coder_test_messages_pb2 as test_message
 
 
 class PickleCoderTest(unittest.TestCase):
@@ -58,6 +59,57 @@ class CodersTest(unittest.TestCase):
     self.assertEqual('abc', real_coder.decode(real_coder.encode('abc')))
 
 
+# The test proto message file was generated by running the following:
+#
+# `cd <beam repo>`
+# `cp sdks/java/core/src/proto/proto2_coder_test_messages.proto
+#    sdks/python/apache_beam/coders`
+# `cd sdks/python`
+# `protoc apache_beam/coders/proto2_coder_test_messages.proto
+#    --python_out=apache_beam/coders`
+# `rm apache_beam/coders/proto2_coder_test_messages.proto`
+#
+# Note: The protoc version should match the protobuf library version specified
+# in setup.py.
+#
+# TODO(vikasrk): The proto file should be placed in a common directory
+# that can be shared between java and python.
+class ProtoCoderTest(unittest.TestCase):
+
+  def test_proto_coder(self):
+    ma = test_message.MessageA()
+    mb = ma.field2.add()
+    mb.field1 = True
+    ma.field1 = u'hello world'
+    expected_coder = coders.ProtoCoder(ma.__class__)
+    real_coder = coders.registry.get_coder(ma.__class__)
+    self.assertEqual(expected_coder, real_coder)
+    self.assertEqual(real_coder.encode(ma), expected_coder.encode(ma))
+    self.assertEqual(ma, real_coder.decode(real_coder.encode(ma)))
+
+
+class DummyClass(object):
+  """A class with no registered coder."""
+  def __init__(self):
+    pass
+
+  def __eq__(self, other):
+    if isinstance(other, self.__class__):
+      return True
+    return False
+
+
+class FallbackCoderTest(unittest.TestCase):
+
+  def test_default_fallback_path(self):
+    """Test fallback path picks a matching coder if no coder is registered."""
+
+    coder = coders.registry.get_coder(DummyClass)
+    # No matching coder, so picks the last fallback coder which is a
+    # FastPrimitivesCoder.
+    self.assertEqual(coder, coders.FastPrimitivesCoder())
+    self.assertEqual(DummyClass(), coder.decode(coder.encode(DummyClass())))
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e1e4b7ce/sdks/python/apache_beam/coders/coders_test_common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py 
b/sdks/python/apache_beam/coders/coders_test_common.py
index e7780e4..40044aa 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -27,6 +27,8 @@ import coders
 import observable
 from apache_beam.utils import timestamp
 
+from apache_beam.coders import proto2_coder_test_messages_pb2 as test_message
+
 
 # Defined out of line for picklability.
 class CustomCoder(coders.Coder):
@@ -58,6 +60,7 @@ class CodersTest(unittest.TestCase):
                      coders.FastCoder,
                      coders.Base64PickleCoder,
                      coders.FloatCoder,
+                     coders.ProtoCoder,
                      coders.TimestampCoder,
                      coders.ToStringCoder,
                      coders.WindowCoder,
@@ -204,6 +207,22 @@ class CodersTest(unittest.TestCase):
                            coders.IterableCoder(coders.VarIntCoder()))),
         (1, [1, 2, 3]))
 
+  def test_proto_coder(self):
+    # For instructions on how these test proto messages were generated,
+    # see coders_test.py
+    ma = test_message.MessageA()
+    mab = ma.field2.add()
+    mab.field1 = True
+    ma.field1 = u'hello world'
+
+    mb = test_message.MessageA()
+    mb.field1 = u'beam'
+
+    proto_coder = coders.ProtoCoder(ma.__class__)
+    self.check_coder(proto_coder, ma)
+    self.check_coder(coders.TupleCoder((proto_coder, coders.BytesCoder())),
+                     (ma, 'a'), (mb, 'b'))
+
   def test_nested_observables(self):
     class FakeObservableIterator(observable.ObservableMixin):
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e1e4b7ce/sdks/python/apache_beam/coders/proto2_coder_test_messages_pb2.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/proto2_coder_test_messages_pb2.py 
b/sdks/python/apache_beam/coders/proto2_coder_test_messages_pb2.py
new file mode 100644
index 0000000..16b1b4d
--- /dev/null
+++ b/sdks/python/apache_beam/coders/proto2_coder_test_messages_pb2.py
@@ -0,0 +1,318 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Generated by the protocol buffer compiler.  DO NOT EDIT!
+# source: sdks/java/core/src/main/proto/proto2_coder_test_messages.proto
+
+import sys
+_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import message as _message
+from google.protobuf import reflection as _reflection
+from google.protobuf import symbol_database as _symbol_database
+from google.protobuf import descriptor_pb2
+# @@protoc_insertion_point(imports)
+
+_sym_db = _symbol_database.Default()
+
+
+
+
+DESCRIPTOR = _descriptor.FileDescriptor(
+  name='apache_beam/coders/proto2_coder_test_messages.proto',
+  package='proto2_coder_test_messages',
+  syntax='proto2',
+  
serialized_pb=_b('\n3apache_beam/coders/proto2_coder_test_messages.proto\x12\x1aproto2_coder_test_messages\"P\n\x08MessageA\x12\x0e\n\x06\x66ield1\x18\x01
 \x01(\t\x12\x34\n\x06\x66ield2\x18\x02 
\x03(\x0b\x32$.proto2_coder_test_messages.MessageB\"\x1a\n\x08MessageB\x12\x0e\n\x06\x66ield1\x18\x01
 
\x01(\x08\"\x10\n\x08MessageC*\x04\x08\x64\x10j\"\xad\x01\n\x0eMessageWithMap\x12\x46\n\x06\x66ield1\x18\x01
 
\x03(\x0b\x32\x36.proto2_coder_test_messages.MessageWithMap.Field1Entry\x1aS\n\x0b\x46ield1Entry\x12\x0b\n\x03key\x18\x01
 \x01(\t\x12\x33\n\x05value\x18\x02 
\x01(\x0b\x32$.proto2_coder_test_messages.MessageA:\x02\x38\x01\"V\n\x18ReferencesMessageWithMap\x12:\n\x06\x66ield1\x18\x01
 
\x03(\x0b\x32*.proto2_coder_test_messages.MessageWithMap:Z\n\x06\x66ield1\x12$.proto2_coder_test_messages.MessageC\x18\x65
 
\x01(\x0b\x32$.proto2_coder_test_messages.MessageA:Z\n\x06\x66ield2\x12$.proto2_coder_test_messages.MessageC\x18\x66
 \x01(\x0b\x32$.proto2_coder_test_messages.MessageBB\x1c\n\x1aorg.apa
 che.beam.sdk.coders')
+)
+_sym_db.RegisterFileDescriptor(DESCRIPTOR)
+
+
+FIELD1_FIELD_NUMBER = 101
+field1 = _descriptor.FieldDescriptor(
+  name='field1', full_name='proto2_coder_test_messages.field1', index=0,
+  number=101, type=11, cpp_type=10, label=1,
+  has_default_value=False, default_value=None,
+  message_type=None, enum_type=None, containing_type=None,
+  is_extension=True, extension_scope=None,
+  options=None)
+FIELD2_FIELD_NUMBER = 102
+field2 = _descriptor.FieldDescriptor(
+  name='field2', full_name='proto2_coder_test_messages.field2', index=1,
+  number=102, type=11, cpp_type=10, label=1,
+  has_default_value=False, default_value=None,
+  message_type=None, enum_type=None, containing_type=None,
+  is_extension=True, extension_scope=None,
+  options=None)
+
+
+_MESSAGEA = _descriptor.Descriptor(
+  name='MessageA',
+  full_name='proto2_coder_test_messages.MessageA',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='field1', full_name='proto2_coder_test_messages.MessageA.field1', 
index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='field2', full_name='proto2_coder_test_messages.MessageA.field2', 
index=1,
+      number=2, type=11, cpp_type=10, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto2',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=83,
+  serialized_end=163,
+)
+
+
+_MESSAGEB = _descriptor.Descriptor(
+  name='MessageB',
+  full_name='proto2_coder_test_messages.MessageB',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='field1', full_name='proto2_coder_test_messages.MessageB.field1', 
index=0,
+      number=1, type=8, cpp_type=7, label=1,
+      has_default_value=False, default_value=False,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto2',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=165,
+  serialized_end=191,
+)
+
+
+_MESSAGEC = _descriptor.Descriptor(
+  name='MessageC',
+  full_name='proto2_coder_test_messages.MessageC',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=True,
+  syntax='proto2',
+  extension_ranges=[(100, 106), ],
+  oneofs=[
+  ],
+  serialized_start=193,
+  serialized_end=209,
+)
+
+
+_MESSAGEWITHMAP_FIELD1ENTRY = _descriptor.Descriptor(
+  name='Field1Entry',
+  full_name='proto2_coder_test_messages.MessageWithMap.Field1Entry',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='key', 
full_name='proto2_coder_test_messages.MessageWithMap.Field1Entry.key', index=0,
+      number=1, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+    _descriptor.FieldDescriptor(
+      name='value', 
full_name='proto2_coder_test_messages.MessageWithMap.Field1Entry.value', 
index=1,
+      number=2, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), 
_b('8\001')),
+  is_extendable=False,
+  syntax='proto2',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=302,
+  serialized_end=385,
+)
+
+_MESSAGEWITHMAP = _descriptor.Descriptor(
+  name='MessageWithMap',
+  full_name='proto2_coder_test_messages.MessageWithMap',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='field1', 
full_name='proto2_coder_test_messages.MessageWithMap.field1', index=0,
+      number=1, type=11, cpp_type=10, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[_MESSAGEWITHMAP_FIELD1ENTRY, ],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto2',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=212,
+  serialized_end=385,
+)
+
+
+_REFERENCESMESSAGEWITHMAP = _descriptor.Descriptor(
+  name='ReferencesMessageWithMap',
+  full_name='proto2_coder_test_messages.ReferencesMessageWithMap',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='field1', 
full_name='proto2_coder_test_messages.ReferencesMessageWithMap.field1', index=0,
+      number=1, type=11, cpp_type=10, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  options=None,
+  is_extendable=False,
+  syntax='proto2',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=387,
+  serialized_end=473,
+)
+
+_MESSAGEA.fields_by_name['field2'].message_type = _MESSAGEB
+_MESSAGEWITHMAP_FIELD1ENTRY.fields_by_name['value'].message_type = _MESSAGEA
+_MESSAGEWITHMAP_FIELD1ENTRY.containing_type = _MESSAGEWITHMAP
+_MESSAGEWITHMAP.fields_by_name['field1'].message_type = 
_MESSAGEWITHMAP_FIELD1ENTRY
+_REFERENCESMESSAGEWITHMAP.fields_by_name['field1'].message_type = 
_MESSAGEWITHMAP
+DESCRIPTOR.message_types_by_name['MessageA'] = _MESSAGEA
+DESCRIPTOR.message_types_by_name['MessageB'] = _MESSAGEB
+DESCRIPTOR.message_types_by_name['MessageC'] = _MESSAGEC
+DESCRIPTOR.message_types_by_name['MessageWithMap'] = _MESSAGEWITHMAP
+DESCRIPTOR.message_types_by_name['ReferencesMessageWithMap'] = 
_REFERENCESMESSAGEWITHMAP
+DESCRIPTOR.extensions_by_name['field1'] = field1
+DESCRIPTOR.extensions_by_name['field2'] = field2
+
+MessageA = _reflection.GeneratedProtocolMessageType('MessageA', 
(_message.Message,), dict(
+  DESCRIPTOR = _MESSAGEA,
+  __module__ = 'apache_beam.coders.proto2_coder_test_messages_pb2'
+  # @@protoc_insertion_point(class_scope:proto2_coder_test_messages.MessageA)
+  ))
+_sym_db.RegisterMessage(MessageA)
+
+MessageB = _reflection.GeneratedProtocolMessageType('MessageB', 
(_message.Message,), dict(
+  DESCRIPTOR = _MESSAGEB,
+  __module__ = 'apache_beam.coders.proto2_coder_test_messages_pb2'
+  # @@protoc_insertion_point(class_scope:proto2_coder_test_messages.MessageB)
+  ))
+_sym_db.RegisterMessage(MessageB)
+
+MessageC = _reflection.GeneratedProtocolMessageType('MessageC', 
(_message.Message,), dict(
+  DESCRIPTOR = _MESSAGEC,
+  __module__ = 'apache_beam.coders.proto2_coder_test_messages_pb2'
+  # @@protoc_insertion_point(class_scope:proto2_coder_test_messages.MessageC)
+  ))
+_sym_db.RegisterMessage(MessageC)
+
+MessageWithMap = _reflection.GeneratedProtocolMessageType('MessageWithMap', 
(_message.Message,), dict(
+
+  Field1Entry = _reflection.GeneratedProtocolMessageType('Field1Entry', 
(_message.Message,), dict(
+    DESCRIPTOR = _MESSAGEWITHMAP_FIELD1ENTRY,
+    __module__ = 'apache_beam.coders.proto2_coder_test_messages_pb2'
+    # 
@@protoc_insertion_point(class_scope:proto2_coder_test_messages.MessageWithMap.Field1Entry)
+    ))
+  ,
+  DESCRIPTOR = _MESSAGEWITHMAP,
+  __module__ = 'apache_beam.coders.proto2_coder_test_messages_pb2'
+  # 
@@protoc_insertion_point(class_scope:proto2_coder_test_messages.MessageWithMap)
+  ))
+_sym_db.RegisterMessage(MessageWithMap)
+_sym_db.RegisterMessage(MessageWithMap.Field1Entry)
+
+ReferencesMessageWithMap = 
_reflection.GeneratedProtocolMessageType('ReferencesMessageWithMap', 
(_message.Message,), dict(
+  DESCRIPTOR = _REFERENCESMESSAGEWITHMAP,
+  __module__ = 'apache_beam.coders.proto2_coder_test_messages_pb2'
+  # 
@@protoc_insertion_point(class_scope:proto2_coder_test_messages.ReferencesMessageWithMap)
+  ))
+_sym_db.RegisterMessage(ReferencesMessageWithMap)
+
+field1.message_type = _MESSAGEA
+MessageC.RegisterExtension(field1)
+field2.message_type = _MESSAGEB
+MessageC.RegisterExtension(field2)
+
+DESCRIPTOR.has_options = True
+DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), 
_b('\n\032org.apache.beam.sdk.coders'))
+_MESSAGEWITHMAP_FIELD1ENTRY.has_options = True
+_MESSAGEWITHMAP_FIELD1ENTRY._options = 
_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
+# @@protoc_insertion_point(module_scope)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e1e4b7ce/sdks/python/apache_beam/coders/typecoders.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/coders/typecoders.py 
b/sdks/python/apache_beam/coders/typecoders.py
index 74e5770..1a01ccb 100644
--- a/sdks/python/apache_beam/coders/typecoders.py
+++ b/sdks/python/apache_beam/coders/typecoders.py
@@ -86,9 +86,10 @@ class CoderRegistry(object):
     self._register_coder_internal(bytes, coders.BytesCoder)
     self._register_coder_internal(unicode, coders.StrUtf8Coder)
     self._register_coder_internal(typehints.TupleConstraint, coders.TupleCoder)
-    self._fallback_coder = fallback_coder or coders.FastPrimitivesCoder
-    self._register_coder_internal(typehints.AnyTypeConstraint,
-                                  self._fallback_coder)
+    # Default fallback coders, applied in order until the first matching coder
+    # is found.
+    default_fallback_coders = [coders.ProtoCoder, coders.FastPrimitivesCoder]
+    self._fallback_coder = fallback_coder or FirstOf(default_fallback_coders)
 
   def _register_coder_internal(self, typehint_type, typehint_coder_class):
     self._coders[typehint_type] = typehint_coder_class
@@ -145,7 +146,9 @@ class CoderRegistry(object):
                    'and for custom key classes, by writing a '
                    'deterministic custom Coder. Please see the '
                    'documentation for more details.' % (key_coder, op_name))
-      if isinstance(key_coder, (coders.PickleCoder, self._fallback_coder)):
+      # TODO(vikasrk): Should we include other fallback coders?
+      if isinstance(key_coder, (coders.PickleCoder,
+                                coders.FastPrimitivesCoder)):
         if not silent:
           logging.warning(error_msg)
         return coders.DeterministicPickleCoder(key_coder, op_name)
@@ -154,4 +157,25 @@ class CoderRegistry(object):
     else:
       return key_coder
 
+
+class FirstOf(object):
+  "A class used to get the first matching coder from a list of coders."
+
+  def __init__(self, coders):
+    self._coders = coders
+
+  def from_type_hint(self, typehint, registry):
+    messages = []
+    for coder in self._coders:
+      try:
+        return coder.from_type_hint(typehint, self)
+      except Exception as e:
+        msg = ('%s could not provide a Coder for type %s: %s' %
+               (coder, typehint, e))
+        messages.append(msg)
+
+    raise ValueError('Cannot provide coder for %s: %s' %
+                     (typehint, ';'.join(messages)))
+
+
 registry = CoderRegistry()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e1e4b7ce/sdks/python/run_pylint.sh
----------------------------------------------------------------------
diff --git a/sdks/python/run_pylint.sh b/sdks/python/run_pylint.sh
index b69ea72..e865db6 100755
--- a/sdks/python/run_pylint.sh
+++ b/sdks/python/run_pylint.sh
@@ -42,7 +42,8 @@ EXCLUDED_GENERATED_FILES=(
 "apache_beam/internal/clients/dataflow/dataflow_v1b3_client.py"
 "apache_beam/internal/clients/dataflow/dataflow_v1b3_messages.py"
 "apache_beam/internal/clients/storage/storage_v1_client.py"
-"apache_beam/internal/clients/storage/storage_v1_messages.py")
+"apache_beam/internal/clients/storage/storage_v1_messages.py"
+"apache_beam/coders/proto2_coder_test_messages_pb2.py")
 
 # Get the name of the files that changed compared to the HEAD of the branch.
 # Use diff-filter to exclude deleted files. (i.e. Do not try to lint files that

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e1e4b7ce/sdks/python/setup.py
----------------------------------------------------------------------
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 9502f67..1aa3eb3 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -90,6 +90,7 @@ REQUIRED_PACKAGES = [
     'httplib2>=0.8,<0.10',
     'mock>=1.0.1,<3.0.0',
     'oauth2client>=2.0.1,<4.0.0',
+    'protobuf==3.0.0',
     'protorpc>=0.9.1,<0.12',
     'python-gflags>=2.0,<4.0.0',
     'pyyaml>=3.10,<4.0.0',

Reply via email to