Author: todd
Date: Wed Jun 15 22:21:31 2011
New Revision: 1136222

URL: http://svn.apache.org/viewvc?rev=1136222&view=rev
Log:
HADOOP-7379. Add the ability to serialize and deserialize protocol buffers
in ObjectWritable. Contributed by Todd Lipcon.
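
A minimal usage sketch of what this change enables (not part of the commit; it mirrors the new TestObjectWritableProtos test below, with EnumValueDescriptorProto standing in for any generated protobuf Message, and the example class name is purely illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.io.ObjectWritable;

    import com.google.protobuf.DescriptorProtos;
    import com.google.protobuf.Message;

    public class ObjectWritableProtoExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        DataOutputBuffer out = new DataOutputBuffer();

        // Any generated Message works; EnumValueDescriptorProto is used only
        // because it ships with the protobuf library (as in the new test).
        Message sent = DescriptorProtos.EnumValueDescriptorProto.newBuilder()
            .setName("example").setNumber(1).build();

        // ObjectWritable writes the declared class name followed by the
        // length-delimited message, so it can be read back polymorphically.
        ObjectWritable.writeObject(out, sent,
            DescriptorProtos.EnumValueDescriptorProto.class, conf);

        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        Message received = (Message) ObjectWritable.readObject(in, conf);
        System.out.println(sent.equals(received));   // expected: true
      }
    }
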
Added:
    hadoop/common/trunk/common/src/java/org/apache/hadoop/io/DataOutputOutputStream.java
    hadoop/common/trunk/common/src/java/org/apache/hadoop/util/ProtoUtil.java
    hadoop/common/trunk/common/src/test/core/org/apache/hadoop/io/TestObjectWritableProtos.java
    hadoop/common/trunk/common/src/test/core/org/apache/hadoop/util/TestProtoUtil.java
Modified:
    hadoop/common/trunk/common/CHANGES.txt
    hadoop/common/trunk/common/src/java/org/apache/hadoop/io/ObjectWritable.java
    hadoop/common/trunk/common/src/test/core/org/apache/hadoop/ipc/TestRPC.java

Modified: hadoop/common/trunk/common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/CHANGES.txt?rev=1136222&r1=1136221&r2=1136222&view=diff
==============================================================================
--- hadoop/common/trunk/common/CHANGES.txt (original)
+++ hadoop/common/trunk/common/CHANGES.txt Wed Jun 15 22:21:31 2011
@@ -44,6 +44,9 @@ Trunk (unreleased changes)
     HADOOP-7144. Expose JMX metrics via JSON servlet. (Robert Joseph Evans
     via cdouglas)
 
+    HADOOP-7379. Add the ability to serialize and deserialize protocol buffers
+    in ObjectWritable. (todd)
+
   IMPROVEMENTS
 
     HADOOP-7042. Updates to test-patch.sh to include failed test names and

Added: hadoop/common/trunk/common/src/java/org/apache/hadoop/io/DataOutputOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/java/org/apache/hadoop/io/DataOutputOutputStream.java?rev=1136222&view=auto
==============================================================================
--- hadoop/common/trunk/common/src/java/org/apache/hadoop/io/DataOutputOutputStream.java (added)
+++ hadoop/common/trunk/common/src/java/org/apache/hadoop/io/DataOutputOutputStream.java Wed Jun 15 22:21:31 2011
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * OutputStream implementation that wraps a DataOutput.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class DataOutputOutputStream extends OutputStream {
+
+  private final DataOutput out;
+
+  /**
+   * Construct an OutputStream from the given DataOutput. If 'out'
+   * is already an OutputStream, simply returns it. Otherwise, wraps
+   * it in an OutputStream.
+   * @param out the DataOutput to wrap
+   * @return an OutputStream instance that outputs to 'out'
+   */
+  public static OutputStream constructOutputStream(DataOutput out) {
+    if (out instanceof OutputStream) {
+      return (OutputStream)out;
+    } else {
+      return new DataOutputOutputStream(out);
+    }
+  }
+
+  private DataOutputOutputStream(DataOutput out) {
+    this.out = out;
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    out.writeByte(b);
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    out.write(b, off, len);
+  }
+
+  @Override
+  public void write(byte[] b) throws IOException {
+    out.write(b);
+  }
+
+
+}

Modified: hadoop/common/trunk/common/src/java/org/apache/hadoop/io/ObjectWritable.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/java/org/apache/hadoop/io/ObjectWritable.java?rev=1136222&r1=1136221&r2=1136222&view=diff
==============================================================================
--- hadoop/common/trunk/common/src/java/org/apache/hadoop/io/ObjectWritable.java (original)
+++ hadoop/common/trunk/common/src/java/org/apache/hadoop/io/ObjectWritable.java Wed Jun 15 22:21:31 2011
@@ -19,6 +19,8 @@
 package org.apache.hadoop.io;
 
 import java.lang.reflect.Array;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.io.*;
 import java.util.*;
 
@@ -26,6 +28,9 @@ import java.util.*;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.*;
+import org.apache.hadoop.util.ProtoUtil;
+
+import com.google.protobuf.Message;
 
 /** A polymorphic Writable that writes an instance with it's class name.
  * Handles arrays, strings and primitive types without a Writable wrapper.
@@ -191,6 +196,9 @@ public class ObjectWritable implements W
       UTF8.writeString(out, instance.getClass().getName());
       ((Writable)instance).write(out);
 
+    } else if (Message.class.isAssignableFrom(declaredClass)) {
+      ((Message)instance).writeDelimitedTo(
+          DataOutputOutputStream.constructOutputStream(out));
     } else {
       throw new IOException("Can't write: "+instance+" as "+declaredClass);
     }
@@ -261,6 +269,8 @@ public class ObjectWritable implements W
       instance = UTF8.readString(in);
     } else if (declaredClass.isEnum()) {         // enum
       instance = Enum.valueOf((Class<? extends Enum>) declaredClass, UTF8.readString(in));
+    } else if (Message.class.isAssignableFrom(declaredClass)) {
+      instance = tryInstantiateProtobuf(declaredClass, in);
     } else {                                      // Writable
       Class instanceClass = null;
       String str = UTF8.readString(in);
@@ -286,6 +296,67 @@ public class ObjectWritable implements W
   }
 
   /**
+   * Try to instantiate a protocol buffer of the given message class
+   * from the given input stream.
+   *
+   * @param protoClass the class of the generated protocol buffer
+   * @param dataIn the input stream to read from
+   * @return the instantiated Message instance
+   * @throws IOException if an IO problem occurs
+   */
+  private static Message tryInstantiateProtobuf(
+      Class<?> protoClass,
+      DataInput dataIn) throws IOException {
+
+    try {
+      if (dataIn instanceof InputStream) {
+        // We can use the built-in parseDelimitedFrom and not have to re-copy
+        // the data
+        Method parseMethod = getStaticProtobufMethod(protoClass,
+            "parseDelimitedFrom", InputStream.class);
+        return (Message)parseMethod.invoke(null, (InputStream)dataIn);
+      } else {
+        // Have to read it into a buffer first, since protobuf doesn't deal
+        // with the DataInput interface directly.
+
+        // Read the size delimiter that writeDelimitedTo writes
+        int size = ProtoUtil.readRawVarint32(dataIn);
+        if (size < 0) {
+          throw new IOException("Invalid size: " + size);
+        }
+
+        byte[] data = new byte[size];
+        dataIn.readFully(data);
+        Method parseMethod = getStaticProtobufMethod(protoClass,
+            "parseFrom", byte[].class);
+        return (Message)parseMethod.invoke(null, data);
+      }
+    } catch (InvocationTargetException e) {
+
+      if (e.getCause() instanceof IOException) {
+        throw (IOException)e.getCause();
+      } else {
+        throw new IOException(e.getCause());
+      }
+    } catch (IllegalAccessException iae) {
+      throw new AssertionError("Could not access parse method in " +
+          protoClass);
+    }
+  }
+
+  static Method getStaticProtobufMethod(Class<?> declaredClass, String method,
+      Class<?> ... args) {
+
+    try {
+      return declaredClass.getMethod(method, args);
+    } catch (Exception e) {
+      // This is a bug in Hadoop - protobufs should all have this static method
+      throw new AssertionError("Protocol buffer class " + declaredClass +
+          " does not have an accessible parseFrom(InputStream) method!");
+    }
+  }
+
+  /**
    * Find and load the class with given name <tt>className</tt> by first finding
    * it in the specified <tt>conf</tt>. If the specified <tt>conf</tt> is null,
    * try load it directly.

Added: hadoop/common/trunk/common/src/java/org/apache/hadoop/util/ProtoUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/java/org/apache/hadoop/util/ProtoUtil.java?rev=1136222&view=auto
==============================================================================
--- hadoop/common/trunk/common/src/java/org/apache/hadoop/util/ProtoUtil.java (added)
+++ hadoop/common/trunk/common/src/java/org/apache/hadoop/util/ProtoUtil.java Wed Jun 15 22:21:31 2011
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+public abstract class ProtoUtil {
+
+  /**
+   * Read a variable length integer in the same format that ProtoBufs encodes.
+   * @param in the input stream to read from
+   * @return the integer
+   * @throws IOException if it is malformed or EOF.
+   */
+  public static int readRawVarint32(DataInput in) throws IOException {
+    byte tmp = in.readByte();
+    if (tmp >= 0) {
+      return tmp;
+    }
+    int result = tmp & 0x7f;
+    if ((tmp = in.readByte()) >= 0) {
+      result |= tmp << 7;
+    } else {
+      result |= (tmp & 0x7f) << 7;
+      if ((tmp = in.readByte()) >= 0) {
+        result |= tmp << 14;
+      } else {
+        result |= (tmp & 0x7f) << 14;
+        if ((tmp = in.readByte()) >= 0) {
+          result |= tmp << 21;
+        } else {
+          result |= (tmp & 0x7f) << 21;
+          result |= (tmp = in.readByte()) << 28;
+          if (tmp < 0) {
+            // Discard upper 32 bits.
+            for (int i = 0; i < 5; i++) {
+              if (in.readByte() >= 0) {
+                return result;
+              }
+            }
+            throw new IOException("Malformed varint");
+          }
+        }
+      }
+    }
+    return result;
+  }
+
+}

Added: hadoop/common/trunk/common/src/test/core/org/apache/hadoop/io/TestObjectWritableProtos.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/test/core/org/apache/hadoop/io/TestObjectWritableProtos.java?rev=1136222&view=auto
==============================================================================
--- hadoop/common/trunk/common/src/test/core/org/apache/hadoop/io/TestObjectWritableProtos.java (added)
+++ hadoop/common/trunk/common/src/test/core/org/apache/hadoop/io/TestObjectWritableProtos.java Wed Jun 15 22:21:31 2011
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import com.google.protobuf.DescriptorProtos;
+import com.google.protobuf.Message;
+
+/**
+ * Test case for the use of Protocol Buffers within ObjectWritable.
+ */
+public class TestObjectWritableProtos {
+
+  @Test
+  public void testProtoBufs() throws IOException {
+    doTest(1);
+  }
+
+  @Test
+  public void testProtoBufs2() throws IOException {
+    doTest(2);
+  }
+
+  @Test
+  public void testProtoBufs3() throws IOException {
+    doTest(3);
+  }
+
+  /**
+   * Write a protobuf to a buffer 'numProtos' times, and then
+   * read them back, making sure all data comes through correctly.
+   */
+  private void doTest(int numProtos) throws IOException {
+    Configuration conf = new Configuration();
+    DataOutputBuffer out = new DataOutputBuffer();
+
+    // Write numProtos protobufs to the buffer
+    Message[] sent = new Message[numProtos];
+    for (int i = 0; i < numProtos; i++) {
+      // Construct a test protocol buffer using one of the
+      // protos that ships with the protobuf library
+      Message testProto = DescriptorProtos.EnumValueDescriptorProto.newBuilder()
+        .setName("test" + i).setNumber(i).build();
+      ObjectWritable.writeObject(out, testProto,
+          DescriptorProtos.EnumValueDescriptorProto.class, conf);
+      sent[i] = testProto;
+    }
+
+    // Read back the data
+    DataInputBuffer in = new DataInputBuffer();
+    in.reset(out.getData(), out.getLength());
+
+    for (int i = 0; i < numProtos; i++) {
+      Message received = (Message)ObjectWritable.readObject(in, conf);
+
+      assertEquals(sent[i], received);
+    }
+  }
+
+}

Modified: hadoop/common/trunk/common/src/test/core/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/test/core/org/apache/hadoop/ipc/TestRPC.java?rev=1136222&r1=1136221&r2=1136222&view=diff
==============================================================================
--- hadoop/common/trunk/common/src/test/core/org/apache/hadoop/ipc/TestRPC.java (original)
+++ hadoop/common/trunk/common/src/test/core/org/apache/hadoop/ipc/TestRPC.java Wed Jun 15 22:21:31 2011
@@ -40,6 +40,10 @@ import org.apache.hadoop.security.author
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.Service;
 import org.apache.hadoop.security.AccessControlException;
+
+import com.google.protobuf.DescriptorProtos;
+import com.google.protobuf.DescriptorProtos.EnumDescriptorProto;
+
 import static org.apache.hadoop.test.MetricsAsserts.*;
 import static org.mockito.Mockito.*;
 
@@ -71,6 +75,9 @@ public class TestRPC extends TestCase {
     int error() throws IOException;
     void testServerGet() throws IOException;
     int[] exchange(int[] values) throws IOException;
+
+    DescriptorProtos.EnumDescriptorProto exchangeProto(
+        DescriptorProtos.EnumDescriptorProto arg);
   }
 
   public static class TestImpl implements TestProtocol {
@@ -136,6 +143,11 @@
       }
      return values;
    }
+
+    @Override
+    public EnumDescriptorProto exchangeProto(EnumDescriptorProto arg) {
+      return arg;
+    }
   }
 
   //
@@ -314,6 +326,13 @@
 
     intResult = proxy.add(new int[] {1, 2});
     assertEquals(intResult, 3);
+
+    // Test protobufs
+    EnumDescriptorProto sendProto =
+      EnumDescriptorProto.newBuilder().setName("test").build();
+    EnumDescriptorProto retProto = proxy.exchangeProto(sendProto);
+    assertEquals(sendProto, retProto);
+    assertNotSame(sendProto, retProto);
 
     boolean caught = false;
     try {

Added: hadoop/common/trunk/common/src/test/core/org/apache/hadoop/util/TestProtoUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/common/src/test/core/org/apache/hadoop/util/TestProtoUtil.java?rev=1136222&view=auto
==============================================================================
--- hadoop/common/trunk/common/src/test/core/org/apache/hadoop/util/TestProtoUtil.java (added)
+++ hadoop/common/trunk/common/src/test/core/org/apache/hadoop/util/TestProtoUtil.java Wed Jun 15 22:21:31 2011
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.junit.Test;
+
+import com.google.protobuf.CodedOutputStream;
+
+public class TestProtoUtil {
+
+  /**
+   * Values to test encoding as variable length integers
+   */
+  private static final int[] TEST_VINT_VALUES = new int[] {
+    0, 1, -1, 127, 128, 129, 255, 256, 257,
+    0x1234, -0x1234,
+    0x123456, -0x123456,
+    0x12345678, -0x12345678
+  };
+
+  /**
+   * Test that readRawVarint32 is compatible with the varints encoded
+   * by ProtoBuf's CodedOutputStream.
+   */
+  @Test
+  public void testVarInt() throws IOException {
+    // Test a few manufactured values
+    for (int value : TEST_VINT_VALUES) {
+      doVarIntTest(value);
+    }
+    // Check 1-bits at every bit position
+    for (int i = 1; i != 0; i <<= 1) {
+      doVarIntTest(i);
+      doVarIntTest(-i);
+      doVarIntTest(i - 1);
+      doVarIntTest(~i);
+    }
+  }
+
+  private void doVarIntTest(int value) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    CodedOutputStream cout = CodedOutputStream.newInstance(baos);
+    cout.writeRawVarint32(value);
+    cout.flush();
+
+    DataInputStream dis = new DataInputStream(
+        new ByteArrayInputStream(baos.toByteArray()));
+    assertEquals(value, ProtoUtil.readRawVarint32(dis));
+  }
+}
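
For reference (not part of the commit): ProtoUtil.readRawVarint32 above decodes protobuf's standard varint encoding, i.e. seven payload bits per byte, least-significant group first, with the high bit set on every byte except the last. A small illustrative sketch under that assumption (the class name here is hypothetical), decoding the two-byte encoding of 300:

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;

    import org.apache.hadoop.util.ProtoUtil;

    public class VarintExample {
      public static void main(String[] args) throws Exception {
        // 300 = 0b1_0010_1100. The low seven bits (0101100) are emitted first
        // with the continuation bit set -> 0xAC; the remaining bits (10)
        // follow in the final byte -> 0x02.
        byte[] encoded = new byte[] { (byte) 0xAC, 0x02 };
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
        System.out.println(ProtoUtil.readRawVarint32(in));   // expected: 300
      }
    }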