Author: cutting
Date: Thu Jun 25 18:12:00 2009
New Revision: 788453

URL: http://svn.apache.org/viewvc?rev=788453&view=rev
Log:
AVRO-46.  Optimized RPC handshake protocol for Python.  Contributed by sharad.

Added:
    hadoop/avro/trunk/src/py/avro/genericio.py
    hadoop/avro/trunk/src/py/avro/genericipc.py
    hadoop/avro/trunk/src/py/avro/reflectio.py
    hadoop/avro/trunk/src/py/avro/reflectipc.py
Removed:
    hadoop/avro/trunk/src/py/avro/generic.py
    hadoop/avro/trunk/src/py/avro/reflect.py
Modified:
    hadoop/avro/trunk/CHANGES.txt
    hadoop/avro/trunk/build.xml
    hadoop/avro/trunk/src/py/avro/   (props changed)
    hadoop/avro/trunk/src/py/avro/io.py
    hadoop/avro/trunk/src/py/avro/ipc.py
    hadoop/avro/trunk/src/py/avro/protocol.py
    hadoop/avro/trunk/src/test/py/interoptests.py
    hadoop/avro/trunk/src/test/py/testio.py
    hadoop/avro/trunk/src/test/py/testioreflect.py
    hadoop/avro/trunk/src/test/py/testipc.py
    hadoop/avro/trunk/src/test/py/testipcreflect.py

Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=788453&r1=788452&r2=788453&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Thu Jun 25 18:12:00 2009
@@ -15,6 +15,8 @@
     ValueReader an abstract class named Decoder, and add concrete
     implementations named BinaryEncoder and BinaryDecoder. (cutting)
 
+    AVRO-46. Optimized RPC handshake protocol for Python.  (sharad)
+
   NEW FEATURES
 
     AVRO-6. Permit easier implementation of alternate generic data

Modified: hadoop/avro/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/build.xml?rev=788453&r1=788452&r2=788453&view=diff
==============================================================================
--- hadoop/avro/trunk/build.xml (original)
+++ hadoop/avro/trunk/build.xml Thu Jun 25 18:12:00 2009
@@ -263,7 +263,15 @@
     <pathelement location="${basedir}/lib/py"/>
   </path>
 
-  <target name="generate-test-data" depends="compile-test-java">
+  <target name="init-py" depends="init, schemata">
+    <copy todir="${basedir}/src/py/avro">
+      <fileset dir="${build.dir}/src/org/apache/avro/ipc">
+       <include name="**/*.avsc"/>
+      </fileset>
+    </copy>
+  </target>
+
+  <target name="generate-test-data" depends="compile-test-java, init-py">
     <mkdir dir="${test.java.build.dir}/data-files"/>
     <java classname="org.apache.avro.RandomData"
       classpathref="test.java.classpath">
@@ -283,7 +291,7 @@
     </py-run>
   </target>
 
-  <target name="test-py" depends="init" description="Run python unit tests">
+  <target name="test-py" depends="init-py" description="Run python unit tests">
     <taskdef name="py-test" classname="org.pyant.tasks.PythonTestTask">
       <classpath refid="java.classpath" />
     </taskdef>
@@ -339,7 +347,7 @@
     </py-test>
   </target>
 
-  <target name="start-rpc-daemons" depends="compile-test-java"
+  <target name="start-rpc-daemons" depends="compile-test-java, init-py"
     description="Start the daemons for rpc interoperability tests">
     <delete dir="${test.java.build.dir}/server-ports"/>
     <mkdir dir="${test.java.build.dir}/server-ports"/>
@@ -530,6 +538,7 @@
     <delete dir="${build.dir}"/>
     <delete>
       <fileset dir="src" includes="**/*.pyc" />
+      <fileset dir="${basedir}/src/py/avro" includes="**/*.avsc"/>
     </delete>
   </target>
 

Propchange: hadoop/avro/trunk/src/py/avro/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Thu Jun 25 18:12:00 2009
@@ -1 +1,2 @@
 *.pyc
+Handshake*.avsc

