[ https://issues.apache.org/jira/browse/BEAM-5775?focusedWorklogId=217776&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-217776 ]
ASF GitHub Bot logged work on BEAM-5775:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Mar/19 01:40
            Start Date: 25/Mar/19 01:40
    Worklog Time Spent: 10m
      Work Description: mikekap commented on pull request #6714: [BEAM-5775] Spark: implement a custom class to lazily encode values for persistence.
URL: https://github.com/apache/beam/pull/6714#discussion_r268467653

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ValueAndCoderKryoSerializable.java
##########

@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoSerializable;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.base.Throwables;
+import com.google.common.io.ByteStreams;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.util.VarInt;
+import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+
+/**
+ * A holder object that lets you serialize an element with a Coder with minimal wasted space.
+ * Supports both Kryo and Java serialization.
+ *
+ * <p>There are two different representations: a deserialized representation and a serialized
+ * representation.
+ *
+ * <p>The deserialized representation stores a Coder and the value. To serialize the value, we
+ * write a length-prefixed encoding of the value, but do NOT write the Coder used.
+ *
+ * <p>The serialized representation just reads a byte array - the value is not deserialized fully.
+ * In order to get at the deserialized value, the caller must pass the Coder used to create this
+ * instance via getOrDecode(Coder). This reverts the representation back to the deserialized
+ * representation.
+ *
+ * @param <T> element type
+ */
+public class ValueAndCoderKryoSerializable<T> implements KryoSerializable, Externalizable {
+  private T value;
+  // Re-use a field to save space in-memory. This is either a byte[] or a Coder, depending on
+  // which representation we are in.
+  private Object coderOrBytes;
+
+  ValueAndCoderKryoSerializable(T value, Coder<T> currentCoder) {
+    this.value = value;
+    this.coderOrBytes = currentCoder;
+  }
+
+  @SuppressWarnings("unused") // for serialization
+  public ValueAndCoderKryoSerializable() {}
+
+  public T getOrDecode(Coder<T> coder) throws IOException {
+    if (!(coderOrBytes instanceof Coder)) {
+      value =
+          coder.decode(new ByteArrayInputStream((byte[]) this.coderOrBytes), Coder.Context.OUTER);
+      this.coderOrBytes = coder;
+    }
+
+    return value;
+  }
+
+  private static class ByteSizeObserver extends ElementByteSizeObserver {
+    private long observedSize = 0;
+
+    @Override
+    protected void reportElementSize(long elementByteSize) {
+      observedSize += elementByteSize;
+    }
+  }
+
+  private void writeCommon(OutputStream out) throws IOException {
+    if (!(coderOrBytes instanceof Coder)) {
+      byte[] bytes = (byte[]) coderOrBytes;
+      VarInt.encode(bytes.length, out);
+      out.write(bytes);
+    } else {
+      @SuppressWarnings("unchecked")
+      Coder<T> coder = (Coder<T>) coderOrBytes;
+      int bufferSize = 1024;
+
+      if (coder.isRegisterByteSizeObserverCheap(value)) {

Review comment:
   I'm not 100% sure whether this matters, but aside from the observer, this code never actually produces a `byte[]` of the serialized output. Specifically, `ByteArrayOutputStream.writeTo` is used to avoid that: if the buffer inside the `ByteArrayOutputStream` is larger than the content, this lets us skip an array copy. To avoid the copy properly when a method must return bytes, Java has `ByteBuffer`; I'd have to change `CoderHelpers` to use `ByteBuffer`s instead, and I'm not sure all of its consumers would really benefit from the unwieldy API that would give them.

   My suggestion in this case: if you want performance in Spark serialization, use `ValueAndCoderLazySerializable` directly. Otherwise, use `CoderHelpers` for explicit serialization/deserialization as needed.

Issue Time Tracking
-------------------

    Worklog Id:     (was: 217776)
    Time Spent: 3h 20m  (was: 3h 10m)

> Make the Spark runner not serialize data unless Spark is spilling to disk
> --------------------------------------------------------------------------
>
>                 Key: BEAM-5775
>                 URL: https://issues.apache.org/jira/browse/BEAM-5775
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-spark
>            Reporter: Mike Kaplinskiy
>            Assignee: Mike Kaplinskiy
>            Priority: Minor
>              Labels: triaged
>          Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> Currently, for storage level MEMORY_ONLY, Beam does not coder-ify the data. This lets Spark keep the data in memory, avoiding the serialization round trip. Unfortunately the logic is fairly coarse: as soon as you switch to MEMORY_AND_DISK, Beam coder-ifies the data even though Spark might have chosen to keep the data in memory, incurring the serialization overhead.
>
> Ideally Beam would serialize the data lazily, as Spark chooses to spill to disk. This would be a change in behavior when using Beam, but luckily Spark already has a solution for folks who want data serialized in memory: MEMORY_AND_DISK_SER keeps the data serialized.
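
To make the holder's two representations concrete, here is a minimal usage sketch. It is not code from the PR: the class and its main method are invented for illustration, and it sits in the runner's package only because the value-plus-coder constructor shown in the diff is package-private.

package org.apache.beam.runners.spark.translation;

import java.io.IOException;
import org.apache.beam.sdk.coders.StringUtf8Coder;

// Hypothetical usage sketch, not part of the PR.
public class ValueAndCoderUsageSketch {
  public static void main(String[] args) throws IOException {
    StringUtf8Coder coder = StringUtf8Coder.of();

    // Deserialized representation: the holder keeps the value and the coder;
    // nothing is encoded yet.
    ValueAndCoderKryoSerializable<String> holder =
        new ValueAndCoderKryoSerializable<>("hello", coder);

    // If Spark never serializes the holder, this is just a field read. After
    // a Kryo/Java round trip the holder instead stores a byte[], and this
    // call decodes it with the caller-supplied coder.
    System.out.println(holder.getOrDecode(coder)); // prints "hello"
  }
}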
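
On the review comment's copy-avoidance point, a small illustrative sketch (the class and method names are hypothetical; only the two java.io calls are real API): ByteArrayOutputStream.toByteArray() always allocates a fresh, exactly-sized array and copies the content into it, while writeTo(OutputStream) hands the internal buffer straight to the target stream, so an oversized internal buffer costs no extra copy.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

final class WriteToVsToByteArray {
  static void copyingPath(ByteArrayOutputStream buffer, OutputStream out) throws IOException {
    // toByteArray() allocates a new array sized to the content and copies
    // into it, even if the caller only wants to forward the bytes.
    byte[] copy = buffer.toByteArray();
    out.write(copy);
  }

  static void nonCopyingPath(ByteArrayOutputStream buffer, OutputStream out) throws IOException {
    // writeTo() behaves like out.write(buf, 0, count) on the internal buffer:
    // no intermediate array, regardless of how oversized the buffer is.
    buffer.writeTo(out);
  }
}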
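
For the storage-level behavior the issue text contrasts, a hedged Spark-side sketch (the helper class and RDD arguments are hypothetical; the StorageLevel constants and persist call are real Spark API):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.storage.StorageLevel;

final class StorageLevelSketch {
  // Spark does not allow changing an RDD's storage level once set, so each
  // level is shown on its own (hypothetical) RDD.
  static <T> void persistChoices(JavaRDD<T> lazyRdd, JavaRDD<T> serializedRdd) {
    // MEMORY_AND_DISK: deserialized objects stay in memory; Spark serializes
    // only the partitions it actually spills to disk.
    lazyRdd.persist(StorageLevel.MEMORY_AND_DISK());

    // MEMORY_AND_DISK_SER: elements are kept serialized even while in memory,
    // the explicit opt-in the issue points to for always-serialized data.
    serializedRdd.persist(StorageLevel.MEMORY_AND_DISK_SER());
  }
}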