[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-11-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22855


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-11-09 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r232401327
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -101,13 +101,29 @@ class KryoSerializer(conf: SparkConf)
 }
   }
 
-  @transient
-  var pool: KryoPool = getPool
+  private class PoolWrapper extends KryoPool {
+    private var pool: KryoPool = getPool
+
+    override def borrow(): Kryo = pool.borrow()
+
+    override def release(kryo: Kryo): Unit = pool.release(kryo)
+
+    override def run[T](kryoCallback: KryoCallback[T]): T = pool.run(kryoCallback)
+
+    def reset(): Unit = {
+      pool = getPool
--- End diff --

OK, and it only gets recreated when a new classloader is set, it seems, so 
it is rare. 





[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-11-09 Thread patrickbrownsync
Github user patrickbrownsync commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r232375758
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -101,13 +101,29 @@ class KryoSerializer(conf: SparkConf)
 }
   }
 
-  @transient
-  var pool: KryoPool = getPool
+  private class PoolWrapper extends KryoPool {
+    private var pool: KryoPool = getPool
+
+    override def borrow(): Kryo = pool.borrow()
+
+    override def release(kryo: Kryo): Unit = pool.release(kryo)
+
+    override def run[T](kryoCallback: KryoCallback[T]): T = pool.run(kryoCallback)
+
+    def reset(): Unit = {
+      pool = getPool
--- End diff --

The KryoPool interface exposes no way to free it, so I believe that is not 
necessary.





[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-11-09 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r232332531
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -101,13 +101,29 @@ class KryoSerializer(conf: SparkConf)
 }
   }
 
-  @transient
-  var pool: KryoPool = getPool
+  private class PoolWrapper extends KryoPool {
+    private var pool: KryoPool = getPool
+
+    override def borrow(): Kryo = pool.borrow()
+
+    override def release(kryo: Kryo): Unit = pool.release(kryo)
+
+    override def run[T](kryoCallback: KryoCallback[T]): T = pool.run(kryoCallback)
+
+    def reset(): Unit = {
+      pool = getPool
--- End diff --

OK. We could look at updating to Kryo 5 in Spark 3, too, if there were 
other benefits.
Does the old pool need to get freed or anything here?





[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-11-09 Thread patrickbrownsync
Github user patrickbrownsync commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r232330079
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -92,6 +94,16 @@ class KryoSerializer(conf: SparkConf)
   new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
 }
 
+  @transient
+  private lazy val factory: KryoFactory = new KryoFactory() {
--- End diff --

This was needed





[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-11-09 Thread patrickbrownsync
Github user patrickbrownsync commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r232330015
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -92,6 +94,36 @@ class KryoSerializer(conf: SparkConf)
   new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
 }
 
+  @transient
+  private lazy val factory: KryoFactory = new KryoFactory() {
+    override def create: Kryo = {
+      newKryo()
+    }
+  }
+
+  private class PoolWrapper extends KryoPool {
--- End diff --

This wrapper can be removed when Kryo is updated to 5.0 as the new pool 
implementation exposes the needed method to clear the pool





[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-11-09 Thread wangyum
Github user wangyum commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r232203634
  
--- Diff: 
core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala 
---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import scala.concurrent._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration._
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+import org.apache.spark.serializer.KryoTest._
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Benchmark for KryoPool vs old "pool of 1".
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt:
+ *      bin/spark-submit --class  --jars 
+ *   2. build/sbt "core/test:runMain "
+ *   3. generate result:
+ *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain "
+ *      Results will be written to "benchmarks/KryoSerializerBenchmark-results.txt".
+ * }}}
+ */
+object KryoSerializerBenchmark extends BenchmarkBase {
--- End diff --

cc @dongjoon-hyun for Benchmark change.





[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-11-07 Thread patrickbrownsync
Github user patrickbrownsync commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r231558355
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -214,8 +230,14 @@ class KryoSerializer(conf: SparkConf)
 kryo
   }
 
+  override def setDefaultClassLoader(classLoader: ClassLoader): Serializer = {
+    defaultClassLoader = Some(classLoader)
--- End diff --

defaultClassLoader is used in newKryo.

I called `getPool` after setting the defaultClassLoader to make sure we 
don't accidentally create a newKryo before the defaultClassLoader is updated. 
Setting it on line 105 was because I don't believe `setDefaultClassLoader` is 
required to be called.

The issue that I unfortunately didn't notice until these tests failed is 
that the tests specify that you can `setDefaultClassLoader` after serializing 
an object (I'm not sure this functionality is actually used), causing an 
"incorrect" Kryo instance to be in the pool. Unfortunately the pool doesn't 
expose a way to clear itself out, thus the var, which clearly doesn't work.

I will work on a solution
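
For readers following the thread, here is a minimal sketch of the problem described above and of one way around it: wrap the pool and swap in a fresh one when the class loader changes. It uses only the Scala and Java standard libraries. `SimplePool`, `ResettablePool` and `PoolResetDemo` are hypothetical stand-ins, not Kryo or Spark classes, and a plain string plays the role of the class loader.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical stand-in for a pool that, like the one discussed above,
// offers borrow/release but no way to clear cached instances.
final class SimplePool[T <: AnyRef](factory: () => T) {
  private val idle = new ConcurrentLinkedQueue[T]()
  def borrow(): T = Option(idle.poll()).getOrElse(factory())
  def release(item: T): Unit = idle.offer(item)
}

// Wrapper that "clears" the pool by replacing it with a fresh one.
final class ResettablePool[T <: AnyRef](factory: () => T) {
  @volatile private var pool = new SimplePool[T](factory)
  def borrow(): T = pool.borrow()
  def release(item: T): Unit = pool.release(item)
  def reset(): Unit = { pool = new SimplePool[T](factory) }
}

object PoolResetDemo {
  // Returns (what a borrow yields before reset, what it yields after reset).
  def run(): (String, String) = {
    var loaderName = "loader-A" // stands in for defaultClassLoader
    val pool = new ResettablePool[String](() => new String(loaderName))
    pool.release(pool.borrow()) // an instance built with loader-A is now cached
    loaderName = "loader-B"     // the "class loader" changes
    val stale = pool.borrow()   // still the cached loader-A instance
    pool.release(stale)
    pool.reset()                // drop everything cached so far
    val fresh = pool.borrow()   // built by the factory with loader-B
    (stale, fresh)
  }
}
```

One limitation of this sketch: an instance borrowed before `reset()` and released afterwards lands in the new pool, so a caller that needs a strict guarantee would have to track which pool an instance came from.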





[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-11-07 Thread patrickbrownsync
Github user patrickbrownsync commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r231554649
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -92,6 +94,20 @@ class KryoSerializer(conf: SparkConf)
   new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
 }
 
+  @transient
+  private lazy val factory: KryoFactory = new KryoFactory() {
+    override def create: Kryo = {
+      newKryo()
+    }
+  }
+
+  @transient
--- End diff --

Yes, you are correct, I don't think this will work





[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-11-07 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r231526283
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -92,6 +94,20 @@ class KryoSerializer(conf: SparkConf)
   new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
 }
 
+  @transient
+  private lazy val factory: KryoFactory = new KryoFactory() {
+    override def create: Kryo = {
+      newKryo()
+    }
+  }
+
+  @transient
--- End diff --

Back on this item now, if this is no longer a lazy val, and it's transient, 
how does it get set again after this object itself is serialized? I'm mostly 
wondering if this is required now.
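
The concern raised here can be reproduced with plain Java serialization. The sketch below is illustrative only: `Holder` and `TransientDemo` are hypothetical names, and it assumes the code is compiled as an ordinary source file (REPL or script wrappers can interfere with Java deserialization).

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

class Holder extends Serializable {
  // Skipped by Java serialization and never recomputed: null in the copy.
  @transient val eager: java.lang.StringBuilder = new java.lang.StringBuilder("eager")
  // Also skipped, but the initializer runs on first access in the copy.
  @transient lazy val onDemand: java.lang.StringBuilder = new java.lang.StringBuilder("lazy")
}

object TransientDemo {
  // Serializes `value` to a byte array and reads it back as a new object.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

After `val copy = TransientDemo.roundTrip(new Holder)`, `copy.eager` is null because constructors do not run during deserialization, while `copy.onDemand` is built on first use. A non-lazy `@transient` field therefore needs some other way to be set again on the receiving side.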





[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-11-07 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r231526925
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -214,8 +230,14 @@ class KryoSerializer(conf: SparkConf)
 kryo
   }
 
+  override def setDefaultClassLoader(classLoader: ClassLoader): Serializer = {
+    defaultClassLoader = Some(classLoader)
--- End diff --

You can write `super.setDefaultClassLoader(classLoader)` here to inherit 
the behavior rather than duplicate it. It is just one line now, yes.

Where is defaultClassLoader used in this implementation though? I wonder 
why it matters that you call `getPool` after this field is set. And if it does, 
isn't setting it in line 105 not actually helping?
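
As an illustration of the `super` suggestion, the following sketch shows an override that inherits the base behavior and then does its own extra work. `BaseSerializer` and `PooledSerializer` are hypothetical classes written for this example, not Spark's actual `Serializer` hierarchy, and the counter merely stands in for rebuilding a pool.

```scala
// Hypothetical base class: stores the loader and returns `this` for chaining.
abstract class BaseSerializer {
  @volatile protected var defaultClassLoader: Option[ClassLoader] = None

  def currentLoader: Option[ClassLoader] = defaultClassLoader

  def setDefaultClassLoader(classLoader: ClassLoader): BaseSerializer = {
    defaultClassLoader = Some(classLoader)
    this
  }
}

// Hypothetical subclass: reuses the inherited assignment, then refreshes
// whatever depends on the loader (represented here by a counter).
class PooledSerializer extends BaseSerializer {
  private var generation = 0

  def poolGeneration: Int = generation

  override def setDefaultClassLoader(classLoader: ClassLoader): BaseSerializer = {
    super.setDefaultClassLoader(classLoader) // inherit rather than duplicate
    generation += 1                          // stands in for rebuilding the pool
    this
  }
}
```

Calling `super` first keeps the ordering discussed in this thread: the field is updated before anything that reads it is rebuilt.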





[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-11-06 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r231213606
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -298,30 +309,40 @@ class KryoDeserializationStream(
   }
 }
 
-private[spark] class KryoSerializerInstance(ks: KryoSerializer, useUnsafe: Boolean)
+private[spark] class KryoSerializerInstance(
+   ks: KryoSerializer, useUnsafe: Boolean, usePool: Boolean)
   extends SerializerInstance {
   /**
    * A re-used [[Kryo]] instance. Methods will borrow this instance by calling `borrowKryo()`, do
    * their work, then release the instance by calling `releaseKryo()`. Logically, this is a caching
    * pool of size one. SerializerInstances are not thread-safe, hence accesses to this field are
    * not synchronized.
    */
-  @Nullable private[this] var cachedKryo: Kryo = borrowKryo()
+  @Nullable private[this] var cachedKryo: Kryo = if (usePool) null else borrowKryo()
 
   /**
    * Borrows a [[Kryo]] instance. If possible, this tries to re-use a cached Kryo instance;
    * otherwise, it allocates a new instance.
    */
   private[serializer] def borrowKryo(): Kryo = {
-    if (cachedKryo != null) {
-      val kryo = cachedKryo
-      // As a defensive measure, call reset() to clear any Kryo state that might have been modified
-      // by the last operation to borrow this instance (see SPARK-7766 for discussion of this issue)
+    if (usePool) {
+      val kryo = ks.pool.borrow()
       kryo.reset()
-      cachedKryo = null
       kryo
     } else {
-      ks.newKryo()
+      if (cachedKryo != null) {
+        val kryo = cachedKryo
+        /**
+         * As a defensive measure, call reset() to clear any Kryo state that might have
--- End diff --

Another total nit, not worth touching unless you make other changes, but 
this is scaladoc-style. Multi-line comments are often just commented with `//` 
as before.
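
A small sketch of the two comment styles mentioned in this thread: a Scaladoc block for a documented member, with each continuation `*` aligned under the first `*` of the opening line, and plain `//` comments for notes inside a method body. `CommentStyleDemo` and `clamp` are made-up names for illustration.

```scala
object CommentStyleDemo {
  /**
   * Returns `value` clamped to the inclusive range from `low` to `high`.
   * Each continuation `*` sits under the first `*` of the opening line.
   */
  def clamp(value: Int, low: Int, high: Int): Int = {
    // Inside a method body, a multi-line note is usually written
    // with plain line comments rather than a Scaladoc block.
    math.max(low, math.min(high, value))
  }
}
```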





[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-11-06 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r231213322
  
--- Diff: 
core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala ---
@@ -431,9 +434,11 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
     ser.deserialize[HashMap[Int, List[String]]](serializedMap)
   }
 
-  private def testSerializerInstanceReuse(autoReset: Boolean, referenceTracking: Boolean): Unit = {
+  private def testSerializerInstanceReuse(
+     autoReset: Boolean, referenceTracking: Boolean, usePool: Boolean): Unit = {
--- End diff --

Total nit: there are a couple continuation indents that are 3 rather than 4 
spaces. Don't bother, unless you need to make other updates.





[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-11-02 Thread patrickbrownsync
Github user patrickbrownsync commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r230422530
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -84,6 +85,7 @@ class KryoSerializer(conf: SparkConf)
   private val avroSchemas = conf.getAvroSchema
   // whether to use unsafe based IO for serialization
   private val useUnsafe = conf.getBoolean("spark.kryo.unsafe", false)
+  private val usePool = conf.getBoolean("spark.kryo.pool", false)
--- End diff --

Sounds good and done





[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-11-02 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r230421073
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -84,6 +85,7 @@ class KryoSerializer(conf: SparkConf)
   private val avroSchemas = conf.getAvroSchema
   // whether to use unsafe based IO for serialization
   private val useUnsafe = conf.getBoolean("spark.kryo.unsafe", false)
+  private val usePool = conf.getBoolean("spark.kryo.pool", false)
--- End diff --

Yeah I think that's a fine position to take, if we can't think of a reason 
to disable it other than the theoretical unknown unknown bug out there.





[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-11-02 Thread patrickbrownsync
Github user patrickbrownsync commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r230420266
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -84,6 +85,7 @@ class KryoSerializer(conf: SparkConf)
   private val avroSchemas = conf.getAvroSchema
   // whether to use unsafe based IO for serialization
   private val useUnsafe = conf.getBoolean("spark.kryo.unsafe", false)
+  private val usePool = conf.getBoolean("spark.kryo.pool", false)
--- End diff --

Okay, if I'm understanding you correctly, I should set the default to true 
and remove the documentation. I will go ahead and do that. If I misunderstood, 
let me know. Thanks





[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-11-01 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r230052150
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -84,6 +85,7 @@ class KryoSerializer(conf: SparkConf)
   private val avroSchemas = conf.getAvroSchema
   // whether to use unsafe based IO for serialization
   private val useUnsafe = conf.getBoolean("spark.kryo.unsafe", false)
+  private val usePool = conf.getBoolean("spark.kryo.pool", false)
--- End diff --

I would not document it. This is just a safety valve. In theory, there's no 
reason to disable this nor would a caller know why to disable it.





[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-10-31 Thread patrickbrownsync
Github user patrickbrownsync commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r229763892
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -84,6 +85,7 @@ class KryoSerializer(conf: SparkConf)
   private val avroSchemas = conf.getAvroSchema
   // whether to use unsafe based IO for serialization
   private val useUnsafe = conf.getBoolean("spark.kryo.unsafe", false)
+  private val usePool = conf.getBoolean("spark.kryo.pool", false)
--- End diff --

I had already added this to docs/configuration.md. Should I remove it?





[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-10-31 Thread patrickbrownsync
Github user patrickbrownsync commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r229763610
  
--- Diff: 
core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala ---
@@ -456,9 +458,63 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
 
   // Regression test for SPARK-7766, an issue where disabling auto-reset and enabling
   // reference-tracking would lead to corrupted output when serializer instances are re-used
-  for (referenceTracking <- Set(true, false); autoReset <- Set(true, false)) {
-    test(s"instance reuse with autoReset = $autoReset, referenceTracking = $referenceTracking") {
-      testSerializerInstanceReuse(autoReset = autoReset, referenceTracking = referenceTracking)
+  for {
+    referenceTracking <- Set(true, false)

Might as well fix while I'm here





[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-10-31 Thread patrickbrownsync
Github user patrickbrownsync commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r229762873
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -92,6 +94,16 @@ class KryoSerializer(conf: SparkConf)
   new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
 }
 
+  @transient
+  private lazy val factory: KryoFactory = new KryoFactory() {
--- End diff --

Thanks, I'll remove it. I had a problem previously, but it looks like the 
@transient on the pool is enough by itself.





[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-10-31 Thread patrickbrownsync
Github user patrickbrownsync commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r229762474
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -92,6 +94,16 @@ class KryoSerializer(conf: SparkConf)
   new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
 }
 
+  @transient
+  private lazy val factory: KryoFactory = new KryoFactory() {
+    override def create: Kryo = {
+      newKryo()
+    }
+  }
+
+  @transient
+  lazy val pool = new KryoPool.Builder(factory).softReferences.build
--- End diff --

This is used directly by KryoSerializerInstance


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-10-31 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r229738657
  
--- Diff: 
core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala ---
@@ -456,9 +458,63 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
 
   // Regression test for SPARK-7766, an issue where disabling auto-reset and enabling
   // reference-tracking would lead to corrupted output when serializer instances are re-used
-  for (referenceTracking <- Set(true, false); autoReset <- Set(true, false)) {
-    test(s"instance reuse with autoReset = $autoReset, referenceTracking = $referenceTracking") {
-      testSerializerInstanceReuse(autoReset = autoReset, referenceTracking = referenceTracking)
+  for {
+    referenceTracking <- Set(true, false)
+    autoReset <- Set(true, false)
+    usePool <- Set(true, false)
+  } {
+    test(s"instance reuse with autoReset = $autoReset, referenceTracking = $referenceTracking" +
+      s", usePool = $usePool") {
+      testSerializerInstanceReuse(
+        autoReset = autoReset, referenceTracking = referenceTracking, usePool = usePool)
+    }
+  }
+
+  test("SPARK-25839 KryoPool implementation works correctly in multi-threaded environment") {
+    import java.util.concurrent.Executors
--- End diff --

I'd import at the top of the file.
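
To make the shape of such a multi-threaded check concrete, here is a rough, self-contained sketch with all imports grouped at the top of the file. It does not use Kryo or Spark: `CountingPool` and `PoolConcurrencyCheck` are hypothetical names, and a `java.lang.StringBuilder` stands in for the pooled serializer object.

```scala
import java.lang.{StringBuilder => JStringBuilder}
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object PoolConcurrencyCheck {
  // Hypothetical pool of reusable buffers; counts how many it had to create.
  final class CountingPool {
    val created = new AtomicInteger(0)
    private val idle = new ConcurrentLinkedQueue[JStringBuilder]()

    def borrow(): JStringBuilder = {
      val cached = idle.poll()
      if (cached != null) cached
      else {
        created.incrementAndGet()
        new JStringBuilder
      }
    }

    def release(sb: JStringBuilder): Unit = {
      sb.setLength(0) // clear state left by the previous borrower
      idle.offer(sb)
    }
  }

  // Runs `tasks` jobs on `threads` threads; returns their results and the
  // number of buffers the pool created along the way.
  def run(threads: Int, tasks: Int): (Seq[String], Int) = {
    val executor = Executors.newFixedThreadPool(threads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)
    val pool = new CountingPool
    try {
      val futures = (0 until tasks).map { i =>
        Future {
          val sb = pool.borrow()
          try sb.append("task-").append(i).toString
          finally pool.release(sb)
        }
      }
      val results = Await.result(Future.sequence(futures), Duration(30, TimeUnit.SECONDS))
      (results, pool.created.get)
    } finally {
      executor.shutdown()
    }
  }
}
```

The check has two parts: every task must see only its own data (no state leaking between borrowers), and the pool should never need more instances than there are worker threads.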





[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-10-31 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r229740163
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -84,6 +85,7 @@ class KryoSerializer(conf: SparkConf)
   private val avroSchemas = conf.getAvroSchema
   // whether to use unsafe based IO for serialization
   private val useUnsafe = conf.getBoolean("spark.kryo.unsafe", false)
+  private val usePool = conf.getBoolean("spark.kryo.pool", false)
--- End diff --

Yep, I'd leave this undocumented





[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-10-31 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r229739685
  
--- Diff: 
core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala ---
@@ -456,9 +458,63 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
 
   // Regression test for SPARK-7766, an issue where disabling auto-reset and enabling
   // reference-tracking would lead to corrupted output when serializer instances are re-used
-  for (referenceTracking <- Set(true, false); autoReset <- Set(true, false)) {
-    test(s"instance reuse with autoReset = $autoReset, referenceTracking = $referenceTracking") {
-      testSerializerInstanceReuse(autoReset = autoReset, referenceTracking = referenceTracking)
+  for {
+    referenceTracking <- Set(true, false)
+    autoReset <- Set(true, false)
+    usePool <- Set(true, false)
+  } {
+    test(s"instance reuse with autoReset = $autoReset, referenceTracking = $referenceTracking" +
+      s", usePool = $usePool") {
+      testSerializerInstanceReuse(
+        autoReset = autoReset, referenceTracking = referenceTracking, usePool = usePool)
--- End diff --

Likewise, I see this is just how the code was written before, but the `foo = 
foo` style isn't adding anything IMHO. Feel free to not name args.





[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-10-31 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r229738553
  
--- Diff: 
core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala ---
@@ -456,9 +458,63 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
 
   // Regression test for SPARK-7766, an issue where disabling auto-reset and enabling
   // reference-tracking would lead to corrupted output when serializer instances are re-used
-  for (referenceTracking <- Set(true, false); autoReset <- Set(true, false)) {
-    test(s"instance reuse with autoReset = $autoReset, referenceTracking = $referenceTracking") {
-      testSerializerInstanceReuse(autoReset = autoReset, referenceTracking = referenceTracking)
+  for {
+    referenceTracking <- Set(true, false)
--- End diff --

Not that it matters, but I think this should have originally been Seq not 
Set
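
A short sketch of why `Seq` is the more natural choice for enumerating test parameters. In the loop under review the body only runs side effects, so the difference does not matter there, as the comment says. It starts to matter once results are collected with `yield`, because a `Set` generator produces a `Set` and silently merges equal results. `CombinationsDemo` is a hypothetical name used only for this example.

```scala
object CombinationsDemo {
  // Seq generators: all eight combinations, in declaration order.
  val combos: Seq[(Boolean, Boolean, Boolean)] =
    for {
      referenceTracking <- Seq(true, false)
      autoReset <- Seq(true, false)
      usePool <- Seq(true, false)
    } yield (referenceTracking, autoReset, usePool)

  // With yield, a Set generator collapses equal results...
  val fromSet: Set[Int] = for (x <- Set(1, 2, 3)) yield x % 2
  // ...while a Seq keeps every element and its position.
  val fromSeq: Seq[Int] = for (x <- Seq(1, 2, 3)) yield x % 2
}
```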





[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-10-31 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r229737692
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -92,6 +94,16 @@ class KryoSerializer(conf: SparkConf)
   new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
 }
 
+  @transient
+  private lazy val factory: KryoFactory = new KryoFactory() {
--- End diff --

I think you're welcome to just write ...
```
private lazy val factory = new KryoFactory() {
  override def create: Kryo = newKryo()
}
```
but it doesn't matter.





[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-10-31 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r229738794
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -30,6 +30,7 @@ import scala.util.control.NonFatal
 import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSerializer}
 import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
 import com.esotericsoftware.kryo.io.{UnsafeInput => KryoUnsafeInput, UnsafeOutput => KryoUnsafeOutput}
+import com.esotericsoftware.kryo.pool._
--- End diff --

I'd spell out the imports for clarity unless it's going to run more than 2 
lines or something





[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-10-31 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r229737307
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -92,6 +94,16 @@ class KryoSerializer(conf: SparkConf)
   new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
 }
 
+  @transient
+  private lazy val factory: KryoFactory = new KryoFactory() {
+    override def create: Kryo = {
+      newKryo()
+    }
+  }
+
+  @transient
+  lazy val pool = new KryoPool.Builder(factory).softReferences.build
--- End diff --

`private`?





[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-10-29 Thread patrickbrownsync
Github user patrickbrownsync commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r228970250
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -298,30 +312,40 @@ class KryoDeserializationStream(
   }
 }
 
-private[spark] class KryoSerializerInstance(ks: KryoSerializer, useUnsafe: Boolean)
+private[spark] class KryoSerializerInstance(
+   ks: KryoSerializer, useUnsafe: Boolean, usePool: Boolean)
   extends SerializerInstance {
   /**
    * A re-used [[Kryo]] instance. Methods will borrow this instance by calling `borrowKryo()`, do
    * their work, then release the instance by calling `releaseKryo()`. Logically, this is a caching
    * pool of size one. SerializerInstances are not thread-safe, hence accesses to this field are
    * not synchronized.
    */
-  @Nullable private[this] var cachedKryo: Kryo = borrowKryo()
+  @Nullable private[this] var cachedKryo: Kryo = if (usePool) null else borrowKryo()
 
   /**
    * Borrows a [[Kryo]] instance. If possible, this tries to re-use a cached Kryo instance;
    * otherwise, it allocates a new instance.
    */
   private[serializer] def borrowKryo(): Kryo = {
-    if (cachedKryo != null) {
-      val kryo = cachedKryo
-      // As a defensive measure, call reset() to clear any Kryo state that might have been modified
-      // by the last operation to borrow this instance (see SPARK-7766 for discussion of this issue)
+    if (usePool) {
+      val kryo = ks.pool.borrow()
       kryo.reset()
-      cachedKryo = null
       kryo
     } else {
-      ks.newKryo()
+      if (cachedKryo != null) {
+        val kryo = cachedKryo
+        /**
+        * As a defensive measure, call reset() to clear any Kryo state that might have
--- End diff --

Thanks, fixed.





[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-10-29 Thread 10110346
Github user 10110346 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r228844982
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -298,30 +312,40 @@ class KryoDeserializationStream(
   }
 }
 
-private[spark] class KryoSerializerInstance(ks: KryoSerializer, useUnsafe: Boolean)
+private[spark] class KryoSerializerInstance(
+   ks: KryoSerializer, useUnsafe: Boolean, usePool: Boolean)
   extends SerializerInstance {
   /**
    * A re-used [[Kryo]] instance. Methods will borrow this instance by calling `borrowKryo()`, do
    * their work, then release the instance by calling `releaseKryo()`. Logically, this is a caching
    * pool of size one. SerializerInstances are not thread-safe, hence accesses to this field are
    * not synchronized.
    */
-  @Nullable private[this] var cachedKryo: Kryo = borrowKryo()
+  @Nullable private[this] var cachedKryo: Kryo = if (usePool) null else borrowKryo()
 
   /**
    * Borrows a [[Kryo]] instance. If possible, this tries to re-use a cached Kryo instance;
    * otherwise, it allocates a new instance.
    */
   private[serializer] def borrowKryo(): Kryo = {
-    if (cachedKryo != null) {
-      val kryo = cachedKryo
-      // As a defensive measure, call reset() to clear any Kryo state that might have been modified
-      // by the last operation to borrow this instance (see SPARK-7766 for discussion of this issue)
+    if (usePool) {
+      val kryo = ks.pool.borrow()
       kryo.reset()
-      cachedKryo = null
       kryo
     } else {
-      ks.newKryo()
+      if (cachedKryo != null) {
+        val kryo = cachedKryo
+        /**
+        * As a defensive measure, call reset() to clear any Kryo state that might have
--- End diff --

The `*` on each line after the first must be aligned with the first `*` of 
the first line.
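
To illustrate the alignment being asked for, here is a minimal sketch. The object and method names (`CommentStyleSketch`, `borrow`) are hypothetical and are not taken from the pull request; only the comment layout matters.

```scala
// Hypothetical example: only the layout of the block comment is of interest.
object CommentStyleSketch {
  private val cached: String = "state"

  def borrow(): String = {
    /**
     * As a defensive measure, clear any state that might have been modified
     * by the last borrower. Every continuation asterisk sits one column to
     * the right of the opening slash, directly under the first asterisk.
     */
    cached
  }
}
```

In the quoted diff the continuation asterisk appears to start in the same column as the opening slash, which is one column too far to the left.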





[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-10-28 Thread patrickbrownsync
Github user patrickbrownsync commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r228759466
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -298,30 +312,40 @@ class KryoDeserializationStream(
   }
 }
 
-private[spark] class KryoSerializerInstance(ks: KryoSerializer, useUnsafe: Boolean)
+private[spark] class KryoSerializerInstance(
+   ks: KryoSerializer, useUnsafe: Boolean, usePool: Boolean)
   extends SerializerInstance {
   /**
    * A re-used [[Kryo]] instance. Methods will borrow this instance by calling `borrowKryo()`, do
    * their work, then release the instance by calling `releaseKryo()`. Logically, this is a caching
    * pool of size one. SerializerInstances are not thread-safe, hence accesses to this field are
    * not synchronized.
    */
-  @Nullable private[this] var cachedKryo: Kryo = borrowKryo()
+  @Nullable private[this] var cachedKryo: Kryo = if (usePool) null else borrowKryo()
 
   /**
    * Borrows a [[Kryo]] instance. If possible, this tries to re-use a cached Kryo instance;
    * otherwise, it allocates a new instance.
    */
   private[serializer] def borrowKryo(): Kryo = {
-    if (cachedKryo != null) {
-      val kryo = cachedKryo
-      // As a defensive measure, call reset() to clear any Kryo state that might have been modified
-      // by the last operation to borrow this instance (see SPARK-7766 for discussion of this issue)
+    if (usePool) {
+      val kryo = ks.pool.borrow()
       kryo.reset()
-      cachedKryo = null
       kryo
     } else {
-      ks.newKryo()
+      if (cachedKryo != null) {
+        val kryo = cachedKryo
+        /**
+        * As a defensive measure, call reset() to clear any Kryo state that might have
--- End diff --

Sorry, I'm new to this. What is the specific style issue here?





[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-10-28 Thread patrickbrownsync
Github user patrickbrownsync commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r228759425
  
--- Diff: 
core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala 
---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import scala.concurrent._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration._
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+import org.apache.spark.serializer.KryoTest._
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Benchmark for KryoPool vs old "pool of 1".
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt:
+ *      bin/spark-submit --class  --jars 
+ *   2. build/sbt "core/test:runMain "
+ *   3. generate result:
+ *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain "
+ *      Results will be written to "benchmarks/KryoSerializerBenchmark-results.txt".
+ * }}}
+ */
+object KryoSerializerBenchmark extends BenchmarkBase {
+
+  var sc: SparkContext = null
+  val N = 500
+  override def runBenchmarkSuite(): Unit = {
+    val name = "Benchmark KryoPool vs old\"pool of 1\" implementation"
+    runBenchmark(name) {
+      val benchmark = new Benchmark(name, N, 10, output = output)
+      Seq(true, false).foreach(usePool => run(usePool, benchmark))
+      benchmark.run()
+    }
+  }
+
+  private def run(usePool: Boolean, benchmark: Benchmark): Unit = {
+    lazy val sc = createSparkContext(usePool)
+
+    benchmark.addCase(s"KryoPool:$usePool") { _ =>
+      val futures = for (_ <- 0 until N) yield {
+        Future {
+          sc.parallelize(0 until 10).map(i => i + 1).count()
+        }
+      }
+
+      val future = Future.sequence(futures)
+
+      ThreadUtils.awaitResult(future, 10.minutes)
+    }
+  }
+
+  def createSparkContext(usePool: Boolean): SparkContext = {
--- End diff --

I'm not sure I understand the question here. This benchmark class doesn't 
inherit from `SqlBasedBenchmark`; it inherits from `BenchmarkBase`, which has 
no `getSparkSession` method.





[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-10-27 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r228716995
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -298,30 +312,40 @@ class KryoDeserializationStream(
   }
 }
 
-private[spark] class KryoSerializerInstance(ks: KryoSerializer, useUnsafe: Boolean)
+private[spark] class KryoSerializerInstance(
+   ks: KryoSerializer, useUnsafe: Boolean, usePool: Boolean)
   extends SerializerInstance {
   /**
    * A re-used [[Kryo]] instance. Methods will borrow this instance by calling `borrowKryo()`, do
    * their work, then release the instance by calling `releaseKryo()`. Logically, this is a caching
    * pool of size one. SerializerInstances are not thread-safe, hence accesses to this field are
    * not synchronized.
    */
-  @Nullable private[this] var cachedKryo: Kryo = borrowKryo()
+  @Nullable private[this] var cachedKryo: Kryo = if (usePool) null else borrowKryo()
 
   /**
    * Borrows a [[Kryo]] instance. If possible, this tries to re-use a cached Kryo instance;
    * otherwise, it allocates a new instance.
    */
   private[serializer] def borrowKryo(): Kryo = {
-    if (cachedKryo != null) {
-      val kryo = cachedKryo
-      // As a defensive measure, call reset() to clear any Kryo state that might have been modified
-      // by the last operation to borrow this instance (see SPARK-7766 for discussion of this issue)
+    if (usePool) {
+      val kryo = ks.pool.borrow()
       kryo.reset()
-      cachedKryo = null
       kryo
     } else {
-      ks.newKryo()
+      if (cachedKryo != null) {
+        val kryo = cachedKryo
+        /**
+        * As a defensive measure, call reset() to clear any Kryo state that might have
--- End diff --

nit for the style.





[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-10-27 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r228716497
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -92,6 +94,18 @@ class KryoSerializer(conf: SparkConf)
   new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
 }
 
+
--- End diff --

nit: extra empty line.





[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-10-27 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r228716824
  
--- Diff: 
core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala 
---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import scala.concurrent._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration._
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+import org.apache.spark.serializer.KryoTest._
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Benchmark for KryoPool vs old "pool of 1".
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt:
+ *      bin/spark-submit --class  --jars 
+ *   2. build/sbt "core/test:runMain "
+ *   3. generate result:
+ *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain "
+ *      Results will be written to "benchmarks/KryoSerializerBenchmark-results.txt".
+ * }}}
+ */
+object KryoSerializerBenchmark extends BenchmarkBase {
+
+  var sc: SparkContext = null
+  val N = 500
+  override def runBenchmarkSuite(): Unit = {
+    val name = "Benchmark KryoPool vs old\"pool of 1\" implementation"
+    runBenchmark(name) {
+      val benchmark = new Benchmark(name, N, 10, output = output)
+      Seq(true, false).foreach(usePool => run(usePool, benchmark))
+      benchmark.run()
+    }
+  }
+
+  private def run(usePool: Boolean, benchmark: Benchmark): Unit = {
+    lazy val sc = createSparkContext(usePool)
+
+    benchmark.addCase(s"KryoPool:$usePool") { _ =>
+      val futures = for (_ <- 0 until N) yield {
+        Future {
+          sc.parallelize(0 until 10).map(i => i + 1).count()
+        }
+      }
+
+      val future = Future.sequence(futures)
+
+      ThreadUtils.awaitResult(future, 10.minutes)
+    }
+  }
+
+  def createSparkContext(usePool: Boolean): SparkContext = {
--- End diff --

We add this rather than overriding `getSparkSession` in `SqlBasedBenchmark`. 
Is that because changing the conf in SparkSession doesn't work for SparkSession?





[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-10-27 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22855#discussion_r228716844
  
--- Diff: 
core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala ---
@@ -33,6 +33,7 @@ import org.apache.spark.serializer.KryoTest._
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.Utils
 
+
--- End diff --

nit: extra empty line.





[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

2018-10-26 Thread patrickbrownsync
GitHub user patrickbrownsync opened a pull request:

https://github.com/apache/spark/pull/22855

[SPARK-25839] [Core] Implement use of KryoPool in KryoSerializer

## What changes were proposed in this pull request?

* Implement (optional) use of KryoPool in KryoSerializer, an alternative to 
the existing implementation of caching a Kryo instance inside 
KryoSerializerInstance
* Add config key & documentation of spark.kryo.pool in order to turn this on
* Add benchmark KryoSerializerBenchmark to compare new and old 
implementation
* Add results of benchmark
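
The difference between a shared pool and a per-instance cached object can be sketched without Kryo at all. The following is a minimal, self-contained illustration and not the code in this pull request: `Codec`, `SimplePool` and `PoolSketch` are hypothetical names, and `Codec` merely stands in for an expensive, non-thread-safe object such as a Kryo instance. The `borrow`/`release`/`run` method names mirror the ones visible on `KryoPool` in the review diffs, but the implementation here is an assumption.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for an expensive, non-thread-safe object (e.g. a Kryo instance).
final class Codec(val id: Int) {
  private var scratch: Int = 0
  // Clear any state left behind by the previous borrower.
  def reset(): Unit = { scratch = 0 }
  def encode(x: Int): Int = { scratch += x; scratch }
}

// Hypothetical minimal pool: reuse an idle instance if one exists, otherwise create one.
final class SimplePool[A <: AnyRef](factory: () => A) {
  private val idle = new ConcurrentLinkedQueue[A]()
  def borrow(): A = Option(idle.poll()).getOrElse(factory())
  def release(a: A): Unit = { idle.offer(a); () }
  def run[T](f: A => T): T = {
    val a = borrow()
    try f(a) finally release(a)
  }
}

object PoolSketch {
  val created = new AtomicInteger(0)
  val pool = new SimplePool[Codec](() => new Codec(created.incrementAndGet()))

  // Borrow, reset defensively, use, and always release.
  def encodeOnce(x: Int): Int = pool.run { codec =>
    codec.reset()
    codec.encode(x)
  }

  def main(args: Array[String]): Unit = {
    // Many short concurrent tasks share the pool instead of each building its own Codec.
    val tasks = (1 to 100).map(i => Future(encodeOnce(i)))
    val results = Await.result(Future.sequence(tasks), 1.minute)
    println(s"tasks=${results.size}, instances created=${created.get}")
  }
}
```

With many short tasks, the number of instances created is bounded by the peak concurrency rather than by the number of tasks, which is the effect the benchmark in this pull request is meant to measure.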

## How was this patch tested?

Added new tests inside KryoSerializerSuite to test the pool implementation, 
and added the pool option to the existing regression test for 
SPARK-7766.


This is my original work and I license the work to the project under the 
project’s open source license.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Blyncs/spark kryo-pool

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22855.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22855


commit ce5d13e0673f8574b5795d0d3df59da03118038a
Author: Patrick Brown 
Date:   2018-04-06T18:19:52Z

use kryo pool for kryo serializer

commit a4ba88eed6d18d2df5fab609bc28210df9b5a716
Author: Patrick Brown 
Date:   2018-04-09T21:20:16Z

fix pool serializable issue

commit 3f1c41ccc451af868a13616065676a5667d597fa
Author: Patrick Brown 
Date:   2018-10-26T18:52:07Z

Add option to KryoSerializer to use new KryoPool based implementation
Add tests for new implementation to KryoSerializerSuite.scala
Add benchmark new vs old implementation in KryoSerializerBenchmark.scala
Add option in Benchmark base for afterAll() shutdown code to facilitate 
clean benchmark shutdown
Add documentation for spark.kryo.pool configuration option