Added: hadoop/avro/trunk/src/py/avro/genericio.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/genericio.py?rev=788453&view=auto
==============================================================================
--- hadoop/avro/trunk/src/py/avro/genericio.py (added)
+++ hadoop/avro/trunk/src/py/avro/genericio.py Thu Jun 25 18:12:00 2009
@@ -0,0 +1,254 @@
+#Licensed to the Apache Software Foundation (ASF) under one
+#or more contributor license agreements.  See the NOTICE file
+#distributed with this work for additional information
+#regarding copyright ownership.  The ASF licenses this file
+#to you under the Apache License, Version 2.0 (the
+#"License"); you may not use this file except in compliance
+#with the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.
+
+"""Generic representation for data. 
+Represent Schema data with generic python types.
+
+Uses the following mapping:
+  * Schema records are implemented as dict.
+  * Schema arrays are implemented as list.
+  * Schema maps are implemented as dict.
+  * Schema strings are implemented as unicode.
+  * Schema bytes are implemented as str.
+  * Schema ints are implemented as int.
+  * Schema longs are implemented as long.
+  * Schema floats are implemented as float.
+  * Schema doubles are implemented as float.
+  * Schema booleans are implemented as bool. 
+"""
+
+import avro.schema as schema
+import avro.io as io
+
+def _validatearray(schm, object):
+  if not isinstance(object, list):
+    return False
+  for elem in object:
+    if not validate(schm.getelementtype(), elem):
+      return False
+  return True
+
+def _validatemap(schm, object):
+  if not isinstance(object, dict):
+    return False
+  for k,v in object.items():
+    if not validate(schm.getvaluetype(), v):
+      return False
+  return True
+
+def _validaterecord(schm, object):
+  if not isinstance(object, dict):
+    return False
+  for field,fieldschema in schm.getfields():
+    if not validate(fieldschema, object.get(field)):
+      return False
+  return True
+
+def _validateunion(schm, object):
+  for elemtype in schm.getelementtypes():
+    if validate(elemtype, object):
+      return True
+  return False
+
+_validatefn = {
+     schema.NULL : lambda schm, object: object is None,
+     schema.BOOLEAN : lambda schm, object: isinstance(object, bool),
+     schema.STRING : lambda schm, object: isinstance(object, unicode),
+     schema.FLOAT : lambda schm, object: isinstance(object, float),
+     schema.DOUBLE : lambda schm, object: isinstance(object, float),
+     schema.BYTES : lambda schm, object: isinstance(object, str),
+     schema.INT : lambda schm, object: ((isinstance(object, long) or 
+                                         isinstance(object, int)) and 
+                              io._INT_MIN_VALUE <= object <= io._INT_MAX_VALUE),
+     schema.LONG : lambda schm, object: ((isinstance(object, long) or 
+                                          isinstance(object, int)) and 
+                            io._LONG_MIN_VALUE <= object <= io._LONG_MAX_VALUE),
+     schema.ENUM : lambda schm, object:
+                                schm.getenumsymbols().__contains__(object),
+     schema.FIXED : lambda schm, object:
+                                (isinstance(object, str) and 
+                                 len(object) == schm.getsize()),
+     schema.ARRAY : _validatearray,
+     schema.MAP : _validatemap,
+     schema.RECORD : _validaterecord,
+     schema.UNION : _validateunion
+     }
+
+def validate(schm, object):
+  """Returns True if a python datum matches a schema."""
+  fn = _validatefn.get(schm.gettype())
+  if fn is not None:
+    return fn(schm, object)
+  else:
+    return False
+
+class DatumReader(io.DatumReaderBase):
+  """DatumReader for generic python objects."""
+
+  def __init__(self, schm=None):
+    self.setschema(schm)
+    self.__readfn = {
+     schema.BOOLEAN : lambda schm, decoder: decoder.readboolean(),
+     schema.STRING : lambda schm, decoder: decoder.readutf8(),
+     schema.INT : lambda schm, decoder: decoder.readint(),
+     schema.LONG : lambda schm, decoder: decoder.readlong(),
+     schema.FLOAT : lambda schm, decoder: decoder.readfloat(),
+     schema.DOUBLE : lambda schm, decoder: decoder.readdouble(),
+     schema.BYTES : lambda schm, decoder: decoder.readbytes(),
+     schema.FIXED : lambda schm, decoder: 
+                            (decoder.read(schm.getsize())),
+     schema.ARRAY : self.readarray,
+     schema.MAP : self.readmap,
+     schema.RECORD : self.readrecord,
+     schema.ENUM : self.readenum,
+     schema.UNION : self.readunion
+     }
+
+  def setschema(self, schm):
+    self.__schm = schm
+
+  def read(self, decoder):
+    return self.readdata(self.__schm, decoder)
+    
+  def readdata(self, schm, decoder):
+    if schm.gettype() == schema.NULL:
+      return None
+    fn = self.__readfn.get(schm.gettype())
+    if fn is not None:
+      return fn(schm, decoder)
+    else:
+      raise AvroException("Unknown type: "+schema.stringval(schm));
+
+  def readmap(self, schm, decoder):
+    result = dict()
+    size = decoder.readlong()
+    if size != 0:
+      for i in range(0, size):
+        key = decoder.readutf8()
+        result[key] = self.readdata(schm.getvaluetype(), decoder)
+      decoder.readlong()
+    return result
+
+  def readarray(self, schm, decoder):
+    result = list()
+    size = decoder.readlong()
+    if size != 0:
+      for i in range(0, size):
+        result.append(self.readdata(schm.getelementtype(), decoder))
+      decoder.readlong()
+    return result
+
+  def readrecord(self, schm, decoder):
+    result = dict() 
+    for field,fieldschema in schm.getfields():
+      result[field] = self.readdata(fieldschema, decoder)
+    return result
+
+  def readenum(self, schm, decoder):
+    index = decoder.readint()
+    return schm.getenumsymbols()[index]
+
+  def readunion(self, schm, decoder):
+    index = int(decoder.readlong())
+    return self.readdata(schm.getelementtypes()[index], decoder)
+
+class DatumWriter(io.DatumWriterBase):
+  """DatumWriter for generic python objects."""
+
+  def __init__(self, schm=None):
+    self.setschema(schm)
+    self.__writefn = {
+     schema.BOOLEAN : lambda schm, datum, encoder: 
+                  encoder.writeboolean(datum),
+     schema.STRING : lambda schm, datum, encoder: 
+                  encoder.writeutf8(datum),
+     schema.INT : lambda schm, datum, encoder: 
+                  encoder.writeint(datum),
+     schema.LONG : lambda schm, datum, encoder: 
+                  encoder.writelong(datum),
+     schema.FLOAT : lambda schm, datum, encoder: 
+                  encoder.writefloat(datum),
+     schema.DOUBLE : lambda schm, datum, encoder: 
+                  encoder.writedouble(datum),
+     schema.BYTES : lambda schm, datum, encoder: 
+                  encoder.writebytes(datum),
+     schema.FIXED : lambda schm, datum, encoder: 
+                  encoder.write(datum),
+     schema.ARRAY : self.writearray,
+     schema.MAP : self.writemap,
+     schema.RECORD : self.writerecord,
+     schema.ENUM : self.writeenum,
+     schema.UNION : self.writeunion
+     }
+
+  def setschema(self, schm):
+    self.__schm = schm
+
+  def write(self, datum, encoder):
+    self.writedata(self.__schm, datum, encoder)
+
+  def writedata(self, schm, datum, encoder):
+    if schm.gettype() == schema.NULL:
+      if datum is None:
+        return
+      raise io.AvroTypeException(schm, datum)
+    fn = self.__writefn.get(schm.gettype())
+    if fn is not None:
+      fn(schm, datum, encoder)
+    else:
+      raise io.AvroTypeException(schm, datum)
+
+  def writemap(self, schm, datum, encoder):
+    if not isinstance(datum, dict):
+      raise io.AvroTypeException(schm, datum)
+    if len(datum) > 0:
+      encoder.writelong(len(datum))
+      for k,v in datum.items():
+        encoder.writeutf8(k)
+        self.writedata(schm.getvaluetype(), v, encoder)
+    encoder.writelong(0)
+
+  def writearray(self, schm, datum, encoder):
+    if not isinstance(datum, list):
+      raise io.AvroTypeException(schm, datum)
+    if len(datum) > 0:
+      encoder.writelong(len(datum))
+      for item in datum:
+        self.writedata(schm.getelementtype(), item, encoder)
+    encoder.writelong(0)
+
+  def writerecord(self, schm, datum, encoder):
+    if not isinstance(datum, dict):
+      raise io.AvroTypeException(schm, datum)
+    for field,fieldschema in schm.getfields():
+      self.writedata(fieldschema, datum.get(field), encoder)
+
+  def writeunion(self, schm, datum, encoder):
+    index = self.resolveunion(schm, datum)
+    encoder.writelong(index)
+    self.writedata(schm.getelementtypes()[index], datum, encoder)
+
+  def writeenum(self, schm, datum, encoder):
+    index = schm.getenumordinal(datum)
+    encoder.writeint(index)
+
+  def resolveunion(self, schm, datum):
+    index = 0
+    for elemtype in schm.getelementtypes():
+      if validate(elemtype, datum):
+        return index
+      index+=1
+    raise io.AvroTypeException(schm, datum)
\ No newline at end of file

