This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 439ac0d [BEAM-5092] Prevent hash-lookup of schema on every record (#6268) 439ac0d is described below commit 439ac0dcd13fbfcd77ae9ecd584f0e7e260fb778 Author: reuvenlax <re...@google.com> AuthorDate: Fri Aug 24 18:16:12 2018 -0700 [BEAM-5092] Prevent hash-lookup of schema on every record (#6268) * The hash lookup of the schema on every record is slowing down SchemaCoder. Better cache the getter list so this isn't necessary. * Properly handle nested schemas. * Make threadsafe. * Cache setter factory as well. * Fix unneeded validation. * Fix FindBugs warning. * Skip scanNullFields when not needed. * Switch coders to variable-length ones. --- .../java/org/apache/beam/sdk/coders/RowCoder.java | 4 +- .../apache/beam/sdk/coders/RowCoderGenerator.java | 26 +++-- .../sdk/schemas/GetterBasedSchemaProvider.java | 111 ++++++++++++++++----- .../main/java/org/apache/beam/sdk/values/Row.java | 5 +- 4 files changed, 112 insertions(+), 34 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java index 47ced37..7e677a5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java @@ -43,8 +43,8 @@ public class RowCoder extends CustomCoder<Row> { .put(TypeName.BYTE, ByteCoder.of()) .put(TypeName.BYTES, ByteArrayCoder.of()) .put(TypeName.INT16, BigEndianShortCoder.of()) - .put(TypeName.INT32, BigEndianIntegerCoder.of()) - .put(TypeName.INT64, BigEndianLongCoder.of()) + .put(TypeName.INT32, VarIntCoder.of()) + .put(TypeName.INT64, VarLongCoder.of()) .put(TypeName.DECIMAL, BigDecimalCoder.of()) .put(TypeName.FLOAT, FloatCoder.of()) .put(TypeName.DOUBLE, DoubleCoder.of()) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java index b8fcb96..73430be 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java @@ -51,6 +51,7 @@ import net.bytebuddy.implementation.bytecode.member.MethodReturn; import net.bytebuddy.implementation.bytecode.member.MethodVariableAccess; import net.bytebuddy.matcher.ElementMatchers; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.TypeName; import org.apache.beam.sdk.values.Row; @@ -147,9 +148,12 @@ public abstract class RowCoderGenerator { private static DynamicType.Builder<Coder> implementMethods( Schema schema, DynamicType.Builder<Coder> builder) { + boolean hasNullableFields = schema.getFields().stream().anyMatch(Field::getNullable); return builder .defineMethod("getSchema", Schema.class, Visibility.PRIVATE, Ownership.STATIC) .intercept(FixedValue.reference(schema)) + .defineMethod("hasNullableFields", boolean.class, Visibility.PRIVATE, Ownership.STATIC) + .intercept(FixedValue.reference(hasNullableFields)) .method(ElementMatchers.named("encode")) .intercept(new EncodeInstruction()) .method(ElementMatchers.named("decode")) @@ -176,6 +180,13 @@ public abstract class RowCoderGenerator { MethodVariableAccess.REFERENCE.loadFrom(1), // OutputStream. MethodVariableAccess.REFERENCE.loadFrom(2), + // hasNullableFields + MethodInvocation.invoke( + implementationContext + .getInstrumentedType() + .getDeclaredMethods() + .filter(ElementMatchers.named("hasNullableFields")) + .getOnly()), // Call EncodeInstruction.encodeDelegate MethodInvocation.invoke( LOADED_TYPE @@ -197,9 +208,10 @@ public abstract class RowCoderGenerator { // The encode method of the generated Coder delegates to this method to evaluate all of the // per-field Coders. @SuppressWarnings("unchecked") - static void encodeDelegate(Coder[] coders, Row value, OutputStream outputStream) + static void encodeDelegate( + Coder[] coders, Row value, OutputStream outputStream, boolean hasNullableFields) throws IOException { - NULL_LIST_CODER.encode(scanNullFields(value), outputStream); + NULL_LIST_CODER.encode(scanNullFields(value, hasNullableFields), outputStream); for (int idx = 0; idx < value.getFieldCount(); ++idx) { Object fieldValue = value.getValue(idx); if (value.getValue(idx) != null) { @@ -210,11 +222,13 @@ public abstract class RowCoderGenerator { // Figure out which fields of the Row are null, and returns a BitSet. This allows us to save // on encoding each null field separately. - private static BitSet scanNullFields(Row row) { + private static BitSet scanNullFields(Row row, boolean hasNullableFields) { BitSet nullFields = new BitSet(row.getFieldCount()); - for (int idx = 0; idx < row.getFieldCount(); ++idx) { - if (row.getValue(idx) == null) { - nullFields.set(idx); + if (hasNullableFields) { + for (int idx = 0; idx < row.getFieldCount(); ++idx) { + if (row.getValue(idx) == null) { + nullFields.set(idx); + } } } return nullFields; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java index 7171878..4879fa9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java @@ -26,6 +26,7 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Type; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; @@ -54,27 +55,74 @@ public abstract class GetterBasedSchemaProvider implements SchemaProvider { // why is that Java reflection does not guarantee the order in which it returns fields and // methods, and these schemas are often based on reflective analysis of classes. Therefore it's // important to capture the schema once here, so all invocations of the toRowFunction see the - // same version of the schema. If schemaFor were to be called inside the function, different + // same version of the schema. If schemaFor were to be called inside the lambda below, different // workers would see different versions of the schema. Schema schema = schemaFor(typeDescriptor); - return o -> Row.withSchema(schema).withFieldValueGetters(fieldValueGetterFactory(), o).build(); + + // Since we know that this factory is always called from inside the lambda with the same schema, + // return a caching factory that caches the first value seen for each class. This prevents + // having to lookup the getter list each time createGetters is called. + FieldValueGetterFactory getterFactory = + new FieldValueGetterFactory() { + @Nullable + private transient ConcurrentHashMap<Class, List<FieldValueGetter>> gettersMap = null; + + private final FieldValueGetterFactory innerFactory = fieldValueGetterFactory(); + + @Override + public List<FieldValueGetter> createGetters(Class<?> targetClass, Schema schema) { + if (gettersMap == null) { + gettersMap = new ConcurrentHashMap<>(); + } + List<FieldValueGetter> getters = gettersMap.get(targetClass); + if (getters != null) { + return getters; + } + getters = innerFactory.createGetters(targetClass, schema); + gettersMap.put(targetClass, getters); + return getters; + } + }; + return o -> Row.withSchema(schema).withFieldValueGetters(getterFactory, o).build(); } @Override @SuppressWarnings("unchecked") public <T> SerializableFunction<Row, T> fromRowFunction(TypeDescriptor<T> typeDescriptor) { + FieldValueSetterFactory setterFactory = + new FieldValueSetterFactory() { + @Nullable + private volatile ConcurrentHashMap<Class, List<FieldValueSetter>> settersMap = null; + + private final FieldValueSetterFactory innerFactory = fieldValueSetterFactory(); + + @Override + public List<FieldValueSetter> createSetters(Class<?> targetClass, Schema schema) { + if (settersMap == null) { + settersMap = new ConcurrentHashMap<>(); + } + List<FieldValueSetter> setters = settersMap.get(targetClass); + if (setters != null) { + return setters; + } + setters = innerFactory.createSetters(targetClass, schema); + settersMap.put(targetClass, setters); + return setters; + } + }; + return r -> { if (r instanceof RowWithGetters) { // Efficient path: simply extract the underlying POJO instead of creating a new one. return (T) ((RowWithGetters) r).getGetterTarget(); } else { // Use the setters to copy values from the Row to a new instance of the class. - return fromRow(r, (Class<T>) typeDescriptor.getType()); + return fromRow(r, (Class<T>) typeDescriptor.getType(), setterFactory); } }; } - private <T> T fromRow(Row row, Class<T> clazz) { + private <T> T fromRow(Row row, Class<T> clazz, FieldValueSetterFactory setterFactory) { T object; try { object = clazz.getDeclaredConstructor().newInstance(); @@ -86,7 +134,7 @@ public abstract class GetterBasedSchemaProvider implements SchemaProvider { } Schema schema = row.getSchema(); - List<FieldValueSetter> setters = fieldValueSetterFactory().createSetters(clazz, schema); + List<FieldValueSetter> setters = setterFactory.createSetters(clazz, schema); checkState( setters.size() == row.getFieldCount(), "Did not have a matching number of setters and fields."); @@ -96,15 +144,6 @@ public abstract class GetterBasedSchemaProvider implements SchemaProvider { for (int i = 0; i < row.getFieldCount(); ++i) { FieldType type = schema.getField(i).getType(); FieldValueSetter setter = setters.get(i); - if (setter == null) { - throw new RuntimeException( - "NULL SETTER FOR " - + clazz.getSimpleName() - + " field name " - + schema.getField(i).getName() - + " schema " - + schema); - } setter.set( object, fromValue( @@ -113,7 +152,8 @@ public abstract class GetterBasedSchemaProvider implements SchemaProvider { setter.type(), setter.elementType(), setter.mapKeyType(), - setter.mapValueType())); + setter.mapValueType(), + setterFactory)); } return object; } @@ -121,39 +161,62 @@ public abstract class GetterBasedSchemaProvider implements SchemaProvider { @SuppressWarnings("unchecked") @Nullable private <T> T fromValue( - FieldType type, T value, Type fieldType, Type elemenentType, Type keyType, Type valueType) { + FieldType type, + T value, + Type fieldType, + Type elemenentType, + Type keyType, + Type valueType, + FieldValueSetterFactory setterFactory) { if (value == null) { return null; } if (TypeName.ROW.equals(type.getTypeName())) { - return (T) fromRow((Row) value, (Class) fieldType); + return (T) fromRow((Row) value, (Class) fieldType, setterFactory); } else if (TypeName.ARRAY.equals(type.getTypeName())) { - return (T) fromListValue(type.getCollectionElementType(), (List) value, elemenentType); + return (T) + fromListValue( + type.getCollectionElementType(), (List) value, elemenentType, setterFactory); } else if (TypeName.MAP.equals(type.getTypeName())) { return (T) fromMapValue( - type.getMapKeyType(), type.getMapValueType(), (Map) value, keyType, valueType); + type.getMapKeyType(), + type.getMapValueType(), + (Map) value, + keyType, + valueType, + setterFactory); } else { return value; } } @SuppressWarnings("unchecked") - private <T> List fromListValue(FieldType elementType, List<T> rowList, Type elementClass) { + private <T> List fromListValue( + FieldType elementType, + List<T> rowList, + Type elementClass, + FieldValueSetterFactory setterFactory) { List list = Lists.newArrayList(); for (T element : rowList) { - list.add(fromValue(elementType, element, elementClass, null, null, null)); + list.add(fromValue(elementType, element, elementClass, null, null, null, setterFactory)); } return list; } @SuppressWarnings("unchecked") private Map<?, ?> fromMapValue( - FieldType keyType, FieldType valueType, Map<?, ?> map, Type keyClass, Type valueClass) { + FieldType keyType, + FieldType valueType, + Map<?, ?> map, + Type keyClass, + Type valueClass, + FieldValueSetterFactory setterFactory) { Map newMap = Maps.newHashMap(); for (Map.Entry<?, ?> entry : map.entrySet()) { - Object key = fromValue(keyType, entry.getKey(), keyClass, null, null, null); - Object value = fromValue(valueType, entry.getValue(), valueClass, null, null, null); + Object key = fromValue(keyType, entry.getKey(), keyClass, null, null, null, setterFactory); + Object value = + fromValue(valueType, entry.getValue(), valueClass, null, null, null, setterFactory); newMap.put(key, value); } return newMap; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java index d745df7..2c12a7f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java @@ -378,7 +378,8 @@ public abstract class Row implements Serializable { public Builder attachValues(List<Object> values) { this.attached = true; - return addValues(values); + this.values = values; + return this; } public Builder withFieldValueGetters( @@ -564,7 +565,7 @@ public abstract class Row implements Serializable { if (!this.values.isEmpty()) { List<Object> storageValues = attached ? this.values : verify(schema, this.values); checkState(getterTarget == null, "withGetterTarget requires getters."); - return new RowWithStorage(schema, verify(schema, storageValues)); + return new RowWithStorage(schema, storageValues); } else if (fieldValueGetterFactory != null) { checkState(getterTarget != null, "getters require withGetterTarget."); return new RowWithGetters(schema, fieldValueGetterFactory, getterTarget);