Author: hammer
Date: Thu Feb 4 08:15:02 2010
New Revision: 906394
URL: http://svn.apache.org/viewvc?rev=906394&view=rev
Log:
AVRO-322: Add a working client and server to Python implementation
using HTTP as a transport (hammer)
Added:
hadoop/avro/trunk/lang/py/test/sample_http_client.py
hadoop/avro/trunk/lang/py/test/sample_http_server.py
Modified:
hadoop/avro/trunk/CHANGES.txt
hadoop/avro/trunk/lang/py/src/avro/ipc.py
hadoop/avro/trunk/lang/py/test/sample_ipc_client.py
hadoop/avro/trunk/lang/py/test/sample_ipc_server.py
Modified: hadoop/avro/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=906394&r1=906393&r2=906394&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Thu Feb 4 08:15:02 2010
@@ -87,6 +87,9 @@
AVRO-380. Avro Container File format change: add block size to block
descriptor. (Scott Carey via philz)
+ AVRO-322. Add a working client and server to Python implementation
+ using HTTP as a transport (hammer)
+
IMPROVEMENTS
AVRO-157. Changes from code review comments for C++. (sbanacho)
Modified: hadoop/avro/trunk/lang/py/src/avro/ipc.py
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/py/src/avro/ipc.py?rev=906394&r1=906393&r2=906394&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/py/src/avro/ipc.py (original)
+++ hadoop/avro/trunk/lang/py/src/avro/ipc.py Thu Feb 4 08:15:02 2010
@@ -18,7 +18,6 @@
"""
import cStringIO
import struct
-import socket
from avro import io
from avro import protocol
from avro import schema
@@ -96,26 +95,26 @@
class Requestor(object):
"""Base class for the client side of a protocol interaction."""
- def __init__(self, local_protocol, transport):
+ def __init__(self, local_protocol, transceiver):
self._local_protocol = local_protocol
- self._transport = transport
+ self._transceiver = transceiver
self._remote_protocol = None
self._remote_hash = None
self._send_protocol = None
# read-only properties
local_protocol = property(lambda self: self._local_protocol)
- transport = property(lambda self: self._transport)
+ transceiver = property(lambda self: self._transceiver)
# read/write properties
def set_remote_protocol(self, new_remote_protocol):
self._remote_protocol = new_remote_protocol
- REMOTE_PROTOCOLS[self.transport.remote_name] = self.remote_protocol
+ REMOTE_PROTOCOLS[self.transceiver.remote_name] = self.remote_protocol
remote_protocol = property(lambda self: self._remote_protocol,
set_remote_protocol)
def set_remote_hash(self, new_remote_hash):
self._remote_hash = new_remote_hash
- REMOTE_HASHES[self.transport.remote_name] = self.remote_hash
+ REMOTE_HASHES[self.transceiver.remote_name] = self.remote_hash
remote_hash = property(lambda self: self._remote_hash, set_remote_hash)
def set_send_protocol(self, new_send_protocol):
self._send_protocol = new_send_protocol
@@ -131,9 +130,9 @@
self.write_handshake_request(buffer_encoder)
self.write_call_request(message_name, request_datum, buffer_encoder)
- # send the handshake and call request; block until call response
+ # send the handshake and call request; block until call response
call_request = buffer_writer.getvalue()
- call_response = self.transport.transceive(call_request)
+ call_response = self.transceiver.transceive(call_request)
# process the handshake and call response
buffer_decoder = io.BinaryDecoder(cStringIO.StringIO(call_response))
@@ -145,7 +144,7 @@
def write_handshake_request(self, encoder):
local_hash = self.local_protocol.md5
- remote_name = self.transport.remote_name
+ remote_name = self.transceiver.remote_name
remote_hash = REMOTE_HASHES.get(remote_name)
if remote_hash is None:
remote_hash = local_hash
@@ -265,12 +264,11 @@
def set_protocol_cache(self, hash, protocol):
self.protocol_cache[hash] = protocol
- def respond(self, transport):
+ def respond(self, call_request):
"""
Called by a server to deserialize a request, compute and serialize
a response or error. Compare to 'handle()' in Thrift.
"""
- call_request = transport.read_framed_message()
buffer_reader = cStringIO.StringIO(call_request)
buffer_decoder = io.BinaryDecoder(buffer_reader)
buffer_writer = cStringIO.StringIO()
@@ -279,8 +277,7 @@
response_metadata = {}
try:
- remote_protocol = self.process_handshake(transport, buffer_decoder,
- buffer_encoder)
+ remote_protocol = self.process_handshake(buffer_decoder, buffer_encoder)
# handshake failure
if remote_protocol is None:
return buffer_writer.getvalue()
@@ -329,7 +326,7 @@
self.write_error(SYSTEM_ERROR_SCHEMA, error, buffer_encoder)
return buffer_writer.getvalue()
- def process_handshake(self, transport, decoder, encoder):
+ def process_handshake(self, decoder, encoder):
handshake_request = HANDSHAKE_RESPONDER_READER.read(decoder)
handshake_response = {}
@@ -380,36 +377,45 @@
datum_writer.write(str(error_exception), encoder)
#
-# Transport Implementations
+# Utility classes
#
-class SocketTransport(object):
- """A simple socket-based Transport implementation."""
- def __init__(self, sock):
- self._sock = sock
+class FramedReader(object):
+ """Wrapper around a file-like object to read framed data."""
+ def __init__(self, reader):
+ self._reader = reader
# read-only properties
- sock = property(lambda self: self._sock)
- remote_name = property(lambda self: self.sock.getsockname())
-
- def transceive(self, request):
- self.write_framed_message(request)
- return self.read_framed_message()
+ reader = property(lambda self: self._reader)
def read_framed_message(self):
message = []
while True:
buffer = cStringIO.StringIO()
- buffer_length = self.read_buffer_length()
+ buffer_length = self._read_buffer_length()
if buffer_length == 0:
return ''.join(message)
while buffer.tell() < buffer_length:
- chunk = self.sock.recv(buffer_length - buffer.tell())
+ chunk = self.reader.read(buffer_length - buffer.tell())
if chunk == '':
- raise ConnectionClosedException("Socket read 0 bytes.")
+ raise ConnectionClosedException("Reader read 0 bytes.")
buffer.write(chunk)
message.append(buffer.getvalue())
+ def _read_buffer_length(self):
+ read = self.reader.read(BUFFER_HEADER_LENGTH)
+ if read == '':
+ raise ConnectionClosedException("Reader read 0 bytes.")
+ return BIG_ENDIAN_INT_STRUCT.unpack(read)[0]
+
+class FramedWriter(object):
+ """Wrapper around a file-like object to write framed data."""
+ def __init__(self, writer):
+ self._writer = writer
+
+ # read-only properties
+ writer = property(lambda self: self._writer)
+
def write_framed_message(self, message):
message_length = len(message)
total_bytes_sent = 0
@@ -427,26 +433,49 @@
def write_buffer(self, chunk):
buffer_length = len(chunk)
self.write_buffer_length(buffer_length)
- total_bytes_sent = 0
- while total_bytes_sent < buffer_length:
- bytes_sent = self.sock.send(chunk[total_bytes_sent:])
- if bytes_sent == 0:
- raise ConnectionClosedException("Socket sent 0 bytes.")
- total_bytes_sent += bytes_sent
+ self.writer.write(chunk)
def write_buffer_length(self, n):
- bytes_sent = self.sock.sendall(BIG_ENDIAN_INT_STRUCT.pack(n))
- if bytes_sent == 0:
- raise ConnectionClosedException("socket sent 0 bytes")
+ self.writer.write(BIG_ENDIAN_INT_STRUCT.pack(n))
- def read_buffer_length(self):
- read = self.sock.recv(BUFFER_HEADER_LENGTH)
- if read == '':
- raise ConnectionClosedException("Socket read 0 bytes.")
- return BIG_ENDIAN_INT_STRUCT.unpack(read)[0]
+#
+# Transceiver Implementations
+#
+
+class HTTPTransceiver(object):
+ """
+ A simple HTTP-based transceiver implementation.
+ Useful for clients but not for servers
+ """
+ def __init__(self, conn):
+ self._conn = conn
+
+ # read-only properties
+ conn = property(lambda self: self._conn)
+ sock = property(lambda self: self.conn.sock)
+ remote_name = property(lambda self: self.sock.getsockname())
+
+ def transceive(self, request):
+ self.write_framed_message(request)
+ return self.read_framed_message()
+
+ def read_framed_message(self):
+ response_reader = FramedReader(self.conn.getresponse())
+ return response_reader.read_framed_message()
+
+ def write_framed_message(self, message):
+ req_method = 'POST'
+ req_resource = '/'
+ req_headers = {'Content-Type': 'avro/binary'}
+
+ req_body_buffer = FramedWriter(cStringIO.StringIO())
+ req_body_buffer.write_framed_message(message)
+ req_body = req_body_buffer.writer.getvalue()
+
+ self.conn.request(req_method, req_resource, req_body, req_headers)
def close(self):
- self.sock.close()
+ self.conn.close()
#
# Server Implementations (none yet)
Added: hadoop/avro/trunk/lang/py/test/sample_http_client.py
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/py/test/sample_http_client.py?rev=906394&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/py/test/sample_http_client.py (added)
+++ hadoop/avro/trunk/lang/py/test/sample_http_client.py Thu Feb 4 08:15:02 2010
@@ -0,0 +1,96 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import httplib
+import sys
+
+from avro import ipc
+from avro import protocol
+from avro import schema
+
+MAIL_PROTOCOL_JSON = """\
+{"namespace": "example.proto",
+ "protocol": "Mail",
+
+ "types": [
+ {"name": "Message", "type": "record",
+ "fields": [
+ {"name": "to", "type": "string"},
+ {"name": "from", "type": "string"},
+ {"name": "body", "type": "string"}
+ ]
+ }
+ ],
+
+ "messages": {
+ "send": {
+ "request": [{"name": "message", "type": "Message"}],
+ "response": "string"
+ },
+ "replay": {
+ "request": [],
+ "response": "string"
+ }
+ }
+}
+"""
+MAIL_PROTOCOL = protocol.parse(MAIL_PROTOCOL_JSON)
+SERVER_HOST = 'localhost'
+SERVER_PORT = 9090
+
+class UsageError(Exception):
+ def __init__(self, value):
+ self.value = value
+ def __str__(self):
+ return repr(self.value)
+
+def make_requestor(server_host, server_port, protocol):
+ conn = httplib.HTTPConnection(SERVER_HOST, SERVER_PORT)
+ conn.connect()
+ client = ipc.HTTPTransceiver(conn)
+ return ipc.Requestor(protocol, client)
+
+if __name__ == '__main__':
+ if len(sys.argv) not in [4, 5]:
+ raise UsageError("Usage: <to> <from> <body> [<count>]")
+
+ # client code - attach to the server and send a message
+ # fill in the Message record
+ message = dict()
+ message['to'] = sys.argv[1]
+ message['from'] = sys.argv[2]
+ message['body'] = sys.argv[3]
+
+ try:
+ num_messages = int(sys.argv[4])
+ except:
+ num_messages = 1
+
+ # build the parameters for the request
+ params = {}
+ params['message'] = message
+
+ # send the requests and print the result
+ for msg_count in range(num_messages):
+ requestor = make_requestor(SERVER_HOST, SERVER_PORT, MAIL_PROTOCOL)
+ result = requestor.request('send', params)
+ print("Result: " + result)
+
+ # try out a replay message
+ requestor = make_requestor(SERVER_HOST, SERVER_PORT, MAIL_PROTOCOL)
+ result = requestor.request('replay', dict())
+ print("Replay Result: " + result)
Added: hadoop/avro/trunk/lang/py/test/sample_http_server.py
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/py/test/sample_http_server.py?rev=906394&view=auto
==============================================================================
--- hadoop/avro/trunk/lang/py/test/sample_http_server.py (added)
+++ hadoop/avro/trunk/lang/py/test/sample_http_server.py Thu Feb 4 08:15:02 2010
@@ -0,0 +1,80 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
+from avro import ipc
+from avro import protocol
+from avro import schema
+
+MAIL_PROTOCOL_JSON = """\
+{"namespace": "example.proto",
+ "protocol": "Mail",
+
+ "types": [
+ {"name": "Message", "type": "record",
+ "fields": [
+ {"name": "to", "type": "string"},
+ {"name": "from", "type": "string"},
+ {"name": "body", "type": "string"}
+ ]
+ }
+ ],
+
+ "messages": {
+ "send": {
+ "request": [{"name": "message", "type": "Message"}],
+ "response": "string"
+ },
+ "replay": {
+ "request": [],
+ "response": "string"
+ }
+ }
+}
+"""
+MAIL_PROTOCOL = protocol.parse(MAIL_PROTOCOL_JSON)
+SERVER_ADDRESS = ('localhost', 9090)
+
+class MailResponder(ipc.Responder):
+ def __init__(self):
+ ipc.Responder.__init__(self, MAIL_PROTOCOL)
+
+ def invoke(self, message, request):
+ if message.name == 'send':
+ request_content = request['message']
+ response = "Sent message to %(to)s from %(from)s with body %(body)s" % \
+ request_content
+ return response
+ elif message.name == 'replay':
+ return 'replay'
+
+class MailHandler(BaseHTTPRequestHandler):
+ def do_POST(self):
+ self.responder = MailResponder()
+ call_request_reader = ipc.FramedReader(self.rfile)
+ call_request = call_request_reader.read_framed_message()
+ resp_body = self.responder.respond(call_request)
+ self.send_response(200)
+ self.send_header('Content-Type', 'avro/binary')
+ self.end_headers()
+ resp_writer = ipc.FramedWriter(self.wfile)
+ resp_writer.write_framed_message(resp_body)
+
+if __name__ == '__main__':
+ mail_server = HTTPServer(SERVER_ADDRESS, MailHandler)
+ mail_server.allow_reuse_address = True
+ mail_server.serve_forever()
Modified: hadoop/avro/trunk/lang/py/test/sample_ipc_client.py
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/py/test/sample_ipc_client.py?rev=906394&r1=906393&r2=906394&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/py/test/sample_ipc_client.py (original)
+++ hadoop/avro/trunk/lang/py/test/sample_ipc_client.py Thu Feb 4 08:15:02 2010
@@ -1,95 +0,0 @@
-#!/usr/bin/env python
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import socket
-import sys
-
-from avro import ipc
-from avro import protocol
-from avro import schema
-
-MAIL_PROTOCOL_JSON = """\
-{"namespace": "example.proto",
- "protocol": "Mail",
-
- "types": [
- {"name": "Message", "type": "record",
- "fields": [
- {"name": "to", "type": "string"},
- {"name": "from", "type": "string"},
- {"name": "body", "type": "string"}
- ]
- }
- ],
-
- "messages": {
- "send": {
- "request": [{"name": "message", "type": "Message"}],
- "response": "string"
- },
- "replay": {
- "request": [],
- "response": "string"
- }
- }
-}
-"""
-MAIL_PROTOCOL = protocol.parse(MAIL_PROTOCOL_JSON)
-SERVER_ADDRESS = ('localhost', 9090)
-
-class UsageError(Exception):
- def __init__(self, value):
- self.value = value
- def __str__(self):
- return repr(self.value)
-
-def make_requestor(server_address, protocol):
- sock = socket.socket()
- sock.connect(server_address)
- client = ipc.SocketTransport(sock)
- return ipc.Requestor(protocol, client)
-
-if __name__ == '__main__':
- if len(sys.argv) not in [4, 5]:
- raise UsageError("Usage: <to> <from> <body> [<count>]")
-
- # client code - attach to the server and send a message
- # fill in the Message record
- message = dict()
- message['to'] = sys.argv[1]
- message['from'] = sys.argv[2]
- message['body'] = sys.argv[3]
-
- try:
- num_messages = int(sys.argv[4])
- except:
- num_messages = 1
-
- # build the parameters for the request
- params = {}
- params['message'] = message
-
- # send the requests and print the result
- for msg_count in range(num_messages):
- requestor = make_requestor(SERVER_ADDRESS, MAIL_PROTOCOL)
- result = requestor.request('send', params)
- print("Result: " + result)
-
- # try out a replay message
- requestor = make_requestor(SERVER_ADDRESS, MAIL_PROTOCOL)
- result = requestor.request('replay', dict())
- print("Replay Result: " + result)
Modified: hadoop/avro/trunk/lang/py/test/sample_ipc_server.py
URL:
http://svn.apache.org/viewvc/hadoop/avro/trunk/lang/py/test/sample_ipc_server.py?rev=906394&r1=906393&r2=906394&view=diff
==============================================================================
--- hadoop/avro/trunk/lang/py/test/sample_ipc_server.py (original)
+++ hadoop/avro/trunk/lang/py/test/sample_ipc_server.py Thu Feb 4 08:15:02 2010
@@ -1,74 +0,0 @@
-#!/usr/bin/env python
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-from SocketServer import BaseRequestHandler, TCPServer
-from avro import ipc
-from avro import protocol
-from avro import schema
-
-MAIL_PROTOCOL_JSON = """\
-{"namespace": "example.proto",
- "protocol": "Mail",
-
- "types": [
- {"name": "Message", "type": "record",
- "fields": [
- {"name": "to", "type": "string"},
- {"name": "from", "type": "string"},
- {"name": "body", "type": "string"}
- ]
- }
- ],
-
- "messages": {
- "send": {
- "request": [{"name": "message", "type": "Message"}],
- "response": "string"
- },
- "replay": {
- "request": [],
- "response": "string"
- }
- }
-}
-"""
-MAIL_PROTOCOL = protocol.parse(MAIL_PROTOCOL_JSON)
-SERVER_ADDRESS = ('localhost', 9090)
-
-class MailResponder(ipc.Responder):
- def __init__(self):
- ipc.Responder.__init__(self, MAIL_PROTOCOL)
-
- def invoke(self, message, request):
- if message.name == 'send':
- request_content = request['message']
- response = "Sent message to %(to)s from %(from)s with body %(body)s" % \
- request_content
- return response
- elif message.name == 'replay':
- return 'replay'
-
-class MailHandler(BaseRequestHandler):
- def handle(self):
- self.responder = MailResponder()
- self.transport = ipc.SocketTransport(self.request)
- self.transport.write_framed_message(self.responder.respond(self.transport))
-
-if __name__ == '__main__':
- mail_server = TCPServer(SERVER_ADDRESS, MailHandler)
- mail_server.allow_reuse_address = True
- mail_server.serve_forever()