Added: hadoop/avro/trunk/src/py/avro/genericipc.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/genericipc.py?rev=788453&view=auto
==============================================================================
--- hadoop/avro/trunk/src/py/avro/genericipc.py (added)
+++ hadoop/avro/trunk/src/py/avro/genericipc.py Thu Jun 25 18:12:00 2009
@@ -0,0 +1,57 @@
+#Licensed to the Apache Software Foundation (ASF) under one
+#or more contributor license agreements.  See the NOTICE file
+#distributed with this work for additional information
+#regarding copyright ownership.  The ASF licenses this file
+#to you under the Apache License, Version 2.0 (the
+#"License"); you may not use this file except in compliance
+#with the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.
+
+""" Uses genericio to write and read data objects."""
+
+import avro.schema as schema
+import avro.genericio as genericio
+import avro.ipc as ipc
+
+class Requestor(ipc.RequestorBase):
+  """Requestor implementation for generic python data."""
+
+  def getdatumwriter(self, schm):
+    return genericio.DatumWriter(schm)
+
+  def getdatumreader(self, schm):
+    return genericio.DatumReader(schm)
+
+  def writerequest(self, schm, req, encoder):
+    self.getdatumwriter(schm).write(req, encoder)
+
+  def readresponse(self, schm, decoder):
+    return self.getdatumreader(schm).read(decoder)
+
+  def readerror(self, schm, decoder):
+    return ipc.AvroRemoteException(self.getdatumreader(schm).read(decoder))
+
+class Responder(ipc.ResponderBase):
+  """Responder implementation for generic python data."""
+
+  def getdatumwriter(self, schm):
+    return genericio.DatumWriter(schm)
+
+  def getdatumreader(self, schm):
+    return genericio.DatumReader(schm)
+
+  def readrequest(self, schm, decoder):
+    return self.getdatumreader(schm).read(decoder)
+
+  def writeresponse(self, schm, response, encoder):
+    self.getdatumwriter(schm).write(response, encoder)
+
+  def writeerror(self, schm, error, encoder):
+    self.getdatumwriter(schm).write(error.getvalue(), encoder)

Modified: hadoop/avro/trunk/src/py/avro/io.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/io.py?rev=788453&r1=788452&r2=788453&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/io.py (original)
+++ hadoop/avro/trunk/src/py/avro/io.py Thu Jun 25 18:12:00 2009
@@ -42,7 +42,7 @@
   def setschema(self, schema):
     pass
 
-  def read(self, valuereader):
+  def read(self, decoder):
     """Read a datum. Traverse the schema, depth-first, reading all leaf values
     in the schema into a datum that is returned"""
     pass
@@ -52,13 +52,13 @@
   def setschema(self, schema):
     pass
 
-  def write(self, data, valuewriter):
+  def write(self, data, encoder):
     """Write a datum. Traverse the schema, depth first, writing each leaf value
     in the schema from the datum to the output."""
     pass
 
 
-class ValueReader(object):
+class Decoder(object):
   """Read leaf values."""
 
   def __init__(self, reader):
@@ -108,7 +108,7 @@
   def read(self, len):
     return struct.unpack(len.__str__()+'s', self.__reader.read(len))[0]
 
-class ValueWriter(object):
+class Encoder(object):
   """Write leaf values."""
 
   def __init__(self, writer):
@@ -189,13 +189,13 @@
 
   def __init__(self, schm, writer, dwriter):
     self.__writer = writer
-    self.__vwriter = ValueWriter(writer)
+    self.__encoder = Encoder(writer)
     self.__dwriter = dwriter
     self.__dwriter.setschema(schm)
     self.__count = 0  #entries in file
     self.__blockcount = 0  #entries in current block
     self.__buffer = cStringIO.StringIO()
-    self.__bufwriter = ValueWriter(self.__buffer)
+    self.__bufwriter = Encoder(self.__buffer)
     self.__meta = dict()
     self.__sync = uuid.uuid4().bytes
     self.__meta["sync"] = self.__sync
@@ -218,7 +218,7 @@
   def __writeblock(self):
     if self.__blockcount > 0:
       self.__writer.write(self.__sync)
-      self.__vwriter.writelong(self.__blockcount)
+      self.__encoder.writelong(self.__blockcount)
       self.__writer.write(self.__buffer.getvalue())
       self.__buffer.truncate(0) #reset
       self.__blockcount = 0
@@ -250,8 +250,8 @@
       self.__bufwriter.writebytes(str(v))
     size = self.__buffer.tell() + 4
     self.__writer.write(self.__sync)
-    self.__vwriter.writelong(_FOOTER_BLOCK)
-    self.__vwriter.writelong(size)
+    self.__encoder.writelong(_FOOTER_BLOCK)
+    self.__encoder.writelong(size)
     self.__buffer.flush()
     self.__writer.write(self.__buffer.getvalue())
     self.__buffer.truncate(0) #reset
@@ -265,7 +265,7 @@
 
   def __init__(self, reader, dreader):
     self.__reader = reader
-    self.__vreader = ValueReader(reader)
+    self.__decoder = Decoder(reader)
     mag = struct.unpack(len(_MAGIC).__str__()+'s', 
                  self.__reader.read(len(_MAGIC)))[0]
     if mag != _MAGIC:
@@ -279,11 +279,11 @@
             int(ord(self.__reader.read(1)) << 8) +
             int(ord(self.__reader.read(1))))
     seekpos = self.__reader.seek(self.__length-footersize)
-    metalength = self.__vreader.readlong()
+    metalength = self.__decoder.readlong()
     self.__meta = dict()
     for i in range(0, metalength):
-      key = self.__vreader.readutf8()
-      self.__meta[key] = self.__vreader.readbytes()
+      key = self.__decoder.readutf8()
+      self.__meta[key] = self.__decoder.readbytes()
     self.__sync = self.__meta.get("sync")
     self.__count = int(self.__meta.get("count"))
     self.__schema = schema.parse(self.__meta.get("schema").encode("utf-8"))
@@ -302,12 +302,12 @@
       if self.__reader.tell() == self.__length:
         return None
       self.__skipsync()
-      self.__blockcount = self.__vreader.readlong()
+      self.__blockcount = self.__decoder.readlong()
       if self.__blockcount == _FOOTER_BLOCK:
-        self.__reader.seek(self.__vreader.readlong()+self.__reader.tell())
+        self.__reader.seek(self.__decoder.readlong()+self.__reader.tell())
         self.__blockcount = 0
     self.__blockcount-=1
-    datum = self.__dreader.read(self.__vreader)
+    datum = self.__dreader.read(self.__decoder)
     return datum
 
   def __skipsync(self):

Modified: hadoop/avro/trunk/src/py/avro/ipc.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/ipc.py?rev=788453&r1=788452&r2=788453&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/ipc.py (original)
+++ hadoop/avro/trunk/src/py/avro/ipc.py Thu Jun 25 18:12:00 2009
@@ -16,14 +16,18 @@
 
 """Support for inter-process calls."""
 
-import socket, struct, errno, struct, cStringIO, threading
+import socket, struct, errno, struct, cStringIO, threading, weakref, os
 import avro.schema as schema
 import avro.protocol as protocol
 import avro.io as io
+import avro.reflectio as reflectio
 
 class TransceiverBase(object):
   """Base class for transmitters and receivers of raw binary messages."""
 
+  def getremotename(self):
+    pass
+
   def transceive(self, request):
     self.writebuffers(request)
     return self.readbuffers()
@@ -51,13 +55,39 @@
 class ConnectionClosedException(Exception):
   pass
 
