HIVE-12175: Upgrade Kryo version to 3.0.x (Prasanth Jayachandran reviewed by Ashutosh Chauhan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/adbc0ab6 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/adbc0ab6 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/adbc0ab6 Branch: refs/heads/master-fixed Commit: adbc0ab6aeff848dbcee83d565febd40797300c2 Parents: e34588e Author: Prasanth Jayachandran <j.prasant...@gmail.com> Authored: Tue Nov 24 12:43:46 2015 -0600 Committer: Owen O'Malley <omal...@apache.org> Committed: Tue Nov 24 12:10:10 2015 -0800 ---------------------------------------------------------------------- itests/qtest-accumulo/pom.xml | 2 +- pom.xml | 6 +- ql/pom.xml | 36 +++-- .../apache/hadoop/hive/ql/exec/Utilities.java | 145 +++++++++++++++++-- .../org/apache/hadoop/hive/ql/plan/MapWork.java | 15 -- .../apache/hadoop/hive/ql/plan/ReduceWork.java | 5 - spark-client/pom.xml | 28 ++-- .../hive/spark/client/rpc/KryoMessageCodec.java | 11 +- 8 files changed, 185 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/adbc0ab6/itests/qtest-accumulo/pom.xml ---------------------------------------------------------------------- diff --git a/itests/qtest-accumulo/pom.xml b/itests/qtest-accumulo/pom.xml index 7403a15..f7325dc 100644 --- a/itests/qtest-accumulo/pom.xml +++ b/itests/qtest-accumulo/pom.xml @@ -123,7 +123,7 @@ <!-- Declare hive-exec dependencies that were shaded in instead of being listed as dependencies --> <dependency> - <groupId>com.esotericsoftware.kryo</groupId> + <groupId>com.esotericsoftware</groupId> <artifactId>kryo</artifactId> <version>${kryo.version}</version> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/hive/blob/adbc0ab6/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index c6df4a5..c38c10f 100644 --- a/pom.xml +++ b/pom.xml @@ -144,7 +144,7 @@ <jodd.version>3.5.2</jodd.version> 
<json.version>20090211</json.version> <junit.version>4.11</junit.version> - <kryo.version>2.22</kryo.version> + <kryo.version>3.0.3</kryo.version> <libfb303.version>0.9.3</libfb303.version> <libthrift.version>0.9.3</libthrift.version> <log4j2.version>2.4</log4j2.version> @@ -228,8 +228,8 @@ <dependencies> <!-- dependencies are always listed in sorted order by groupId, artifectId --> <dependency> - <groupId>com.esotericsoftware.kryo</groupId> - <artifactId>kryo</artifactId> + <groupId>com.esotericsoftware</groupId> + <artifactId>kryo-shaded</artifactId> <version>${kryo.version}</version> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/hive/blob/adbc0ab6/ql/pom.xml ---------------------------------------------------------------------- diff --git a/ql/pom.xml b/ql/pom.xml index 9420a62..d893099 100644 --- a/ql/pom.xml +++ b/ql/pom.xml @@ -72,8 +72,8 @@ </dependency> <!-- inter-project --> <dependency> - <groupId>com.esotericsoftware.kryo</groupId> - <artifactId>kryo</artifactId> + <groupId>com.esotericsoftware</groupId> + <artifactId>kryo-shaded</artifactId> <version>${kryo.version}</version> </dependency> <dependency> @@ -594,16 +594,20 @@ <artifactId>spark-core_${scala.binary.version}</artifactId> <version>${spark.version}</version> <optional>true</optional> - <exclusions> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - <exclusion> - <groupId>commmons-logging</groupId> - <artifactId>commons-logging</artifactId> - </exclusion> - </exclusions> + <exclusions> + <exclusion> + <groupId>com.esotericsoftware.kryo</groupId> + <artifactId>kryo</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>commmons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>com.sun.jersey</groupId> @@ -746,7 +750,9 @@ 
<include>org.apache.hive:hive-serde</include> <include>org.apache.hive:hive-llap-client</include> <include>org.apache.hive:hive-metastore</include> - <include>com.esotericsoftware.kryo:kryo</include> + <include>com.esotericsoftware:kryo-shaded</include> + <include>com.esotericsoftware:minlog</include> + <include>org.objenesis:objenesis</include> <include>org.apache.parquet:parquet-hadoop-bundle</include> <include>org.apache.thrift:libthrift</include> <include>org.apache.thrift:libfb303</include> @@ -779,6 +785,10 @@ <pattern>com.esotericsoftware</pattern> <shadedPattern>org.apache.hive.com.esotericsoftware</shadedPattern> </relocation> + <relocation> + <pattern>org.objenesis</pattern> + <shadedPattern>org.apache.hive.org.objenesis</shadedPattern> + </relocation> </relocations> </configuration> </execution> http://git-wip-us.apache.org/repos/asf/hive/blob/adbc0ab6/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 9dbb45a..8b8cf6d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -37,6 +37,8 @@ import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; import java.io.UnsupportedEncodingException; +import java.lang.reflect.Array; +import java.lang.reflect.Field; import java.net.URI; import java.net.URL; import java.net.URLClassLoader; @@ -87,8 +89,6 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.WordUtils; import org.apache.commons.lang3.StringEscapeUtils; import org.apache.commons.lang3.tuple.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; 
@@ -138,7 +138,6 @@ import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat; import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface; import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper; import org.apache.hadoop.hive.ql.io.merge.MergeFileWork; -import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanMapper; import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork; import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateMapper; @@ -182,6 +181,9 @@ import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.StandardConstantListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StandardConstantMapObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StandardConstantStructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; @@ -207,12 +209,14 @@ import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Shell; import org.apache.hive.common.util.ReflectionUtil; +import org.objenesis.strategy.StdInstantiatorStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import com.esotericsoftware.kryo.serializers.FieldSerializer; -import com.esotericsoftware.shaded.org.objenesis.strategy.StdInstantiatorStrategy; import com.google.common.base.Preconditions; /** @@ -1097,7 
+1101,8 @@ public final class Utilities { // Kryo is not thread-safe, // Also new Kryo() is expensive, so we want to do it just once. - public static ThreadLocal<Kryo> runtimeSerializationKryo = new ThreadLocal<Kryo>() { + public static ThreadLocal<Kryo> + runtimeSerializationKryo = new ThreadLocal<Kryo>() { @Override protected Kryo initialValue() { Kryo kryo = new Kryo(); @@ -1105,10 +1110,22 @@ public final class Utilities { kryo.register(java.sql.Date.class, new SqlDateSerializer()); kryo.register(java.sql.Timestamp.class, new TimestampSerializer()); kryo.register(Path.class, new PathSerializer()); - kryo.setInstantiatorStrategy(new StdInstantiatorStrategy()); + kryo.register( Arrays.asList( "" ).getClass(), new ArraysAsListSerializer() ); + ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy()).setFallbackInstantiatorStrategy( + new StdInstantiatorStrategy()); removeField(kryo, Operator.class, "colExprMap"); - removeField(kryo, ColumnInfo.class, "objectInspector"); removeField(kryo, AbstractOperatorDesc.class, "statistics"); + kryo.register(MapWork.class); + kryo.register(ReduceWork.class); + kryo.register(TableDesc.class); + kryo.register(UnionOperator.class); + kryo.register(FileSinkOperator.class); + kryo.register(HiveIgnoreKeyTextOutputFormat.class); + kryo.register(StandardConstantListObjectInspector.class); + kryo.register(StandardConstantMapObjectInspector.class); + kryo.register(StandardConstantStructObjectInspector.class); + kryo.register(SequenceFileInputFormat.class); + kryo.register(HiveSequenceFileOutputFormat.class); return kryo; }; }; @@ -1127,15 +1144,25 @@ public final class Utilities { kryo.register(java.sql.Date.class, new SqlDateSerializer()); kryo.register(java.sql.Timestamp.class, new TimestampSerializer()); kryo.register(Path.class, new PathSerializer()); - kryo.setInstantiatorStrategy(new StdInstantiatorStrategy()); + kryo.register( Arrays.asList( "" ).getClass(), new ArraysAsListSerializer() ); + 
((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy()).setFallbackInstantiatorStrategy(new StdInstantiatorStrategy()); removeField(kryo, Operator.class, "colExprMap"); removeField(kryo, ColumnInfo.class, "objectInspector"); + removeField(kryo, AbstractOperatorDesc.class, "statistics"); kryo.register(SparkEdgeProperty.class); kryo.register(MapWork.class); kryo.register(ReduceWork.class); kryo.register(SparkWork.class); kryo.register(TableDesc.class); kryo.register(Pair.class); + kryo.register(UnionOperator.class); + kryo.register(FileSinkOperator.class); + kryo.register(HiveIgnoreKeyTextOutputFormat.class); + kryo.register(StandardConstantListObjectInspector.class); + kryo.register(StandardConstantMapObjectInspector.class); + kryo.register(StandardConstantStructObjectInspector.class); + kryo.register(SequenceFileInputFormat.class); + kryo.register(HiveSequenceFileOutputFormat.class); return kryo; }; }; @@ -1149,11 +1176,111 @@ public final class Utilities { kryo.register(java.sql.Date.class, new SqlDateSerializer()); kryo.register(java.sql.Timestamp.class, new TimestampSerializer()); kryo.register(Path.class, new PathSerializer()); - kryo.setInstantiatorStrategy(new StdInstantiatorStrategy()); + kryo.register( Arrays.asList( "" ).getClass(), new ArraysAsListSerializer() ); + ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy()).setFallbackInstantiatorStrategy( + new StdInstantiatorStrategy()); + removeField(kryo, Operator.class, "colExprMap"); + removeField(kryo, AbstractOperatorDesc.class, "statistics"); + kryo.register(MapWork.class); + kryo.register(ReduceWork.class); + kryo.register(TableDesc.class); + kryo.register(UnionOperator.class); + kryo.register(FileSinkOperator.class); + kryo.register(HiveIgnoreKeyTextOutputFormat.class); + kryo.register(StandardConstantListObjectInspector.class); + kryo.register(StandardConstantMapObjectInspector.class); + kryo.register(StandardConstantStructObjectInspector.class); + 
kryo.register(SequenceFileInputFormat.class); + kryo.register(HiveSequenceFileOutputFormat.class); return kryo; }; }; + /** + * A Kryo {@link com.esotericsoftware.kryo.Serializer} for lists created via {@link Arrays#asList(Object...)}. + * <p> + * Note: This serializer does not support cyclic references, so if one of the list's elements + * holds a reference back to the list itself, deserialization may fail. + * </p> + * + * Copied from the kryo-serializers package and included here directly to avoid classpath issues. + */ + private static class ArraysAsListSerializer extends com.esotericsoftware.kryo.Serializer<List<?>> { + + private Field _arrayField; + + public ArraysAsListSerializer() { + try { + _arrayField = Class.forName( "java.util.Arrays$ArrayList" ).getDeclaredField( "a" ); + _arrayField.setAccessible( true ); + } catch ( final Exception e ) { + throw new RuntimeException( e ); + } + // Immutable causes #copy(obj) to return the original object + setImmutable(true); + } + + @Override + public List<?> read(final Kryo kryo, final Input input, final Class<List<?>> type) { + final int length = input.readInt(true); + Class<?> componentType = kryo.readClass( input ).getType(); + if (componentType.isPrimitive()) { + componentType = getPrimitiveWrapperClass(componentType); + } + try { + final Object items = Array.newInstance( componentType, length ); + for( int i = 0; i < length; i++ ) { + Array.set(items, i, kryo.readClassAndObject( input )); + } + return Arrays.asList( (Object[])items ); + } catch ( final Exception e ) { + throw new RuntimeException( e ); + } + } + + @Override + public void write(final Kryo kryo, final Output output, final List<?> obj) { + try { + final Object[] array = (Object[]) _arrayField.get( obj ); + output.writeInt(array.length, true); + final Class<?> componentType = array.getClass().getComponentType(); + kryo.writeClass( output, componentType ); + for( final Object item : array ) { + kryo.writeClassAndObject( output, item ); + } + } catch ( final RuntimeException e ) {
+ // Rethrow RuntimeExceptions (such as KryoException) as-is rather than wrapping them, + // so callers can still handle them by type; checked exceptions are wrapped below. + throw e; + } catch ( final Exception e ) { + throw new RuntimeException( e ); + } + } + + private Class<?> getPrimitiveWrapperClass(final Class<?> c) { + if (c.isPrimitive()) { + if (c.equals(Long.TYPE)) { + return Long.class; + } else if (c.equals(Integer.TYPE)) { + return Integer.class; + } else if (c.equals(Double.TYPE)) { + return Double.class; + } else if (c.equals(Float.TYPE)) { + return Float.class; + } else if (c.equals(Boolean.TYPE)) { + return Boolean.class; + } else if (c.equals(Character.TYPE)) { + return Character.class; + } else if (c.equals(Short.TYPE)) { + return Short.class; + } else if (c.equals(Byte.TYPE)) { + return Byte.class; + } + } + return c; + } + } + public static TableDesc defaultTd; static { // by default we expect ^A separated strings http://git-wip-us.apache.org/repos/asf/hive/blob/adbc0ab6/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java index f4e5873..73e8f6d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java @@ -31,8 +31,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; @@ -64,8 +62,6 @@ import com.google.common.collect.Interner; @SuppressWarnings({"serial", "deprecation"}) public class MapWork extends BaseWork { - private static final Logger LOG = LoggerFactory.getLogger(MapWork.class); - // use LinkedHashMap to make sure the iteration order is // deterministic, to ease testing private
LinkedHashMap<String, ArrayList<String>> pathToAliases = new LinkedHashMap<String, ArrayList<String>>(); @@ -548,17 +544,6 @@ public class MapWork extends BaseWork { } } - public void logPathToAliases() { - if (LOG.isDebugEnabled()) { - LOG.debug("LOGGING PATH TO ALIASES"); - for (Map.Entry<String, ArrayList<String>> entry: pathToAliases.entrySet()) { - for (String a: entry.getValue()) { - LOG.debug("Path: " + entry.getKey() + ", Alias: " + a); - } - } - } - } - public void setDummyTableScan(boolean dummyTableScan) { this.dummyTableScan = dummyTableScan; } http://git-wip-us.apache.org/repos/asf/hive/blob/adbc0ab6/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java index 8211346..0ac625f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java @@ -23,11 +23,8 @@ import java.util.HashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorUtils; @@ -60,8 +57,6 @@ public class ReduceWork extends BaseWork { super(name); } - private static transient final Logger LOG = LoggerFactory.getLogger(ReduceWork.class); - // schema of the map-reduce 'key' object - this is homogeneous private TableDesc keyDesc; http://git-wip-us.apache.org/repos/asf/hive/blob/adbc0ab6/spark-client/pom.xml ---------------------------------------------------------------------- diff --git a/spark-client/pom.xml b/spark-client/pom.xml index a0bbe56..9d2b418 100644 --- a/spark-client/pom.xml +++ b/spark-client/pom.xml @@ -39,8 +39,8 @@ 
<dependencies> <dependency> - <groupId>com.esotericsoftware.kryo</groupId> - <artifactId>kryo</artifactId> + <groupId>com.esotericsoftware</groupId> + <artifactId>kryo-shaded</artifactId> </dependency> <dependency> <groupId>com.google.guava</groupId> @@ -59,16 +59,20 @@ <groupId>org.apache.spark</groupId> <artifactId>spark-core_${scala.binary.version}</artifactId> <version>${spark.version}</version> - <exclusions> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - <exclusion> - <groupId>commmons-logging</groupId> - <artifactId>commons-logging</artifactId> - </exclusion> - </exclusions> + <exclusions> + <exclusion> + <groupId>com.esotericsoftware.kryo</groupId> + <artifactId>kryo</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>commmons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>junit</groupId> http://git-wip-us.apache.org/repos/asf/hive/blob/adbc0ab6/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java ---------------------------------------------------------------------- diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java index 197f113..9e789cf 100644 --- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java +++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/KryoMessageCodec.java @@ -23,19 +23,20 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; +import org.objenesis.strategy.StdInstantiatorStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.ByteBufferInputStream; import com.esotericsoftware.kryo.io.Input; 
import com.esotericsoftware.kryo.io.Output; -import com.esotericsoftware.shaded.org.objenesis.strategy.StdInstantiatorStrategy; import com.google.common.base.Preconditions; + import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageCodec; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * Codec that serializes / deserializes objects using Kryo. Objects are encoded with a 4-byte * header with the length of the serialized data. @@ -59,7 +60,7 @@ class KryoMessageCodec extends ByteToMessageCodec<Object> { kryo.register(klass, REG_ID_BASE + count); count++; } - kryo.setInstantiatorStrategy(new StdInstantiatorStrategy()); + kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy())); return kryo; } };