Re: how to add columns to row when column has a different encoder?
Does anyone know a way to do this right now? As best I can tell, I need a custom Expression to pass to a UDF. I just finished a protobuf encoder, and it feels like Expression is not meant to be public (a good number of things are private[sql]). Am I wrong about this? Am I looking at the right interface to add such a UDF?

Thanks for your help!
Re: SizeEstimator
Thanks for the reply, and sorry for my delayed response; I had to go find the profile data to look up the class again.

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala

That class uses SizeEstimator and has a field "map" which buffers the rows. In my case the buffer held more than 1 million rows, so it became costly every time its size was checked.

This can be reproduced: create a random data set of (string, long), then group by the string (I believe this is what the code did first; there was a sort later, but that should have been a different stage). Make sure the number of executors is small (for example, only one); otherwise you shrink the number of rows buffered on each executor.

On Mon, Feb 26, 2018, 10:04 PM 叶先进 wrote:

> What type is the buffer you mentioned?
>
> On 27 Feb 2018, at 11:46 AM, David Capwell wrote:
>
>> advancedxy, I don't remember the code as well anymore, but what we hit was a very simple schema (string, long). The issue is the buffer had a million of these, so SizeEstimator of the buffer had to keep recalculating the same elements over and over again. SizeEstimator was on-CPU about 30% of the time; bounding the buffer got it to be < 5% (going off memory, so may be off).
>
> The class info (the size of the fields as laid out on the heap) is cached for every class encountered, so the size info for the same elements is not recalculated. However, for a Collection class (or similar), SizeEstimator scans all the elements in the container (the `next` field in LinkedList, for example).
>
> The array is a special case: SizeEstimator samples the array if array.length > ARRAY_SIZE_FOR_SAMPLING (400).
>
>> The cost is really (assuming memory is O(1), which is not true) O(N × M), where N is the number of rows in the buffer and M is the size of the schema. My case could be solved by not recomputing, which would bring the cost to O(M), since bookkeeping should be constant time. There was logic to delay recalculating based on a change in frequency, but that didn't really do much for us; bounding and spilling was the bigger win in our case.
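For illustration, the array special case mentioned in the quoted reply (sampling once the length passes ARRAY_SIZE_FOR_SAMPLING) can be sketched in plain Java. This is not Spark's actual code: the class name SampledArraySize, the sample count of 100, and the 2-bytes-per-char element cost are assumptions made up for the sketch. Only the threshold of 400 comes from the thread.

```java
import java.util.Arrays;
import java.util.Random;

// Hypothetical sketch of "sample large arrays instead of walking them".
final class SampledArraySize {
    static final int SAMPLING_THRESHOLD = 400; // value quoted in the thread
    static final int SAMPLE_COUNT = 100;       // assumption for this sketch

    // Assumed element cost: 2 bytes per char (not JVM-accurate).
    static long elementBytes(String s) {
        return 2L * s.length();
    }

    static long estimate(String[] array, Random random) {
        if (array.length <= SAMPLING_THRESHOLD) {
            // Small array: walk every element.
            long total = 0;
            for (String s : array) {
                total += elementBytes(s);
            }
            return total;
        }
        // Large array: look at a fixed number of random elements and extrapolate.
        long sampled = 0;
        for (int i = 0; i < SAMPLE_COUNT; i++) {
            sampled += elementBytes(array[random.nextInt(array.length)]);
        }
        return sampled * array.length / SAMPLE_COUNT;
    }

    public static void main(String[] args) {
        String[] big = new String[10_000];
        Arrays.fill(big, "abcd");
        // Every element costs 8 bytes, so the extrapolation is exact here.
        System.out.println(estimate(big, new Random(1))); // 80000
    }
}
```

With sampling, the work per estimate is bounded by the sample count rather than by the array length. A linked collection gets no such shortcut in this sketch's model, which matches the reply's point that containers are scanned element by element.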
Re: SizeEstimator
advancedxy, I don't remember the code as well anymore, but what we hit was a very simple schema (string, long). The issue is the buffer had a million of these, so SizeEstimator of the buffer had to keep recalculating the same elements over and over again. SizeEstimator was on-CPU about 30% of the time; bounding the buffer got it to be < 5% (going off memory, so may be off).

The cost is really (assuming memory is O(1), which is not true) O(N × M), where N is the number of rows in the buffer and M is the size of the schema. My case could be solved by not recomputing, which would bring the cost to O(M), since bookkeeping should be constant time. There was logic to delay recalculating based on a change in frequency, but that didn't really do much for us; bounding and spilling was the bigger win in our case.

On Mon, Feb 26, 2018, 7:24 PM Xin Liu wrote:

> Thanks David. Another solution is to convert the protobuf object to a byte array. It does speed up SizeEstimator.
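The cost argument above (re-walking N rows on every check versus keeping a running total) can be sketched without Spark. Everything here is hypothetical: RowBuffer is not a Spark class, and the per-row cost formula is a rough guess standing in for a real size estimate.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical buffer of (String, long) rows with two ways to estimate its size.
final class RowBuffer {
    private final List<String> keys = new ArrayList<>();
    private long runningBytes = 0; // kept up to date on every insert
    long elementsVisited = 0;      // counts the work done by full walks

    // Assumed per-row cost: 2 bytes per char plus 8 bytes for the long.
    static long rowBytes(String key) {
        return 2L * key.length() + 8L;
    }

    // The long value is fixed-width, so only its 8 bytes are accounted for.
    void add(String key, long value) {
        keys.add(key);
        runningBytes += rowBytes(key);
    }

    // Re-walks every buffered row on each call: O(N) per check.
    long estimateByWalking() {
        long total = 0;
        for (String key : keys) {
            total += rowBytes(key);
            elementsVisited++;
        }
        return total;
    }

    // Returns the total maintained by add(): O(1) per check.
    long estimateByBookkeeping() {
        return runningBytes;
    }

    public static void main(String[] args) {
        RowBuffer buffer = new RowBuffer();
        for (int i = 0; i < 1000; i++) {
            buffer.add("k" + i, i);
        }
        for (int check = 0; check < 10; check++) {
            buffer.estimateByWalking();
        }
        System.out.println(buffer.elementsVisited); // 10000
    }
}
```

Both methods return the same number; the difference is that ten checks over a thousand rows touch ten thousand elements when walking and none when bookkeeping.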
Re: SizeEstimator
SizeEstimator is used to predict current memory usage so Spark knows whether to flush (spill) or not. This is very costly for us, so we use a flag that is marked as private in the code to lower the cost:

    spark.shuffle.spill.numElementsForceSpillThreshold (on phone, hope no typo) - how many records before flush

This lowers the cost because it lets us leave data in the young generation; if we don't bound the buffer, everything gets promoted to the old generation and GC becomes an issue. This doesn't solve the fact that the walk is slow, but it lowers the cost of GC. We make sure to have spare memory on the system for the page cache, so for us spilling to disk is a memory write 99% of the time. If your host has less free memory, spilling may become more expensive.

If the walk is your bottleneck and not GC, then I would recommend JOL (Java Object Layout) plus some guessing to better predict memory.

On Mon, Feb 26, 2018, 4:47 PM Xin Liu wrote:

> Hi folks,
>
> We have a situation where shuffled data is protobuf based, and SizeEstimator is taking a lot of time.
>
> We have tried overriding SizeEstimator to return a constant value, which speeds things up a lot.
>
> My question: what is the side effect of disabling SizeEstimator? Is it just that Spark does memory reallocation, or are there more severe consequences?
>
> Thanks!
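The idea behind bounding by record count, as described for spark.shuffle.spill.numElementsForceSpillThreshold, can be sketched as a buffer that flushes after a fixed number of inserts regardless of estimated size. This is a stand-alone illustration, not Spark's spill code: CountBoundedBuffer and its methods are hypothetical, and "spilling" here only clears the buffer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical buffer that flushes purely on element count.
final class CountBoundedBuffer {
    private final int forceSpillThreshold;
    private final List<String> buffer = new ArrayList<>();
    int spills = 0;

    CountBoundedBuffer(int forceSpillThreshold) {
        this.forceSpillThreshold = forceSpillThreshold;
    }

    void insert(String row) {
        buffer.add(row);
        // No size walk needed: the count alone decides when to flush.
        if (buffer.size() >= forceSpillThreshold) {
            spill();
        }
    }

    private void spill() {
        // A real implementation would write the rows out; this sketch just drops them.
        buffer.clear();
        spills++;
    }

    int buffered() {
        return buffer.size();
    }

    public static void main(String[] args) {
        CountBoundedBuffer b = new CountBoundedBuffer(100);
        for (int i = 0; i < 250; i++) {
            b.insert("row-" + i);
        }
        System.out.println(b.spills);     // 2
        System.out.println(b.buffered()); // 50
    }
}
```

Because the buffer never grows past the threshold, its rows are short-lived, which is the property the reply relies on to keep them out of the old generation.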
how to add columns to row when column has a different encoder?
I have a row that looks like the following POJO:

    case class Wrapper(var id: String, var bytes: Array[Byte])

Those bytes are a serialized POJO that looks like this:

    case class Inner(var stuff: String, var moreStuff: String)

I currently have encoders for both types, but I don't see how to merge the two into a unified row that looks like the following:

    struct>

If I know how to deserialize the bytes and have an encoder, how could I get the above schema? I was looking at ds.withColumn("inner", ???) but wasn't sure how to go from POJO + encoder to a column. Is there a better way to do this?

Thanks for your time reading this email.
Re: Encoder with empty bytes deserializes with non-empty bytes
OK, found my issue.

    case c if c == classOf[ByteString] =>
      StaticInvoke(classOf[Protobufs], ArrayType(ByteType), "fromByteString", parent :: Nil)

should be

    case c if c == classOf[ByteString] =>
      StaticInvoke(classOf[Protobufs], BinaryType, "fromByteString", parent :: Nil)

With BinaryType the generated Java code sees a byte[], which takes a different code path from the one shown in my original message. Since I had used ArrayType(ByteType), I had to wrap the data in an ArrayData class.
Encoder with empty bytes deserializes with non-empty bytes
I am trying to create an Encoder for protobuf data and noticed something rather weird. When we have an empty ByteString (not null, just empty), deserializing gives back an array of length 8 rather than an empty one. I took the generated code and see something odd going on.

UnsafeRowWriter:

    public void setOffsetAndSize(int ordinal, long currentCursor, long size) {
      final long relativeOffset = currentCursor - startingOffset;
      final long fieldOffset = getFieldOffset(ordinal);
      final long offsetAndSize = (relativeOffset << 32) | size;

      Platform.putLong(holder.buffer, fieldOffset, offsetAndSize);
    }

So this takes the size of the array and stores it... but it's not the array size, it's how many bytes were added:

    rowWriter2.setOffsetAndSize(2, tmpCursor16, holder.cursor - tmpCursor16);

Since the data is empty, the only method that moves the cursor forward is

    arrayWriter1.initialize(holder, numElements1, 8);

which does the following:

    holder.cursor += (headerInBytes + fixedPartInBytes);

In a debugger I see that headerInBytes = 8 and fixedPartInBytes = 0.

Here is the header write:

    Platform.putLong(holder.buffer, startingOffset, numElements);
    for (int i = 8; i < headerInBytes; i += 8) {
      Platform.putLong(holder.buffer, startingOffset + i, 0L);
    }

OK, so far this makes sense: in order to deserialize you need to know about the data, so all good. Now to look at the deserialize path.

UnsafeRow.java:

    @Override
    public byte[] getBinary(int ordinal) {
      if (isNullAt(ordinal)) {
        return null;
      } else {
        final long offsetAndSize = getLong(ordinal);
        final int offset = (int) (offsetAndSize >> 32);
        final int size = (int) offsetAndSize;
        final byte[] bytes = new byte[size];
        Platform.copyMemory(
          baseObject,
          baseOffset + offset,
          bytes,
          Platform.BYTE_ARRAY_OFFSET,
          size
        );
        return bytes;
      }
    }

Since this doesn't read the header to return the user bytes, it tries to return header + user data.

Is this expected? Am I supposed to filter out the header and force a mem-copy to get just the user data? Since the header appears to be dynamic, how would I know the header length?

Thanks for your time reading this email.

Spark version: spark_2.11-2.2.1
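The packing arithmetic described in this message can be reproduced in isolation. The sketch below is plain Java with no Spark dependency; the class OffsetAndSize and the relative offset of 24 are made up for illustration, while the shift-and-mask logic mirrors the setOffsetAndSize and getBinary snippets quoted above.

```java
// Hypothetical helper showing how offset and size share one 64-bit word.
final class OffsetAndSize {
    // Upper 32 bits: offset relative to the row start. Lower 32 bits: byte count.
    static long pack(long relativeOffset, long size) {
        return (relativeOffset << 32) | size;
    }

    static int offset(long packed) {
        return (int) (packed >> 32);
    }

    static int size(long packed) {
        return (int) packed;
    }

    public static void main(String[] args) {
        int headerInBytes = 8; // value seen in the debugger for the array writer
        int userBytes = 0;     // the ByteString was empty

        // Array path: the cursor moved past the header, so the header is counted.
        long asArray = pack(24, headerInBytes + userBytes);
        // Binary path: only the user bytes are counted.
        long asBinary = pack(24, userBytes);

        System.out.println(size(asArray));  // 8
        System.out.println(size(asBinary)); // 0
    }
}
```

Read back through a getBinary-style decoder, the first value yields an 8-byte array for empty input, which is the symptom reported here.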
Dynamic Accumulators in 2.x?
I wrote a Spark instrumentation tool that instruments RDDs to give more fine-grained details on what is going on within a Task. This works right now, but it uses volatiles and CAS to pass this state around (which slows down the task). We want to lower this overhead by making the main call path single-threaded and passing the result along when the task completes, which sounds like AccumulatorV2.

I started rewriting the instrumentation logic to be based on accumulators, but I am having a hard time getting them to show up in the UI/API (I am using this to see whether I am linking things properly).

So my question is as follows: when running in the executor and we create an accumulator (one that was not created from the SparkContext), how would I stitch things together properly so that it shows up alongside the accumulators defined from the SparkContext? If this differs between versions, that is fine, since we can figure that out quickly (hopefully) and change the instrumentation.

Approaches taken:

Looked at SparkContext.register and copied the same logic, but at runtime:

    this.hasNextTotal = new LongAccumulator();
    this.hasNextTotal.metadata_$eq(new AccumulatorMetadata(AccumulatorContext.newId(), createName("hasNextTotal"), false));
    AccumulatorContext.register(hasNextTotal);

That didn't end up working.

Tried getting the context from SparkContext.getActive, but it's not defined at runtime:

    Option<SparkContext> opt = SparkContext$.MODULE$.getActive();
    if (opt.isDefined()) {
      SparkContext sc = opt.get();
      hasNextTotal.register(sc, Option.apply("hasNext"), false);
      nextTotal.register(sc, Option.apply("next"), false);
    }

Any help on this would be much appreciated! I would really rather not reinvent the wheel if I can piggyback off Accumulators.

Thanks for your help!

Target Spark version: 2.2.0
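To make the goal concrete, here is a minimal sketch of the pattern being aimed for: a plain, single-threaded counter on the hot path that is published once when the task completes. It is not Spark's AccumulatorV2 and does not answer the registration question; TaskLocalCounter and mergeInto are hypothetical names, and the shared AtomicLong merely stands in for wherever the totals end up.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical task-local counter: no volatile or CAS on the hot path.
final class TaskLocalCounter {
    private long count = 0; // plain field, touched only by the task thread

    void add(long n) {
        count += n;
    }

    long value() {
        return count;
    }

    // Called once when the task completes; the only synchronized step.
    void mergeInto(AtomicLong shared) {
        shared.addAndGet(count);
        count = 0;
    }

    public static void main(String[] args) {
        AtomicLong hasNextTotal = new AtomicLong();

        TaskLocalCounter taskA = new TaskLocalCounter();
        TaskLocalCounter taskB = new TaskLocalCounter();
        for (int i = 0; i < 1000; i++) taskA.add(1);
        for (int i = 0; i < 500; i++) taskB.add(1);

        taskA.mergeInto(hasNextTotal);
        taskB.mergeInto(hasNextTotal);
        System.out.println(hasNextTotal.get()); // 1500
    }
}
```

The design trade-off is that intermediate values are invisible until the merge, in exchange for paying the synchronization cost once per task instead of once per call.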
add jars to spark's runtime
We want to emit metrics out of Spark into our own custom store. To do this we built our own sink and tried to add it to Spark by passing --jars path/to/jar and defining the class in the metrics.properties that is supplied with the job. We noticed that Spark kept crashing with the exception below:

    17/10/11 09:42:37 ERROR metrics.MetricsSystem: Sink class com.example.ExternalSink cannot be instantiated
    Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1707)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:66)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:284)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
    Caused by: java.lang.ClassNotFoundException: com.example.ExternalSink
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.spark.util.Utils$.classForName(Utils.scala:230)
        at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:198)
        at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:194)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
        at org.apache.spark.metrics.MetricsSystem.registerSinks(MetricsSystem.scala:194)
        at org.apache.spark.metrics.MetricsSystem.start(MetricsSystem.scala:102)
        at org.apache.spark.SparkEnv$.create(SparkEnv.scala:366)
        at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:201)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:223)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:67)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:66)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
        ... 4 more

We then added this jar to the Spark tarball that we use for testing, and saw that the sink loaded just fine and was able to publish.

My question is: how can I add this jar to the Spark runtime rather than the user runtime? In production we don't have permission to write to Spark's jars dir, so the trick that got this working in testing won't work for us.

Thanks for your time reading this email.

Tested on Spark 2.2.0
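The trace above shows the lookup going through sun.misc.Launcher$AppClassLoader, which suggests (this is an inference from the trace, not something confirmed in this message) that the sink class is resolved against the JVM's application class path at executor startup. A small stand-alone check like the one below can help confirm which loader can see a class; ClassVisibility and canLoad are hypothetical helper names, and only standard JDK calls are used.

```java
// Hypothetical diagnostic: can a given class loader resolve a class by name?
final class ClassVisibility {
    static boolean canLoad(String className, ClassLoader loader) {
        try {
            // initialize=false: resolve the class without running its static initializers.
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ClassLoader system = ClassLoader.getSystemClassLoader();
        // A JDK class is always visible to the system loader.
        System.out.println(canLoad("java.lang.String", system));
        // Visible only if the sink's jar is on the JVM class path itself.
        System.out.println(canLoad("com.example.ExternalSink", system));
    }
}
```

Running the same check against Thread.currentThread().getContextClassLoader() inside a task would show whether the jar is visible to the user-level loader but not to the system one, which would match the behavior described here.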