+_PKGNAME = "org.apache.avro.ipc."
+_HANDSHAKE_FILE_DIR = os.path.dirname(__file__).__str__() + os.path.sep
+_HANDSHAKE_REQUEST_SCHEMA = schema.parse(
+                                  open(_HANDSHAKE_FILE_DIR +
+                                       "HandshakeRequest.avsc").read())
+_HANDSHAKE_RESPONSE_SCHEMA = schema.parse(
+                                  open(_HANDSHAKE_FILE_DIR +
+                                       "HandshakeResponse.avsc").read())
+_HANDSHAKE_REQUESTOR_WRITER = reflectio.ReflectDatumWriter(_PKGNAME, 
+                                                    _HANDSHAKE_REQUEST_SCHEMA)
+_HANDSHAKE_REQUESTOR_READER = reflectio.ReflectDatumReader(_PKGNAME, 
+                                                    _HANDSHAKE_RESPONSE_SCHEMA)
+_HANDSHAKE_RESPONDER_WRITER = reflectio.ReflectDatumWriter(_PKGNAME, 
+                                                    _HANDSHAKE_RESPONSE_SCHEMA)
+_HANDSHAKE_RESPONDER_READER = reflectio.ReflectDatumReader(_PKGNAME, 
+                                                    _HANDSHAKE_REQUEST_SCHEMA)
+_HandshakeRequest = reflectio.gettype(_HANDSHAKE_REQUEST_SCHEMA, _PKGNAME)
+_HandshakeResponse = reflectio.gettype(_HANDSHAKE_RESPONSE_SCHEMA, _PKGNAME)
+_HANDSHAKE_MATCH_BOTH = "BOTH"
+_HANDSHAKE_MATCH_CLIENT = "CLIENT"
+_HANDSHAKE_MATCH_NONE = "NONE"
+_REMOTE_HASHES = dict()
+_REMOTE_PROTOCOLS = dict()
+
 class RequestorBase(object):
   """Base class for the client side of a protocol interaction."""
 
   def __init__(self, localproto, transceiver):
     self.__localproto = localproto
     self.__transceiver = transceiver
-    self.__remoteproto = self.handshake()
+    self.__established = False
+    self.__sendlocaltext = False
+    self.__remoteproto = None
 
   def getlocal(self):
     return self.__localproto
@@ -68,46 +98,80 @@
   def gettransceiver(self):
     return self.__transceiver
 
-  def handshake(self):
-    buf = cStringIO.StringIO()
-    vwriter = io.ValueWriter(buf)
-    vwriter.writelong(protocol.VERSION)
-    vwriter.writeutf8(unicode(self.__localproto.__str__(), 'utf8'))
-    response = self.__transceiver.transceive(buf.getvalue())
-    vreader = io.ValueReader(cStringIO.StringIO(response))
-    remote = vreader.readutf8()
-    return protocol.parse(remote)
-
-  def call(self, msgname, req):
+  def request(self, msgname, req):
     """Writes a request message and reads a response or error message."""
-    m = self.__localproto.getmessages().get(msgname)
+    processed = False
+    while not self.__established or not processed:
+      processed = True
+      buf = cStringIO.StringIO()
+      encoder = io.Encoder(buf)
+      if not self.__established:
+        self.__writehandshake(encoder)
+      m = self.__localproto.getmessages().get(msgname)
+      if m is None:
+        raise schema.AvroException("Not a local message: "+msgname.__str__())
+      encoder.writeutf8(m.getname())
+      self.writerequest(m.getrequest(), req, encoder)
+      response = self.__transceiver.transceive(buf.getvalue())
+      decoder = io.Decoder(cStringIO.StringIO(response))
+      if not self.__established:
+        self.__readhandshake(decoder)
+    m = self.getremote().getmessages().get(msgname)
     if m is None:
-      raise schema.AvroException("Not a local message: "+msgname.__str__())
-    remotemsg = self.__remoteproto.getmessages().get(msgname)
-    if remotemsg is None:
       raise schema.AvroException("Not a remote message: "+msgname.__str__())
-    writer = cStringIO.StringIO()
-    vwriter = io.ValueWriter(writer)
-    vwriter.writeutf8(m.getname())
-    
-    self.writerequest(m.getrequest(), req, vwriter)
-    response = self.__transceiver.transceive(writer.getvalue())
-    vreader = io.ValueReader(cStringIO.StringIO(response))
-    self.__remoteproto.getmessages().get(msgname)
-    if not vreader.readboolean():
-      return self.readresponse(remotemsg.getresponse(), vreader)
+    if not decoder.readboolean():
+      return self.readresponse(m.getresponse(), decoder)
+    else:
+      raise self.readerror(m.geterrors(), decoder)
+
+  def __writehandshake(self, encoder):
+    localhash = self.__localproto.getMD5()
+    remotename = self.__transceiver.getremotename()
+    remotehash = _REMOTE_HASHES.get(remotename)
+    self.__remoteproto = _REMOTE_PROTOCOLS.get(remotehash)
+    if remotehash is None:
+      remotehash = localhash
+      self.__remoteproto = self.__localproto
+    handshake = _HandshakeRequest()
+    handshake.clientHash = localhash
+    handshake.serverHash = remotehash
+    if self.__sendlocaltext:
+      handshake.clientProtocol = unicode(self.__localproto.__str__(), 'utf8')
+    _HANDSHAKE_REQUESTOR_WRITER.write(handshake, encoder)
+
+  def __readhandshake(self, decoder):
+    handshake = _HANDSHAKE_REQUESTOR_READER.read(decoder)
+    print ("Handshake.match of protocol:" + 
+           self.__localproto.getname().__str__()+" with:"+ 
+           self.__transceiver.getremotename().__str__() + " is " +
+           handshake.match.__str__())
+    if handshake.match == _HANDSHAKE_MATCH_BOTH:
+      self.__established = True
+    elif handshake.match == _HANDSHAKE_MATCH_CLIENT:
+      self.__setremote(handshake)
+      self.__established = True
+    elif handshake.match == _HANDSHAKE_MATCH_NONE:
+      self.__setremote(handshake)
+      self.__sendlocaltext = True
     else:
-      raise self.readerror(remotemsg.geterrors(), vreader)
+      raise schema.AvroException("Unexpected match: "+handshake.match.__str__())
+
+  def __setremote(self, handshake):
+    self.__remoteproto = protocol.parse(handshake.serverProtocol.__str__())
+    remotehash = handshake.serverHash
+    _REMOTE_HASHES[self.__transceiver.getremotename()] = remotehash
+    if not _REMOTE_PROTOCOLS.has_key(remotehash):
+      _REMOTE_PROTOCOLS[remotehash] = self.__remoteproto
 
-  def writerequest(self, schm, req, vwriter):
+  def writerequest(self, schm, req, encoder):
     """Writes a request message."""
     pass
 
-  def readresponse(self, schm, vreader):
+  def readresponse(self, schm, decoder):
     """Reads a response message."""
     pass
 
-  def readerror(self, schm, vreader):
+  def readerror(self, schm, decoder):
     """Reads an error message."""
     pass
 
@@ -116,74 +180,102 @@
 
   def __init__(self, localproto):
     self.__localproto = localproto
+    self.__remotes = weakref.WeakKeyDictionary()
+    self.__protocols = dict()
+    self.__localhash = self.__localproto.getMD5()
+    self.__protocols[self.__localhash] = self.__localproto
 
   def getlocal(self):
     return self.__localproto
 
-  def handshake(self, server):
-    """Returns the remote protocol."""
-    vreader = io.ValueReader(cStringIO.StringIO(server.readbuffers()))
-    out = cStringIO.StringIO()
-    vwriter = io.ValueWriter(out)
-    version = vreader.readlong()
-    if version != protocol.VERSION:
-      raise schema.AvroException("Incompatible request version: "
-                                  +version.__str__())
-    proto = vreader.readutf8()
-    remote = protocol.parse(proto)
-    vwriter.writeutf8(unicode(self.__localproto.__str__(), 'utf8'))
-    server.writebuffers(out.getvalue())
-    
-    return remote
-
-  def call(self, remoteproto, input):
+  def respond(self, transceiver):
+    """Called by a server to deserialize a request, compute and serialize
+   * a response or error."""
+    transreq = transceiver.readbuffers()
+    reader = cStringIO.StringIO(transreq)
+    decoder = io.Decoder(reader)
     buf = cStringIO.StringIO()
-    vwriter = io.ValueWriter(buf)
+    encoder = io.Encoder(buf)
+    error = None
+    
     try:
-      reader = cStringIO.StringIO(input)
-      vreader = io.ValueReader(reader)
-      msgname = vreader.readutf8()
+      remoteproto = self.__handshake(transceiver, decoder, encoder)
+      if remoteproto is None:  #handshake failed
+        return buf.getvalue()
+      
+      #read request using remote protocol specification
+      msgname = decoder.readutf8()
       m = remoteproto.getmessages().get(msgname)
       if m is None:
         raise schema.AvroException("No such remote message: "+msgname.__str__())
+      req = self.readrequest(m.getrequest(), decoder)
       
-      req = self.readrequest(m.getrequest(), vreader)
+      #read response using local protocol specification
       m = self.__localproto.getmessages().get(msgname)
       if m is None:
         raise schema.AvroException("No such local message: "+msgname.__str__())
-      error = None
       try:
         response = self.invoke(m, req)
       except AvroRemoteException, e:
         error = e
       except Exception, e:
         error = AvroRemoteException(unicode(e.__str__()))
-      vwriter.writeboolean(error is not None)
+      encoder.writeboolean(error is not None)
       if error is None:
-        self.writeresponse(m.getresponse(), response, vwriter)
+        self.writeresponse(m.getresponse(), response, encoder)
       else:
-        self.writeerror(m.geterrors(), error, vwriter)
+        self.writeerror(m.geterrors(), error, encoder)
     except schema.AvroException, e:
       error = AvroRemoteException(unicode(e.__str__()))
       buf = cStringIO.StringIO()
-      vwriter = io.ValueWriter(buf)
-      vwriter.writeboolean(True)
-      self.writeerror(protocol._SYSTEM_ERRORS, error, vwriter)
-    
+      encoder = io.Encoder(buf)
+      encoder.writeboolean(True)
+      self.writeerror(protocol._SYSTEM_ERRORS, error, encoder)
+      
     return buf.getvalue()
 
+
+  def __handshake(self, transceiver, decoder, encoder):
+    remoteproto = self.__remotes.get(transceiver)
+    if remoteproto != None:
+      return remoteproto #already established
+    request = _HANDSHAKE_RESPONDER_READER.read(decoder)
+    remoteproto = self.__protocols.get(request.clientHash)
+    if remoteproto is None and request.clientProtocol is not None:
+      remoteproto = protocol.parse(request.clientProtocol)
+      self.__protocols[request.clientHash] = remoteproto
+    if remoteproto is not None:
+      self.__remotes[transceiver] = remoteproto
+    response = _HandshakeResponse()
+    
+    if self.__localhash == request.serverHash:
+      if remoteproto is None:
+        response.match = _HANDSHAKE_MATCH_NONE
+      else:
+        response.match = _HANDSHAKE_MATCH_BOTH
+    else:
+      if remoteproto is None:
+        response.match = _HANDSHAKE_MATCH_NONE
+      else:
+        response.match = _HANDSHAKE_MATCH_CLIENT
+    if response.match != _HANDSHAKE_MATCH_BOTH:
+      response.serverProtocol = unicode(self.__localproto.__str__(), "utf8")
+      response.serverHash = self.__localhash
+    _HANDSHAKE_RESPONDER_WRITER.write(response, encoder)
+    return remoteproto
+
   def invoke(self, msg, req):
     pass
 
-  def readrequest(self, schm, vreader):
+  def readrequest(self, schm, decoder):
     """Reads a request message."""
     pass
 
-  def writeresponse(self, schm, response, vwriter):
+  def writeresponse(self, schm, response, encoder):
     """Writes a response message."""
     pass
 
-  def writeerror(self, schm, error, vwriter):
+  def writeerror(self, schm, error, encoder):
     """Writes an error message."""
     pass
 
@@ -195,6 +287,9 @@
   def __init__(self, sock):
     self.__sock = sock
 
+  def getremotename(self):
+    """Returns the address of the remote end of the wrapped socket."""
+    # getsockname() reports this end's own address; the peer's address is
+    # what getpeername() returns.
+    return self.__sock.getpeername()
+
   def readbuffers(self):
     msg = cStringIO.StringIO()
     while True:
@@ -293,13 +388,13 @@
 
     def __run(self):
       try:
-        remoteproto = self.__responder.handshake(self)
-        while True:
-          buf = self.readbuffers()
-          buf = self.__responder.call(remoteproto, buf)
-          self.writebuffers(buf)
-      except ConnectionClosedException, e:
-        print "Closed:", self.__thread.getName()
-        return
-      finally:
-        self.close()
+        try:
+          while True:
+            self.writebuffers(self.__responder.respond(self))
+        except ConnectionClosedException, e:
+          print "Closed:", self.__thread.getName()
+          return
+        finally:
+          self.close()
+      except Exception, ex:
+        print "Unexpected error", ex

Modified: hadoop/avro/trunk/src/py/avro/protocol.py
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/protocol.py?rev=788453&r1=788452&r2=788453&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/protocol.py (original)
+++ hadoop/avro/trunk/src/py/avro/protocol.py Thu Jun 25 18:12:00 2009
@@ -14,7 +14,7 @@
 #See the License for the specific language governing permissions and
 #limitations under the License.
 
-import cStringIO
+import cStringIO, md5
 import simplejson
 import avro.schema as schema
 
@@ -31,6 +31,7 @@
     self.__messages = dict()
     self.__name = name
     self.__namespace = namespace
+    self.__md5 = None
 
   def getname(self):
     return self.__name
@@ -44,6 +45,12 @@
   def getmessages(self):
     return self.__messages
 
+  def getMD5(self):
+    """Return the MD5 hash of the text of this protocol."""
+    digest = self.__md5
+    if digest is None:
+      # Computed on first use and remembered for later calls.
+      digest = md5.new(self.__str__()).digest()
+      self.__md5 = digest
+    return digest
+
   class Message(object):
     """ A Protocol message."""
     def __init__(self, proto, name, request, response, errors):

Added: hadoop/avro/trunk/src/py/avro/reflectio.py
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/reflectio.py?rev=788453&view=auto
==============================================================================
--- hadoop/avro/trunk/src/py/avro/reflectio.py (added)
+++ hadoop/avro/trunk/src/py/avro/reflectio.py Thu Jun 25 18:12:00 2009
@@ -0,0 +1,133 @@
+#Licensed to the Apache Software Foundation (ASF) under one
+#or more contributor license agreements.  See the NOTICE file
+#distributed with this work for additional information
+#regarding copyright ownership.  The ASF licenses this file
+#to you under the Apache License, Version 2.0 (the
+#"License"); you may not use this file except in compliance
+#with the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.
+
+"""Define Record schema and protocol classes at runtime. This can be used 
+to invoke the methods on Protocol proxy directly."""
+
+import avro.schema as schema
+import avro.io as io
+import avro.genericio as genericio
+
+#TODO pkgname should not be passed, instead classes should be constructed 
+#based on schema namespace
+
+def _validatearray(schm, pkgname, object):
+  """Returns True if object is a list whose items all match the schema's
+  element type."""
+  if isinstance(object, list):
+    for item in object:
+      if not validate(schm.getelementtype(), pkgname, item):
+        return False
+    return True
+  return False
+
+def _validatemap(schm, pkgname, object):
+  """Returns True if object is a dict whose values all match the schema's
+  value type. Keys are not inspected."""
+  if isinstance(object, dict):
+    for value in object.values():
+      if not validate(schm.getvaluetype(), pkgname, value):
+        return False
+    return True
+  return False
+
+def _validaterecord(schm, pkgname, object):
+  """Returns True if object is an instance of the class for this record
+  schema and each of its field values matches the field's schema."""
+  if isinstance(object, gettype(schm, pkgname)):
+    for name, fieldschm in schm.getfields():
+      value = object.__getattribute__(name)
+      if not validate(fieldschm, pkgname, value):
+        return False
+    return True
+  return False
+
+def _validateunion(schm, pkgname, object):
+  """Returns True if object matches at least one branch of the union."""
+  matched = False
+  for branch in schm.getelementtypes():
+    if validate(branch, pkgname, object):
+      # The first matching branch settles it.
+      matched = True
+      break
+  return matched
+
+# Maps each schema type to a function (schm, pkgname, object) -> bool.
+# NOTE(review): bool is a subclass of int in Python, so True and False also
+# pass the INT and LONG checks below.
+_validatefn = {
+     schema.NULL : lambda schm, pkgname, object: object is None,
+     schema.BOOLEAN : lambda schm, pkgname, object: isinstance(object, bool),
+     schema.STRING : lambda schm, pkgname, object: isinstance(object, unicode),
+     schema.FLOAT : lambda schm, pkgname, object: isinstance(object, float),
+     schema.DOUBLE : lambda schm, pkgname, object: isinstance(object, float),
+     schema.BYTES : lambda schm, pkgname, object: isinstance(object, str),
+     schema.INT : lambda schm, pkgname, object: (
+                     isinstance(object, (int, long)) and
+                     io._INT_MIN_VALUE <= object <= io._INT_MAX_VALUE),
+     schema.LONG : lambda schm, pkgname, object: (
+                     isinstance(object, (int, long)) and
+                     io._LONG_MIN_VALUE <= object <= io._LONG_MAX_VALUE),
+     schema.ENUM : lambda schm, pkgname, object:
+                     object in schm.getenumsymbols(),
+     schema.FIXED : lambda schm, pkgname, object: (
+                     isinstance(object, str) and
+                     len(object) == schm.getsize()),
+     schema.ARRAY : _validatearray,
+     schema.MAP : _validatemap,
+     schema.RECORD : _validaterecord,
+     schema.UNION : _validateunion
+     }
+
+def validate(schm, pkgname, object):
+  """Returns True if a python datum matches a schema."""
+  checker = _validatefn.get(schm.gettype())
+  if checker is None:
+    # No validator is registered for this schema type.
+    return False
+  return checker(schm, pkgname, object)
+
+def gettype(recordschm, pkgname, base=object):
+  """Returns the type with classname as recordschm name and pkg name as
+  pkgname. If type does not exist creates a new type.
+
+  Created types are kept in this module's globals under pkgname + name, so
+  base only takes effect on the call that actually creates the type."""
+  clazzname = pkgname + recordschm.getname()
+  clazz = globals().get(clazzname)
+  if clazz is None:
+    clazz = type(str(clazzname),(base,),{})
+    # Each record field becomes a class attribute that defaults to None.
+    for field,fieldschema in recordschm.getfields():
+      setattr(clazz, field, None)
+    globals()[clazzname] = clazz
+  return clazz
+
+class ReflectDatumReader(genericio.DatumReader):
+  """DatumReader for arbitrary python classes."""
+
+  def __init__(self, pkgname, schm=None):
+    genericio.DatumReader.__init__(self, schm)
+    self.__pkgname = pkgname
+
+  def readrecord(self, schm, decoder):
+    """Reads the fields of schm in order onto a new instance of the class
+    that gettype returns for schm."""
+    record = gettype(schm, self.__pkgname)()
+    for name, fieldschm in schm.getfields():
+      value = self.readdata(fieldschm, decoder)
+      setattr(record, name, value)
+    return record
+
+class ReflectDatumWriter(genericio.DatumWriter):
+  """DatumWriter for arbitrary python classes."""
+
+  def __init__(self, pkgname, schm=None):
+    genericio.DatumWriter.__init__(self, schm)
+    self.__pkgname = pkgname
+
+  def writerecord(self, schm, datum, encoder):
+    """Writes the attribute of datum named by each field of schm, in order."""
+    for name, fieldschm in schm.getfields():
+      value = getattr(datum, name)
+      self.writedata(fieldschm, value, encoder)
+
+  def resolveunion(self, schm, datum):
+    """Returns the position of the first union branch that datum validates
+    against; raises io.AvroTypeException if no branch matches."""
+    for position, branch in enumerate(schm.getelementtypes()):
+      if validate(branch, self.__pkgname, datum):
+        return position
+    raise io.AvroTypeException(schm, datum)

Added: hadoop/avro/trunk/src/py/avro/reflectipc.py
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/reflectipc.py?rev=788453&view=auto
==============================================================================
--- hadoop/avro/trunk/src/py/avro/reflectipc.py (added)
+++ hadoop/avro/trunk/src/py/avro/reflectipc.py Thu Jun 25 18:12:00 2009
@@ -0,0 +1,107 @@
+#Licensed to the Apache Software Foundation (ASF) under one
+#or more contributor license agreements.  See the NOTICE file
+#distributed with this work for additional information
+#regarding copyright ownership.  The ASF licenses this file
+#to you under the Apache License, Version 2.0 (the
+#"License"); you may not use this file except in compliance
+#with the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+#Unless required by applicable law or agreed to in writing, software
+#distributed under the License is distributed on an "AS IS" BASIS,
+#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#See the License for the specific language governing permissions and
+#limitations under the License.
+
+""" Uses reflectio to write and read data objects. Provides support for remote 
+method invocation via protocol's proxy instance."""
+
+import avro.schema as schema
+import avro.ipc as ipc
+import avro.genericipc as genericipc
+import avro.reflectio as reflectio
+
+#TODO pkgname should not be passed, instead classes should be constructed 
+#based on schema namespace
+
+class _Proxy(object):
+  """Exposes one callable attribute per message of the requestor's local
+  protocol; calling it issues the request through the requestor."""
+
+  class _MethodInvoker(object):
+    """Callable that forwards its positional arguments as one request."""
+
+    def __init__(self, requestor, methodname):
+      self.requestor = requestor
+      self.methodname = methodname
+
+    def __call__(self, *args):
+      return self.requestor.request(self.methodname, args)
+
+  def __init__(self, requestor):
+    self.requestor = requestor
+    self.invokers = dict()
+    for name in self.requestor.getlocal().getmessages().keys():
+      self.invokers[name] = self._MethodInvoker(self.requestor, name)
+
+  def __getattribute__(self, attr):
+    # Message invokers take precedence over ordinary attributes.
+    invoker = object.__getattribute__(self, "invokers").get(attr)
+    if invoker is not None:
+      return invoker
+    return object.__getattribute__(self, attr)
+
+def getclient(protocol, transceiver):
+  """Create a proxy instance whose methods invoke RPCs."""
+  # The proxy wraps a ReflectRequestor bound to this protocol and transceiver.
+  return _Proxy(ReflectRequestor(protocol, transceiver))
+
+class ReflectRequestor(genericipc.Requestor):
+  """Requestor that reads and writes data through the reflectio datum
+  reader and writer."""
+
+  def __init__(self, localproto, transceiver):
+    ipc.RequestorBase.__init__(self, localproto, transceiver)
+    # Prefix handed to reflectio: the protocol namespace followed by ".".
+    self.__pkgname = localproto.getnamespace() + "."
+
+  def getdatumwriter(self, schm):
+    return reflectio.ReflectDatumWriter(self.__pkgname, schm)
+
+  def getdatumreader(self, schm):
+    return reflectio.ReflectDatumReader(self.__pkgname, schm)
+
+  def writerequest(self, schm, req, encoder):
+    """Writes each positional argument in req using the schema of the
+    request field at the same position."""
+    fields = schm.getfields()
+    # Pair every argument with its own field; each field entry is indexed
+    # as (name, schema).
+    for index, arg in enumerate(req):
+      argschm = fields[index][1]
+      genericipc.Requestor.writerequest(self, argschm, arg, encoder)
+
+  def readerror(self, schm, decoder):
+    return self.getdatumreader(schm).read(decoder)
+
+class ReflectResponder(genericipc.Responder):
+  """Responder that dispatches each message to the same-named method of an
+  implementation object, using the reflectio datum reader and writer."""
+
+  def __init__(self, localproto, impl):
+    genericipc.Responder.__init__(self, localproto)
+    # Prefix handed to reflectio: the protocol namespace followed by ".".
+    self.__pkgname = localproto.getnamespace() + "."
+    self.__impl = impl
+
+  def getdatumwriter(self, schm):
+    return reflectio.ReflectDatumWriter(self.__pkgname, schm)
+
+  def getdatumreader(self, schm):
+    return reflectio.ReflectDatumReader(self.__pkgname, schm)
+
+  def readrequest(self, schm, decoder):
+    """Reads one value per request field, in order, and returns them as a
+    list."""
+    req = list()
+    for field, fieldschm in schm.getfields():
+      req.append(genericipc.Responder.readrequest(self, fieldschm, decoder))
+    return req
+
+  def writeerror(self, schm, error, encoder):
+    self.getdatumwriter(schm).write(error, encoder)
+
+  def invoke(self, msg, req):
+    """Calls the impl method named after msg with req as its positional
+    arguments and returns the result. Raises AttributeError when impl has
+    no such method."""
+    name = msg.getname()
+    # getattr with a default lets a missing method reach the check below.
+    method = getattr(self.__impl, name, None)
+    if method is None:
+      raise AttributeError("No method with name "+ name)
+    resp = method(*req)
+    return resp
+

Modified: hadoop/avro/trunk/src/test/py/interoptests.py
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/py/interoptests.py?rev=788453&r1=788452&r2=788453&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/py/interoptests.py (original)
+++ hadoop/avro/trunk/src/test/py/interoptests.py Thu Jun 25 18:12:00 2009
@@ -18,8 +18,9 @@
 import avro.schema as schema
 import avro.io as io
 import avro.ipc as ipc
-import avro.generic as generic
-import avro.reflect as reflect
+import avro.genericio as genericio
+import avro.reflectio as reflectio
+import avro.reflectipc as reflectipc
 import testio, testipc, testioreflect, testipcreflect
 
 _DATAFILE_DIR = "build/test/data-files/"
@@ -27,8 +28,8 @@
 
 class TestGeneratedFiles(unittest.TestCase):
 
-  def __init__(self, methodName, validator=generic.validate, 
-               datumreader=generic.DatumReader):
+  def __init__(self, methodName, validator=genericio.validate, 
+               datumreader=genericio.DatumReader):
     unittest.TestCase.__init__(self, methodName)
     self.__validator = validator
     self.__datumreader = datumreader
@@ -61,7 +62,7 @@
     sock.connect(("localhost", int(port)))
     client = ipc.SocketTransceiver(sock)
     testproto = testipcreflect.TestProtocol("testipc")
-    testproto.proxy = reflect.getclient(testipc.PROTOCOL, client)
+    testproto.proxy = reflectipc.getclient(testipc.PROTOCOL, client)
     testproto.checkhello()
     testproto.checkecho()
     testproto.checkechobytes()
@@ -70,7 +71,7 @@
 
 def _interopserver():
   addr = ('localhost', 0)
-  responder = reflect.ReflectResponder(testipc.PROTOCOL, 
+  responder = reflectipc.ReflectResponder(testipc.PROTOCOL, 
                                        testipcreflect.TestImpl())
   server = ipc.SocketServer(responder, addr)
   file = open(_SERVER_PORTS_DIR+"py-port", "w")

Modified: hadoop/avro/trunk/src/test/py/testio.py
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/py/testio.py?rev=788453&r1=788452&r2=788453&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/py/testio.py (original)
+++ hadoop/avro/trunk/src/test/py/testio.py Thu Jun 25 18:12:00 2009
@@ -17,7 +17,7 @@
 import unittest, random, cStringIO, time, sys, os, struct
 import avro.schema as schema
 import avro.io as io
-import avro.generic as generic
+import avro.genericio as genericio
 
 _DIR = "build/test/"
 _FILE = _DIR +"test.py.avro"
@@ -95,9 +95,9 @@
 
 class TestSchema(unittest.TestCase):
 
-  def __init__(self, methodName, validator=generic.validate,
-                               dwriter=generic.DatumWriter, 
-                               dreader=generic.DatumReader, random=RandomData,
+  def __init__(self, methodName, validator=genericio.validate,
+                               dwriter=genericio.DatumWriter, 
+                               dreader=genericio.DatumReader, random=RandomData,
                                assertdata=True):
     unittest.TestCase.__init__(self, methodName)
     self.__validator = validator
@@ -185,10 +185,10 @@
     self.assertTrue(self.__validator(schm, datum))
     w = self.__datumwriter(schm)
     writer = cStringIO.StringIO()
-    w.write(datum, io.ValueWriter(writer))
+    w.write(datum, io.Encoder(writer))
     r = self.__datumreader(schm)
     reader = cStringIO.StringIO(writer.getvalue())
-    ob = r.read(io.ValueReader(reader))
+    ob = r.read(io.Decoder(reader))
     if self.__assertdata:
       self.assertEquals(datum, ob)
 
@@ -215,7 +215,7 @@
   file = sys.argv[2]
   count = int(sys.argv[3])
   randomData = RandomData(schm)
-  dw = io.DataFileWriter(schm, open(file, 'wb'), generic.DatumWriter())
+  dw = io.DataFileWriter(schm, open(file, 'wb'), genericio.DatumWriter())
   for i in range(0,count):
     dw.append(randomData.next())
   dw.close()

Modified: hadoop/avro/trunk/src/test/py/testioreflect.py
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/py/testioreflect.py?rev=788453&r1=788452&r2=788453&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/py/testioreflect.py (original)
+++ hadoop/avro/trunk/src/test/py/testioreflect.py Thu Jun 25 18:12:00 2009
@@ -15,21 +15,19 @@
 #limitations under the License.
 
 import avro.schema as schema
-import avro.reflect as reflect
-import avro.generic as generic
+import avro.reflectio as reflectio
 import testio
 
 _PKGNAME = "org.apache.avro.test."
 
 def dyvalidator(schm, object):
-  return reflect.validate(schm, _PKGNAME, object)
+  return reflectio.validate(schm, _PKGNAME, object)
 
 class DyRandomData(testio.RandomData):
 
   def nextdata(self, schm, d=0):
     if schm.gettype() == schema.RECORD:
-      name = schm.getname()
-      clazz = reflect.gettype(name, _PKGNAME)
+      clazz = reflectio.gettype(schm, _PKGNAME)
       result = clazz()
       for field,fieldschema in schm.getfields():
         result.__setattr__(field, self.nextdata(fieldschema,d))
@@ -37,15 +35,15 @@
     else:
       return testio.RandomData.nextdata(self, schm, d)
 
-class ReflectDReader(reflect.ReflectDatumReader):
+class ReflectDReader(reflectio.ReflectDatumReader):
   
   def __init__(self, schm=None):
-    reflect.ReflectDatumReader.__init__(self, _PKGNAME, schm)
+    reflectio.ReflectDatumReader.__init__(self, _PKGNAME, schm)
 
-class ReflectDWriter(reflect.ReflectDatumWriter):
+class ReflectDWriter(reflectio.ReflectDatumWriter):
   
   def __init__(self, schm=None):
-    reflect.ReflectDatumWriter.__init__(self, _PKGNAME, schm)
+    reflectio.ReflectDatumWriter.__init__(self, _PKGNAME, schm)
 
 class TestSchema(testio.TestSchema):
 

Modified: hadoop/avro/trunk/src/test/py/testipc.py
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/py/testipc.py?rev=788453&r1=788452&r2=788453&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/py/testipc.py (original)
+++ hadoop/avro/trunk/src/test/py/testipc.py Thu Jun 25 18:12:00 2009
@@ -17,7 +17,7 @@
 import unittest, socket, struct
 import testio
 import avro.ipc as ipc
-import avro.generic as generic
+import avro.genericipc as genericipc
 import avro.protocol as protocol
 import avro.schema as schema
 
@@ -25,7 +25,7 @@
 
 class TestProtocol(unittest.TestCase):
 
-  class TestResponder(generic.Responder):
+  class TestResponder(genericipc.Responder):
 
     def __init__(self):
       ipc.ResponderBase.__init__(self, PROTOCOL)
@@ -50,6 +50,8 @@
         raise schema.AvroException("unexpected message:",msg.getname());
 
   def testipc(self):
+    self.server = None
+    self.requestor = None
     try:
       self.checkstartserver()
       self.checkhello()
@@ -65,12 +67,12 @@
     sock = socket.socket()
     sock.connect(self.server.getaddress())
     client = ipc.SocketTransceiver(sock)
-    self.requestor = generic.Requestor(PROTOCOL, client)
+    self.requestor = genericipc.Requestor(PROTOCOL, client)
 
   def checkhello(self):
     params = dict()
     params['greeting'] = unicode('bob')
-    resp = self.requestor.call('hello', params)
+    resp = self.requestor.request('hello', params)
     self.assertEquals('goodbye',resp)
 
   def checkecho(self):
@@ -80,7 +82,7 @@
     record['hash'] = struct.pack('16s','0123456789012345')
     params = dict()
     params['record'] = record
-    echoed = self.requestor.call('echo', params)
+    echoed = self.requestor.request('echo', params)
     self.assertEquals(record,echoed)
 
   def checkechobytes(self):
@@ -88,17 +90,18 @@
     rand = testio.RandomData(schema._BytesSchema())
     data = rand.next()
     params['data'] = data
-    echoed = self.requestor.call('echoBytes', params)
+    echoed = self.requestor.request('echoBytes', params)
     self.assertEquals(data,echoed)
 
   def checkerror(self):
     error = None
     try:
-      self.requestor.call("error", dict())
+      self.requestor.request("error", dict())
     except ipc.AvroRemoteException, e:
       error = e
     self.assertNotEquals(error, None)
     self.assertEquals("an error", error.getvalue().get("message"))
 
   def checkshutdown(self):
-    self.server.close()
+    if self.server is not None:
+      self.server.close()

Modified: hadoop/avro/trunk/src/test/py/testipcreflect.py
URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/py/testipcreflect.py?rev=788453&r1=788452&r2=788453&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/py/testipcreflect.py (original)
+++ hadoop/avro/trunk/src/test/py/testipcreflect.py Thu Jun 25 18:12:00 2009
@@ -16,13 +16,15 @@
 
 import socket, struct
 import avro.schema as schema
-import avro.reflect as reflect
+import avro.reflectio as reflectio
+import avro.reflectipc as reflectipc
 import avro.ipc as ipc
 import testipc, testio, testioreflect
 
-TestRecord = reflect.gettype("TestRecord", testioreflect._PKGNAME)
-TestError = reflect.gettype("TestError", testioreflect._PKGNAME, 
-                            ipc.AvroRemoteException)
+TestRecord = reflectio.gettype(testipc.PROTOCOL.gettypes().get("TestRecord"), 
+                               testioreflect._PKGNAME)
+TestError = reflectio.gettype(testipc.PROTOCOL.gettypes().get("TestError"), 
+                              testioreflect._PKGNAME, ipc.AvroRemoteException)
 
 class TestImpl(object):
 
@@ -44,12 +46,12 @@
 
   def checkstartserver(self):
     addr = ('localhost', 0)
-    responder = reflect.ReflectResponder(testipc.PROTOCOL, TestImpl())
+    responder = reflectipc.ReflectResponder(testipc.PROTOCOL, TestImpl())
     self.server = ipc.SocketServer(responder, addr)
     sock = socket.socket()
     sock.connect(self.server.getaddress())
     client = ipc.SocketTransceiver(sock)
-    self.proxy = reflect.getclient(testipc.PROTOCOL, client)
+    self.proxy = reflectipc.getclient(testipc.PROTOCOL, client)
 
   def checkhello(self):
     resp = self.proxy.hello(unicode("bob"))
@@ -77,6 +79,3 @@
       error = e
     self.assertNotEquals(error, None)
     self.assertEquals("an error", error.message)
-
-  def checkshutdown(self):
-    self.server.close()
\ No newline at end of file


Reply via email to