[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-29 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r35784695
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -23,6 +23,8 @@ import javax.annotation.Nullable
 
 import scala.reflect.ClassTag
 
+import org.apache.avro.generic.{GenericData, GenericRecord}
--- End diff --

nit: order


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-29 Thread squito
Github user squito commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-126021604
  
Thanks @JDrit !  I will fix those final details as I merge.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-29 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r35784052
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.nio.ByteBuffer
+
+import scala.collection.mutable
+
+import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.apache.avro.{Schema, SchemaNormalization}
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.io._
+import org.apache.commons.io.IOUtils
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.io.CompressionCodec
+
+/**
+ * Custom serializer used for generic Avro records. If the user registers the schemas
+ * ahead of time, then the schema's fingerprint will be sent with each message instead of
+ * the actual schema, so as to reduce network IO.
+ * Actions like parsing or compressing schemas are computationally expensive, so the
+ * serializer caches all previously seen values to reduce the amount of work needed.
+ */
+private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
+  extends KSerializer[GenericRecord] {
+
+  /** Used to reduce the amount of effort to compress the schema */
+  private val compressCache = new mutable.HashMap[Schema, Array[Byte]]()
+  private val decompressCache = new mutable.HashMap[ByteBuffer, Schema]()
+
+  /** Reuses the same datum reader/writer since the same schema will be used many times */
+  private val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]()
+  private val readerCache = new mutable.HashMap[Schema, DatumReader[_]]()
+
+  /** Fingerprinting is very expensive so this alleviates most of the work */
+  private val fingerprintCache = new mutable.HashMap[Schema, Long]()
+  private val schemaCache = new mutable.HashMap[Long, Schema]()
+
+  /** This needs to be lazy since SparkEnv is sometimes not initialized yet when this is called */
+  private lazy val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
--- End diff --

Joe and I talked about this a bit offline -- the reason for this is that 
`ShuffleRDD` lets you set a Serializer directly, which is used in some tests, 
and that is why the serializer itself needs to be serializable.  I'll add a 
comment here explaining why it's necessary when I merge.
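To illustrate why the `lazy val codec` in the quoted diff is safe even though `SparkEnv` may not exist at construction time, here is a minimal, self-contained sketch. `Env` and `LazyCodecHolder` are hypothetical stand-ins for `SparkEnv` and the serializer, not Spark's actual API:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical stand-in for SparkEnv: not yet populated when the
// holder instance is constructed (as can happen in some unit tests).
object Env {
  var conf: Option[String] = None
}

class LazyCodecHolder extends Serializable {
  // An eager `val codec = Env.conf.get` would throw at construction time,
  // because Env.conf is still None. `lazy val` defers the lookup until
  // `codec` is first read; `@transient` keeps the cached value out of
  // Java serialization.
  @transient private lazy val codec: String = Env.conf.get

  def codecName: String = codec
}

object LazyDemo {
  def main(args: Array[String]): Unit = {
    val holder = new LazyCodecHolder   // safe: `codec` is not evaluated yet
    Env.conf = Some("lz4")             // the "environment" appears later
    println(holder.codecName)          // prints "lz4"
    // The holder also survives Java serialization:
    new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(holder)
  }
}
```

The same deferral is what lets the real serializer be constructed (and serialized, e.g. when set directly on a `ShuffleRDD`) before any compression codec is needed.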





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-29 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r35784682
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.nio.ByteBuffer
+
+import scala.collection.mutable
+
+import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.apache.avro.{Schema, SchemaNormalization}
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.io._
+import org.apache.commons.io.IOUtils
+
+import org.apache.spark.{SparkException, SparkEnv}
+import org.apache.spark.io.CompressionCodec
+
+/**
+ * Custom serializer used for generic Avro records. If the user registers the schemas
+ * ahead of time, then the schema's fingerprint will be sent with each message instead of
+ * the actual schema, so as to reduce network IO.
+ * Actions like parsing or compressing schemas are computationally expensive, so the
+ * serializer caches all previously seen values to reduce the amount of work needed.
+ * @param schemas a map where the keys are unique IDs for Avro schemas and the values are
+ *                the string representation of the Avro schema, used to decrease the amount
+ *                of data that needs to be serialized.
+ */
+private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
+  extends KSerializer[GenericRecord] {
+
+  /** Used to reduce the amount of effort to compress the schema */
+  private val compressCache = new mutable.HashMap[Schema, Array[Byte]]()
+  private val decompressCache = new mutable.HashMap[ByteBuffer, Schema]()
+
+  /** Reuses the same datum reader/writer since the same schema will be used many times */
+  private val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]()
+  private val readerCache = new mutable.HashMap[Schema, DatumReader[_]]()
+
+  /** Fingerprinting is very expensive so this alleviates most of the work */
+  private val fingerprintCache = new mutable.HashMap[Schema, Long]()
+  private val schemaCache = new mutable.HashMap[Long, Schema]()
+
+  /** This needs to be lazy since SparkEnv is sometimes not initialized yet when this is called */
+  private lazy val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
+
+  /**
+   * Used to compress Schemas when they are being sent over the wire.
+   * The compression results are memoized to reduce the compression time, since the
+   * same schema is compressed many times over.
+   */
+  def compress(schema: Schema): Array[Byte] = compressCache.getOrElseUpdate(schema, {
+    val bos = new ByteArrayOutputStream()
+    val out = codec.compressedOutputStream(bos)
+    out.write(schema.toString.getBytes("UTF-8"))
+    out.close()
+    bos.toByteArray
+  })
+
+  /**
+   * Decompresses the schema into the actual in-memory object. Keeps an internal cache of
+   * already seen values so as to limit the number of times that decompression has to be done.
+   */
+  def decompress(schemaBytes: ByteBuffer): Schema = decompressCache.getOrElseUpdate(schemaBytes, {
+    val bis = new ByteArrayInputStream(schemaBytes.array())
+    val bytes = IOUtils.toByteArray(codec.compressedInputStream(bis))
+    new Schema.Parser().parse(new String(bytes, "UTF-8"))
+  })
+
+  /**
+   * Serializes a record to the given output stream. It caches a lot of the internal data
+   * so as not to redo work.
+   */
+  def serializeDatum[R <: GenericRecord](datum: R, output: KryoOutput): Unit = {
+val encoder = EncoderFactory.get.binaryEncoder(output, null)
+val schema = datum.getSchema
+val fingerprint = 

[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-29 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r35796975
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.nio.ByteBuffer
+
+import scala.collection.mutable
+
+import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.apache.avro.{Schema, SchemaNormalization}
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.io._
+import org.apache.commons.io.IOUtils
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.io.CompressionCodec
+
+/**
+ * Custom serializer used for generic Avro records. If the user registers the schemas
+ * ahead of time, then the schema's fingerprint will be sent with each message instead of
+ * the actual schema, so as to reduce network IO.
+ * Actions like parsing or compressing schemas are computationally expensive, so the
+ * serializer caches all previously seen values to reduce the amount of work needed.
+ */
+private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
+  extends KSerializer[GenericRecord] {
+
+  /** Used to reduce the amount of effort to compress the schema */
+  private val compressCache = new mutable.HashMap[Schema, Array[Byte]]()
+  private val decompressCache = new mutable.HashMap[ByteBuffer, Schema]()
+
+  /** Reuses the same datum reader/writer since the same schema will be used many times */
+  private val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]()
+  private val readerCache = new mutable.HashMap[Schema, DatumReader[_]]()
+
+  /** Fingerprinting is very expensive so this alleviates most of the work */
+  private val fingerprintCache = new mutable.HashMap[Schema, Long]()
+  private val schemaCache = new mutable.HashMap[Long, Schema]()
+
+  /** This needs to be lazy since SparkEnv is sometimes not initialized yet when this is called */
+  private lazy val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
--- End diff --

for anybody who is curious -- I had gotten myself pretty confused about why 
this change would cause the `SparkConf` to be serialized.  `KryoSerializer` 
already had a `conf` argument to the constructor, and that wasn't changed.  But the 
`conf` there is only accessed in field initialization, never in methods, so it 
wasn't stored.  But through the wonders of Scala, when you access that `conf` 
in a method, suddenly `conf` also becomes a member variable, and now you can no 
longer serialize the `KryoSerializer`.

In practice this means the `lazy val codec` here is fine in actual use, but 
it could be very confusing in a unit test where the `SparkEnv` hasn't been set. 
 So I'll add a comment explaining this a bit.
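The field-capture behavior described above can be reproduced in isolation. A minimal sketch, with a hypothetical `Unserializable` class standing in for `SparkConf` (this is not Spark code, just the plain Scala mechanism):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for SparkConf: deliberately NOT java.io.Serializable.
class Unserializable

// `conf` is only read while initializing `confSummary`, so scalac does not
// retain it as a field -- instances stay serializable.
class UsesConfInInit(conf: Unserializable) extends Serializable {
  val confSummary: String = conf.toString
}

// `conf` is read inside a method, so scalac must retain it as a member
// field -- serializing an instance now drags the non-serializable `conf` along.
class UsesConfInMethod(conf: Unserializable) extends Serializable {
  def describe(): String = conf.toString
}

object FieldCaptureDemo {
  // Returns true iff `o` survives Java serialization.
  def javaSerializable(o: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(o)
      true
    } catch { case _: NotSerializableException => false }

  def main(args: Array[String]): Unit = {
    println(javaSerializable(new UsesConfInInit(new Unserializable)))   // true
    println(javaSerializable(new UsesConfInMethod(new Unserializable))) // false
  }
}
```

This is why touching a constructor parameter from a method body, as opposed to only from a field initializer, can silently change whether a class like `KryoSerializer` is serializable.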





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-29 Thread srowen
Github user srowen commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-126088128
  
Re: the avro dependency, this is a net new dependency for core. Previously 
this came in via the hive module (the metastore dependency to be specific). I 
suppose it relies on the Hive profile therefore, but not the YARN profile.

In any event the right thing to do is include the dependency if it's being 
used, of course. I suppose this is evidence that the Spark assembly -- the Hive 
flavors -- have had this dep and have been fine.

Avro doesn't bring anything in that we didn't already have, except Avro:

```
[INFO] +- org.apache.avro:avro-mapred:jar:hadoop2:1.7.7:compile
[INFO] |  +- org.apache.avro:avro-ipc:jar:1.7.7:compile
[INFO] |  |  +- (org.apache.avro:avro:jar:1.7.7:compile - version managed 
from 1.7.5; omitted for duplicate)
[INFO] |  |  +- (org.codehaus.jackson:jackson-core-asl:jar:1.9.13:compile - 
version managed from 1.9.2; omitted for duplicate)
[INFO] |  |  +- (org.codehaus.jackson:jackson-mapper-asl:jar:1.9.13:compile 
- version managed from 1.9.2; omitted for duplicate)
[INFO] |  |  \- (org.slf4j:slf4j-api:jar:1.7.10:compile - version managed 
from 1.6.4; scope managed from runtime; omitted for duplicate)
[INFO] |  +- org.apache.avro:avro-ipc:jar:tests:1.7.7:compile
[INFO] |  |  +- (org.apache.avro:avro:jar:1.7.7:compile - version managed 
from 1.7.5; omitted for duplicate)
[INFO] |  |  +- (org.codehaus.jackson:jackson-core-asl:jar:1.9.13:compile - 
version managed from 1.9.2; omitted for duplicate)
[INFO] |  |  +- (org.codehaus.jackson:jackson-mapper-asl:jar:1.9.13:compile 
- version managed from 1.9.2; omitted for duplicate)
[INFO] |  |  \- (org.slf4j:slf4j-api:jar:1.7.10:compile - version managed 
from 1.6.4; scope managed from runtime; omitted for duplicate)
[INFO] |  +- (org.codehaus.jackson:jackson-core-asl:jar:1.9.13:compile - 
version managed from 1.9.2; omitted for duplicate)
[INFO] |  +- (org.codehaus.jackson:jackson-mapper-asl:jar:1.9.13:compile - 
version managed from 1.9.2; omitted for duplicate)
[INFO] |  \- (org.slf4j:slf4j-api:jar:1.7.10:compile - version managed from 
1.6.4; scope managed from runtime; omitted for duplicate)
```

So, I think the net change here is only that Avro has been added to core. 
Unless there's an objection to adding Avro at all, I think this is OK from a 
build standpoint.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-29 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-126112201
  
Okay sounds good. Thanks for looking at it Sean.

- Patrick






[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-29 Thread squito
Github user squito commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-126040715
  
sorry I am trying to understand this serialization thing a bit better ... 
something doesn't make sense to me, but mostly outside of these changes.  Once 
I get a handle on that, I will merge this, just want to make sure I'm not 
missing something ...





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-29 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-126063730
  
Did @srowen look at the build change? Sean or I should be signing off on 
any dependency changes in the build.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-29 Thread squito
Github user squito commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-126084533
  
@pwendell @JoshRosen ah sorry, I thought @vanzin and @zsxwing gave it the 
thumbs up earlier -- just following the comments, I forgot to get the approval 
of Sean or you as well.  Lemme know if you think this needs an immediate 
revert.  I will also check w/ Sean.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-29 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-126065124
  
There was a lot of up-thread discussion regarding the dependency change; my 
first comment on this PR was asking whether this introduced a new dependency, 
etc.

One concern which might have been overlooked is whether the required 
dependencies will be packaged if Spark is built with the YARN profile disabled. 
It looks like the rationale upthread is that this dependency is already 
transitively included by our other dependencies, but I'm not sure if that will 
always be the case if we're relying on the YARN profile to include it.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-29 Thread squito
Github user squito commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-126060583
  
merged to master, thanks @JDrit 





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-29 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/7004





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-27 Thread JDrit
Github user JDrit commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-125257800
  
@squito @JoshRosen Have I addressed all your issues, or is there anything 
else you would like me to do?





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-27 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-125279434
  
I'm going to defer to other reviewers who have been following this patch 
more closely.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-124537897
  
  [Test build #1196 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/1196/consoleFull)
 for   PR 7004 at commit 
[`8158d51`](https://github.com/apache/spark/commit/8158d5113a9084e44482ebfe0fbb73fe3d7bddd8).





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-124573048
  
  [Test build #1196 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/1196/console)
 for   PR 7004 at commit 
[`8158d51`](https://github.com/apache/spark/commit/8158d5113a9084e44482ebfe0fbb73fe3d7bddd8).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-22 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r35261143
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.nio.ByteBuffer
+
+import scala.collection.mutable
+
+import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.apache.avro.{Schema, SchemaNormalization}
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.io._
+import org.apache.commons.io.IOUtils
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.io.CompressionCodec
+
+/**
+ * Custom serializer used for generic Avro records. If the user registers the schemas
+ * ahead of time, then the schema's fingerprint will be sent with each message instead of
+ * the actual schema, so as to reduce network IO.
+ * Actions like parsing or compressing schemas are computationally expensive, so the
+ * serializer caches all previously seen values to reduce the amount of work needed.
+ */
+private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
+  extends KSerializer[GenericRecord] {
+
+  /** Used to reduce the amount of effort to compress the schema */
+  private val compressCache = new mutable.HashMap[Schema, Array[Byte]]()
+  private val decompressCache = new mutable.HashMap[ByteBuffer, Schema]()
+
+  /** Reuses the same datum reader/writer since the same schema will be used many times */
+  private val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]()
+  private val readerCache = new mutable.HashMap[Schema, DatumReader[_]]()
+
+  /** Fingerprinting is very expensive so this alleviates most of the work 
*/
+  private val fingerprintCache = new mutable.HashMap[Schema, Long]()
+  private val schemaCache = new mutable.HashMap[Long, Schema]()
+
+  /** This needs to be lazy since SparkEnv is not initialized yet 
sometimes when this is called */
+  private lazy val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
--- End diff --

just curious, where does this get constructed when there isn't a SparkEnv?
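For illustration, here is a minimal, self-contained sketch of why the `lazy val` sidesteps this: the environment lookup only happens on first use, not at construction time. (Plain Scala; the `Env` object is a hypothetical stand-in for `SparkEnv`, not the real class.)

```scala
// Hypothetical stand-in for SparkEnv: not yet initialized when the
// serializer instance is built.
object Env {
  var conf: Map[String, String] = null // set later, once the env comes up
}

class SerializerSketch {
  // With a plain `val`, construction would NPE here because Env.conf is
  // still null; `lazy val` defers the lookup until the codec is first needed.
  lazy val codecName: String = Env.conf("spark.io.compression.codec")
}

val s = new SerializerSketch()                        // safe: nothing evaluated yet
Env.conf = Map("spark.io.compression.codec" -> "lz4") // environment initialized afterwards
assert(s.codecName == "lz4")                          // first access now succeeds
```

So construction without a SparkEnv is fine as long as nothing forces the lazy field before the env exists.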


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-22 Thread JDrit
Github user JDrit commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r35266023
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.nio.ByteBuffer
+
+import scala.collection.mutable
+
+import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.apache.avro.{Schema, SchemaNormalization}
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.io._
+import org.apache.commons.io.IOUtils
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.io.CompressionCodec
+
+/**
+ * Custom serializer used for generic Avro records. If the user registers 
the schemas
+ * ahead of time, then the schema's fingerprint will be sent with each 
message instead of the actual
+ * schema, as to reduce network IO.
+ * Actions like parsing or compressing schemas are computationally 
expensive so the serializer
+ * caches all previously seen values as to reduce the amount of work 
needed to do.
+ */
+private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
--- End diff --

See above, but I have also updated the comments.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-123875296
  
Merged build started.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-123875261
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-123940828
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-22 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-123940810
  
  [Test build #38137 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/38137/console) for PR 7004 at commit [`8158d51`](https://github.com/apache/spark/commit/8158d5113a9084e44482ebfe0fbb73fe3d7bddd8).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-123956225
  
Merged build started.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-22 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-123956618
  
  [Test build #38157 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/38157/consoleFull) for PR 7004 at commit [`8158d51`](https://github.com/apache/spark/commit/8158d5113a9084e44482ebfe0fbb73fe3d7bddd8).





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-123956172
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-22 Thread squito
Github user squito commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-123955693
  
Jenkins, retest this please





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-22 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-123958303
  
  [Test build #38157 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/38157/console) for PR 7004 at commit [`8158d51`](https://github.com/apache/spark/commit/8158d5113a9084e44482ebfe0fbb73fe3d7bddd8).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-123958311
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-22 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r35262959
  
--- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala ---
@@ -161,6 +162,25 @@ class SparkConf(loadDefaults: Boolean) extends 
Cloneable with Logging {
 this
   }
 
+  private final val avroNamespace = "avro.schema."
+
+  /**
+   * Use Kryo serialization and register the given set of Avro schemas so 
that the generic
+   * record serializer can decrease network IO
+   */
+  def registerAvroSchemas(schemas: Schema*): SparkConf = {
+schemas.foldLeft(this) { (conf, schema) =>
+  conf.set(avroNamespace + SchemaNormalization.parsingFingerprint64(schema), schema.toString)
+}
+  }
+
+  /** Gets all the avro schemas in the configuration used in the generic 
Avro record serializer */
+  def getAvroSchema: Map[Long, String] = {
--- End diff --

What do the keys and values of this map denote?
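As I read the patch, the keys are the schemas' 64-bit fingerprints (from `SchemaNormalization.parsingFingerprint64`) and the values are the schemas' JSON text. A self-contained sketch of that shape, using `hashCode` as a stand-in fingerprint since the real Rabin fingerprint needs Avro on the classpath:

```scala
// Stand-in for SchemaNormalization.parsingFingerprint64 (the real one is a
// 64-bit Rabin fingerprint of the normalized schema; hashCode is just a demo).
def fingerprint64(schemaJson: String): Long = schemaJson.hashCode.toLong

val userSchema =
  """{"type":"record","name":"User","fields":[{"name":"id","type":"long"}]}"""

// Shape of the returned map: fingerprint -> schema JSON text.
val schemas: Map[Long, String] = Map(fingerprint64(userSchema) -> userSchema)

// The serializer ships only the Long over the wire; the receiver uses it
// to look the full schema back up instead of parsing it from every message.
assert(schemas(fingerprint64(userSchema)) == userSchema)
```

Documenting that in the scaladoc would answer this at the call site.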





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-22 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r35261569
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.nio.ByteBuffer
+
+import scala.collection.mutable
+
+import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.apache.avro.{Schema, SchemaNormalization}
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.io._
+import org.apache.commons.io.IOUtils
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.io.CompressionCodec
+
+/**
+ * Custom serializer used for generic Avro records. If the user registers 
the schemas
+ * ahead of time, then the schema's fingerprint will be sent with each 
message instead of the actual
+ * schema, as to reduce network IO.
+ * Actions like parsing or compressing schemas are computationally 
expensive so the serializer
+ * caches all previously seen values as to reduce the amount of work 
needed to do.
+ */
+private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
+  extends KSerializer[GenericRecord] {
+
+  /** Used to reduce the amount of effort to compress the schema */
+  private val compressCache = new mutable.HashMap[Schema, Array[Byte]]()
+  private val decompressCache = new mutable.HashMap[ByteBuffer, Schema]()
+
+  /** Reuses the same datum reader/writer since the same schema will be 
used many times */
+  private val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]()
+  private val readerCache = new mutable.HashMap[Schema, DatumReader[_]]()
+
+  /** Fingerprinting is very expensive so this alleviates most of the work 
*/
+  private val fingerprintCache = new mutable.HashMap[Schema, Long]()
+  private val schemaCache = new mutable.HashMap[Long, Schema]()
+
+  /** This needs to be lazy since SparkEnv is not initialized yet 
sometimes when this is called */
+  private lazy val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
+
+  /**
+   * Used to compress Schemas when they are being sent over the wire.
+   * The compression results are memoized to reduce the compression time 
since the
+   * same schema is compressed many times over
+   */
+  def compress(schema: Schema): Array[Byte] = compressCache.getOrElseUpdate(schema, {
+    val bos = new ByteArrayOutputStream()
+    val out = codec.compressedOutputStream(bos)
+    out.write(schema.toString.getBytes("UTF-8"))
+    out.close()
+    bos.toByteArray
+  })
+
+  /**
+   * Decompresses the schema into the actual in-memory object. Keeps an 
internal cache of already
+   * seen values so to limit the number of times that decompression has to 
be done.
+   */
+  def decompress(schemaBytes: ByteBuffer): Schema = decompressCache.getOrElseUpdate(schemaBytes, {
+    val bis = new ByteArrayInputStream(schemaBytes.array())
+    val bytes = IOUtils.toByteArray(codec.compressedInputStream(bis))
+    new Schema.Parser().parse(new String(bytes, "UTF-8"))
+  })
+
+  /**
+   * Serializes a record to the given output stream. It caches a lot of 
the internal data as
+   * to not redo work
+   */
+  def serializeDatum[R <: GenericRecord](datum: R, output: KryoOutput): Unit = {
+    val encoder = EncoderFactory.get.binaryEncoder(output, null)
+    val schema = datum.getSchema
+    val fingerprint = fingerprintCache.getOrElseUpdate(schema, {
+      SchemaNormalization.parsingFingerprint64(schema)
+    })
+    schemas.get(fingerprint) match {
+      case Some(_) =>
+        output.writeBoolean(true)
+        output.writeLong(fingerprint)
+  

[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-22 Thread squito
Github user squito commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-123857175
  
2 super minor comments, but otherwise lgtm!

Also, as a bit of general house-keeping, can you put some of the benchmarks you
have on the JIRA (since that serves as a better archive than GitHub)?

@JoshRosen want to take another look at this?





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-22 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r35262721
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.nio.ByteBuffer
+
+import scala.collection.mutable
+
+import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.apache.avro.{Schema, SchemaNormalization}
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.io._
+import org.apache.commons.io.IOUtils
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.io.CompressionCodec
+
+/**
+ * Custom serializer used for generic Avro records. If the user registers 
the schemas
+ * ahead of time, then the schema's fingerprint will be sent with each 
message instead of the actual
+ * schema, as to reduce network IO.
+ * Actions like parsing or compressing schemas are computationally 
expensive so the serializer
+ * caches all previously seen values as to reduce the amount of work 
needed to do.
+ */
+private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
+  extends KSerializer[GenericRecord] {
+
+  /** Used to reduce the amount of effort to compress the schema */
+  private val compressCache = new mutable.HashMap[Schema, Array[Byte]]()
+  private val decompressCache = new mutable.HashMap[ByteBuffer, Schema]()
+
+  /** Reuses the same datum reader/writer since the same schema will be 
used many times */
+  private val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]()
+  private val readerCache = new mutable.HashMap[Schema, DatumReader[_]]()
+
+  /** Fingerprinting is very expensive so this alleviates most of the work 
*/
+  private val fingerprintCache = new mutable.HashMap[Schema, Long]()
+  private val schemaCache = new mutable.HashMap[Long, Schema]()
+
+  /** This needs to be lazy since SparkEnv is not initialized yet 
sometimes when this is called */
+  private lazy val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
--- End diff --

Why not just accept a `SparkConf` in the `GenericAvroSerializer` 
constructor instead of getting it from `SparkEnv`?





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-22 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r35263041
  
--- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala ---
@@ -161,6 +162,25 @@ class SparkConf(loadDefaults: Boolean) extends 
Cloneable with Logging {
 this
   }
 
+  private final val avroNamespace = "avro.schema."
+
+  /**
+   * Use Kryo serialization and register the given set of Avro schemas so 
that the generic
+   * record serializer can decrease network IO
+   */
+  def registerAvroSchemas(schemas: Schema*): SparkConf = {
+schemas.foldLeft(this) { (conf, schema) =>
--- End diff --

Why do you need a fold here?  This seems confusing.  Why not just loop over 
the schemas and call `conf.set()` for each?
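Since `set()` mutates the conf and returns `this`, the fold never produces a new value at any step; a plain loop says the same thing more directly. A self-contained sketch with a minimal hypothetical stand-in for `SparkConf`:

```scala
import scala.collection.mutable

// Minimal stand-in for SparkConf: set() mutates and returns this.
class ConfSketch {
  private val entries = mutable.Map[String, String]()
  def set(key: String, value: String): ConfSketch = { entries(key) = value; this }
  def get(key: String): String = entries(key)

  // foldLeft works, but every step just returns the same mutated object...
  def registerFold(schemas: Seq[(String, String)]): ConfSketch =
    schemas.foldLeft(this) { (conf, kv) => conf.set(kv._1, kv._2) }

  // ...so a plain loop states the intent more clearly.
  def registerLoop(schemas: Seq[(String, String)]): ConfSketch = {
    schemas.foreach { case (k, v) => set(k, v) }
    this
  }
}

val c = new ConfSketch().registerLoop(Seq("avro.schema.123" -> "{...json...}"))
assert(c.get("avro.schema.123") == "{...json...}")
```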





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-22 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r35263138
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.nio.ByteBuffer
+
+import scala.collection.mutable
+
+import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.apache.avro.{Schema, SchemaNormalization}
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.io._
+import org.apache.commons.io.IOUtils
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.io.CompressionCodec
+
+/**
+ * Custom serializer used for generic Avro records. If the user registers 
the schemas
+ * ahead of time, then the schema's fingerprint will be sent with each 
message instead of the actual
+ * schema, as to reduce network IO.
+ * Actions like parsing or compressing schemas are computationally 
expensive so the serializer
+ * caches all previously seen values as to reduce the amount of work 
needed to do.
+ */
+private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
--- End diff --

Similar comment here: what are the types in `schemas`?





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-22 Thread JDrit
Github user JDrit commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r35263784
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.nio.ByteBuffer
+
+import scala.collection.mutable
+
+import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.apache.avro.{Schema, SchemaNormalization}
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.io._
+import org.apache.commons.io.IOUtils
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.io.CompressionCodec
+
+/**
+ * Custom serializer used for generic Avro records. If the user registers 
the schemas
+ * ahead of time, then the schema's fingerprint will be sent with each 
message instead of the actual
+ * schema, as to reduce network IO.
+ * Actions like parsing or compressing schemas are computationally 
expensive so the serializer
+ * caches all previously seen values as to reduce the amount of work 
needed to do.
+ */
+private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
+  extends KSerializer[GenericRecord] {
+
+  /** Used to reduce the amount of effort to compress the schema */
+  private val compressCache = new mutable.HashMap[Schema, Array[Byte]]()
+  private val decompressCache = new mutable.HashMap[ByteBuffer, Schema]()
+
+  /** Reuses the same datum reader/writer since the same schema will be 
used many times */
+  private val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]()
+  private val readerCache = new mutable.HashMap[Schema, DatumReader[_]]()
+
+  /** Fingerprinting is very expensive so this alleviates most of the work 
*/
+  private val fingerprintCache = new mutable.HashMap[Schema, Long]()
+  private val schemaCache = new mutable.HashMap[Long, Schema]()
+
+  /** This needs to be lazy since SparkEnv is not initialized yet 
sometimes when this is called */
+  private lazy val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
+
+  /**
+   * Used to compress Schemas when they are being sent over the wire.
+   * The compression results are memoized to reduce the compression time 
since the
+   * same schema is compressed many times over
+   */
+  def compress(schema: Schema): Array[Byte] = compressCache.getOrElseUpdate(schema, {
+    val bos = new ByteArrayOutputStream()
+    val out = codec.compressedOutputStream(bos)
+    out.write(schema.toString.getBytes("UTF-8"))
+    out.close()
+    bos.toByteArray
+  })
+
+  /**
+   * Decompresses the schema into the actual in-memory object. Keeps an 
internal cache of already
+   * seen values so to limit the number of times that decompression has to 
be done.
+   */
+  def decompress(schemaBytes: ByteBuffer): Schema = decompressCache.getOrElseUpdate(schemaBytes, {
+    val bis = new ByteArrayInputStream(schemaBytes.array())
+    val bytes = IOUtils.toByteArray(codec.compressedInputStream(bis))
+    new Schema.Parser().parse(new String(bytes, "UTF-8"))
+  })
+
+  /**
+   * Serializes a record to the given output stream. It caches a lot of 
the internal data as
+   * to not redo work
+   */
+  def serializeDatum[R <: GenericRecord](datum: R, output: KryoOutput): Unit = {
+    val encoder = EncoderFactory.get.binaryEncoder(output, null)
+    val schema = datum.getSchema
+    val fingerprint = fingerprintCache.getOrElseUpdate(schema, {
+      SchemaNormalization.parsingFingerprint64(schema)
+    })
+    schemas.get(fingerprint) match {
+      case Some(_) =>
+        output.writeBoolean(true)
+        output.writeLong(fingerprint)
+   

[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-22 Thread JDrit
Github user JDrit commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r35265166
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.nio.ByteBuffer
+
+import scala.collection.mutable
+
+import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.apache.avro.{Schema, SchemaNormalization}
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.io._
+import org.apache.commons.io.IOUtils
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.io.CompressionCodec
+
+/**
+ * Custom serializer used for generic Avro records. If the user registers the schemas
+ * ahead of time, then the schema's fingerprint will be sent with each message instead of the
+ * actual schema, so as to reduce network IO.
+ * Actions like parsing or compressing schemas are computationally expensive, so the serializer
+ * caches all previously seen values so as to reduce the amount of work needed.
+ */
+private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
+  extends KSerializer[GenericRecord] {
+
+  /** Used to reduce the amount of effort to compress the schema */
+  private val compressCache = new mutable.HashMap[Schema, Array[Byte]]()
+  private val decompressCache = new mutable.HashMap[ByteBuffer, Schema]()
+
+  /** Reuses the same datum reader/writer since the same schema will be 
used many times */
+  private val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]()
+  private val readerCache = new mutable.HashMap[Schema, DatumReader[_]]()
+
+  /** Fingerprinting is very expensive so this alleviates most of the work 
*/
+  private val fingerprintCache = new mutable.HashMap[Schema, Long]()
+  private val schemaCache = new mutable.HashMap[Long, Schema]()
+
+  /** This needs to be lazy since SparkEnv is sometimes not yet initialized when this is called */
+  private lazy val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
--- End diff --

I originally had it accept SparkConf, but I was getting serialization errors 
since SparkConf is not serializable.
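The fix discussed here is to construct the serializer from only the plain, serializable data it needs (the fingerprint-to-schema map) rather than the whole conf object. A minimal, standard-library-only sketch of that pattern; `Conf`, `SchemaRegistry`, and `SerializableDemo` are illustrative stand-ins, not Spark's actual classes:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Stand-in for SparkConf: a config holder that is NOT java.io.Serializable.
class Conf(val settings: Map[String, String])

// The serializer captures only the plain data it needs (fingerprint -> schema
// JSON string), so it can itself be shipped to executors.
class SchemaRegistry(val schemas: Map[Long, String]) extends Serializable

object SerializableDemo {
  // Round-trip an object through Java serialization.
  def roundTrip[T](obj: T): T = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj); oos.close()
    new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
      .readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val conf = new Conf(Map("avro.schema.42" -> """{"type":"string"}"""))
    // Extract only the schema map before constructing the serializer:
    val registry = new SchemaRegistry(Map(42L -> conf.settings("avro.schema.42")))
    val copy = roundTrip(registry)   // works: Map[Long, String] is serializable
    println(copy.schemas(42L))
    // roundTrip(conf) would instead throw java.io.NotSerializableException
  }
}
```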





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-22 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-123876150
  
  [Test build #38114 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/38114/consoleFull)
 for   PR 7004 at commit 
[`8158d51`](https://github.com/apache/spark/commit/8158d5113a9084e44482ebfe0fbb73fe3d7bddd8).





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-22 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r35263272
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,140 @@
+  /**
+   * Serializes a record to the given output stream. It caches a lot of 
the internal data as
+   * to not redo work
+   */
+  def serializeDatum[R <: GenericRecord](datum: R, output: KryoOutput): Unit = {
+val encoder = EncoderFactory.get.binaryEncoder(output, null)
+val schema = datum.getSchema
+val fingerprint = fingerprintCache.getOrElseUpdate(schema, {
+  SchemaNormalization.parsingFingerprint64(schema)
+})
+schemas.get(fingerprint) match {
+  case Some(_) =>
+output.writeBoolean(true)
+output.writeLong(fingerprint)

[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-22 Thread JDrit
Github user JDrit commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r35263954
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,140 @@
+  /** This needs to be lazy since SparkEnv is not initialized yet 
sometimes when this is called */
+  private lazy val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
--- End diff --

Several of the tests involving block replication fail when this value is 
not lazily defined.
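The point being made here is that an eager `val` would read `SparkEnv.get` at construction time, before the environment exists, while `lazy val` defers the lookup to first use. A standard-library-only sketch of that initialization-order pattern; `Env` and `SchemaCodecHolder` are illustrative stand-ins for `SparkEnv` and the serializer:

```scala
// Stand-in for SparkEnv.get: global state that is only populated after
// some components (here, the holder) have already been constructed.
object Env {
  @volatile var conf: Map[String, String] = null
}

class SchemaCodecHolder {
  // An eager `val` here would NPE at construction time, before Env.conf
  // is set; `lazy val` defers the lookup to the first real use.
  lazy val codecName: String = Env.conf("io.compression.codec")
}

object LazyDemo {
  def main(args: Array[String]): Unit = {
    val holder = new SchemaCodecHolder()   // safe: lazy val not forced yet
    Env.conf = Map("io.compression.codec" -> "lz4")
    println(holder.codecName)              // forced only now, prints "lz4"
  }
}
```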





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-22 Thread JDrit
Github user JDrit commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r35265438
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,140 @@
+  /**
+   * Serializes a record to the given output stream. It caches a lot of 
the internal data as
+   * to not redo work
+   */
+  def serializeDatum[R <: GenericRecord](datum: R, output: KryoOutput): Unit = {
+val encoder = EncoderFactory.get.binaryEncoder(output, null)
+val schema = datum.getSchema
+val fingerprint = fingerprintCache.getOrElseUpdate(schema, {
+  SchemaNormalization.parsingFingerprint64(schema)
+})
+schemas.get(fingerprint) match {
+  case Some(_) =>
+output.writeBoolean(true)
+output.writeLong(fingerprint)
+   

[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-22 Thread JDrit
Github user JDrit commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r35265806
  
--- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala ---
@@ -161,6 +162,25 @@ class SparkConf(loadDefaults: Boolean) extends 
Cloneable with Logging {
 this
   }
 
+  private final val avroNamespace = "avro.schema."
+
+  /**
+   * Use Kryo serialization and register the given set of Avro schemas so 
that the generic
+   * record serializer can decrease network IO
+   */
+  def registerAvroSchemas(schemas: Schema*): SparkConf = {
+schemas.foldLeft(this) { (conf, schema) =>
--- End diff --

Got it, changed.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-22 Thread JDrit
Github user JDrit commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r35265695
  
--- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala ---
@@ -161,6 +162,25 @@ class SparkConf(loadDefaults: Boolean) extends 
Cloneable with Logging {
 this
   }
 
+  private final val avroNamespace = "avro.schema."
+
+  /**
+   * Use Kryo serialization and register the given set of Avro schemas so 
that the generic
+   * record serializer can decrease network IO
+   */
+  def registerAvroSchemas(schemas: Schema*): SparkConf = {
+schemas.foldLeft(this) { (conf, schema) =>
+  conf.set(avroNamespace + 
SchemaNormalization.parsingFingerprint64(schema), schema.toString)
+}
+  }
+
+  /** Gets all the avro schemas in the configuration used in the generic 
Avro record serializer */
+  def getAvroSchema: Map[Long, String] = {
--- End diff --

The keys are longs, which represent a unique ID of the schema, and the values 
are the string representation of the schema.
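The register/lookup round trip described here can be sketched with only the standard library; the string-keyed map stands in for SparkConf's key/value store, and the `hashCode`-based fingerprint is an illustrative stand-in for Avro's `SchemaNormalization.parsingFingerprint64`:

```scala
import scala.collection.mutable

object AvroSchemaConfDemo {
  // Stand-in for SparkConf's string key/value store.
  private val conf = mutable.Map[String, String]()
  private val avroNamespace = "avro.schema."

  // Illustrative stand-in fingerprint; the real code fingerprints a parsed
  // Schema with SchemaNormalization.parsingFingerprint64.
  private def fingerprint64(schemaJson: String): Long =
    schemaJson.hashCode.toLong

  // Store each schema under "avro.schema.<fingerprint>".
  def registerAvroSchemas(schemas: String*): Unit =
    for (s <- schemas) conf(avroNamespace + fingerprint64(s)) = s

  // Recover the Map[Long, String] view: strip the namespace prefix and
  // parse the remainder back into the 64-bit fingerprint key.
  def getAvroSchema: Map[Long, String] =
    conf.collect {
      case (k, v) if k.startsWith(avroNamespace) =>
        k.stripPrefix(avroNamespace).toLong -> v
    }.toMap
}
```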





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-22 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-123919144
  
  [Test build #38137 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/38137/consoleFull)
 for   PR 7004 at commit 
[`8158d51`](https://github.com/apache/spark/commit/8158d5113a9084e44482ebfe0fbb73fe3d7bddd8).





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-22 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-123909575
  
  [Test build #38114 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/38114/console)
 for   PR 7004 at commit 
[`8158d51`](https://github.com/apache/spark/commit/8158d5113a9084e44482ebfe0fbb73fe3d7bddd8).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-123909597
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-123919007
  
Merged build started.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-123918991
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-22 Thread squito
Github user squito commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-123918916
  
Jenkins, retest this please





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-16 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-122025940
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-16 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-122025951
  
Merged build started.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-16 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-122026253
  
  [Test build #37523 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37523/consoleFull)
 for   PR 7004 at commit 
[`c0cf329`](https://github.com/apache/spark/commit/c0cf32988d5c77655c09e9c798bbb49cb8b68250).





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-16 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-122059098
  
  [Test build #37523 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37523/console)
 for   PR 7004 at commit 
[`c0cf329`](https://github.com/apache/spark/commit/c0cf32988d5c77655c09e9c798bbb49cb8b68250).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-16 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-122059291
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-15 Thread JDrit
Github user JDrit commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r34723675
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.nio.ByteBuffer
+
+import org.apache.commons.io.IOUtils
+import org.apache.spark.io.CompressionCodec
+import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
+
+import scala.collection.mutable
+
+import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.apache.avro.{Schema, SchemaNormalization}
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.io._
+
+/**
+ *
+ */
+private[spark] object GenericAvroSerializer {
--- End diff --

Since those constants were only used in SparkConf, I just moved the 
entire object there.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121745318
  
Merged build started.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121745295
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121743717
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121743738
  
Merged build started.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-15 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121745543
  
  [Test build #37405 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37405/consoleFull)
 for   PR 7004 at commit 
[`fa9298b`](https://github.com/apache/spark/commit/fa9298b70d2e07b6a0f3df51e758c0fe7e77ba3e).





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121751252
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121751318
  
Merged build started.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-15 Thread squito
Github user squito commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121750826
  
Jenkins, retest this please





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-15 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121751885
  
  [Test build #37408 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37408/consoleFull)
 for   PR 7004 at commit 
[`fa9298b`](https://github.com/apache/spark/commit/fa9298b70d2e07b6a0f3df51e758c0fe7e77ba3e).





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-15 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121743981
  
  [Test build #37404 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37404/consoleFull)
 for   PR 7004 at commit 
[`c5fe794`](https://github.com/apache/spark/commit/c5fe79416f2dfd040514f6ef7f0852cd174db466).





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-15 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121749529
  
  [Test build #37405 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37405/console)
 for   PR 7004 at commit 
[`fa9298b`](https://github.com/apache/spark/commit/fa9298b70d2e07b6a0f3df51e758c0fe7e77ba3e).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121749603
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-15 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121774720
  
  [Test build #37408 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37408/console)
 for   PR 7004 at commit 
[`fa9298b`](https://github.com/apache/spark/commit/fa9298b70d2e07b6a0f3df51e758c0fe7e77ba3e).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121774824
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-15 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121777631
  
**[Test build #37404 timed 
out](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37404/console)**
 for PR 7004 at commit 
[`c5fe794`](https://github.com/apache/spark/commit/c5fe79416f2dfd040514f6ef7f0852cd174db466)
 after a configured wait of `175m`.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-12135
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121789774
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-15 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121789744
  
  [Test build #37417 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37417/console)
 for   PR 7004 at commit 
[`dd71efe`](https://github.com/apache/spark/commit/dd71efeb302a2009c448e1c5a030daf38f08b322).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121764074
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121767183
  
Merged build started.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121767164
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-15 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121767711
  
  [Test build #37417 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37417/consoleFull)
 for   PR 7004 at commit 
[`dd71efe`](https://github.com/apache/spark/commit/dd71efeb302a2009c448e1c5a030daf38f08b322).





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121760803
  
Merged build started.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-15 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121760780
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r34581021
  
--- Diff: core/pom.xml ---
@@ -35,6 +35,16 @@
   urlhttp://spark.apache.org//url
   dependencies
 dependency
+  groupIdnet.sf.py4j/groupId
+  artifactIdpy4j/artifactId
+  version0.8.2.1/version
+/dependency
--- End diff --

is there any reason to move this?  if not, leave it in the old location, so 
it's easier to understand where it got added w/ git blame etc. 





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r34581606
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.util.zip.{Inflater, Deflater}
+
+import scala.collection.mutable
+
+import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.io._
+import org.apache.avro.{Schema, SchemaNormalization}
+
+object GenericAvroSerializer {
+  val avroSchemaNamespace = "avro.schema."
+  def avroSchemaKey(fingerprint: Long): String = avroSchemaNamespace + 
fingerprint
+}
+
+/**
+ * Custom serializer used for generic Avro records. If the user registers 
the schemas
+ * ahead of time, then the schema's fingerprint will be sent with each 
message instead of the actual
+ * schema, as to reduce network IO.
+ * Actions like parsing or compressing schemas are computationally 
expensive so the serializer
+ * caches all previously seen values as to reduce the amount of work 
needed to do.
+ */
+class GenericAvroSerializer(schemas: Map[Long, String]) extends 
KSerializer[GenericRecord] {
+
+  /** Used to reduce the amount of effort to compress the schema */
+  private val compressCache = new mutable.HashMap[Schema, Array[Byte]]()
+  private val decompressCache = new mutable.HashMap[ByteBuffer, Schema]()
+
+  /** Reuses the same datum reader/writer since the same schema will be 
used many times */
+  private val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]()
+  private val readerCache = new mutable.HashMap[Schema, DatumReader[_]]()
+
+  /** Fingerprinting is very expensive to this alleviates most of the work 
*/
--- End diff --

typo: so





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r34581486
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.util.zip.{Inflater, Deflater}
+
+import scala.collection.mutable
+
+import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.io._
+import org.apache.avro.{Schema, SchemaNormalization}
+
+object GenericAvroSerializer {
--- End diff --

`private[serializer]`



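The compress/decompress pair quoted in this diff boils down to a deflate round-trip over the schema's JSON text. A standalone sketch of that round-trip, using a plain JSON string in place of an Avro `Schema` so it runs without the Avro dependency (`compressSchema`/`decompressSchema` are illustrative names, not Spark API):

```scala
import java.io.ByteArrayOutputStream
import java.util.zip.{Deflater, Inflater}

// Deflate the schema's JSON text, as the quoted compress() does.
def compressSchema(schemaJson: String): Array[Byte] = {
  val deflater = new Deflater(Deflater.BEST_COMPRESSION)
  val schemaBytes = schemaJson.getBytes("UTF-8")
  deflater.setInput(schemaBytes)
  deflater.finish()
  val buffer = new Array[Byte](schemaBytes.length max 64)
  val out = new ByteArrayOutputStream(schemaBytes.length)
  while (!deflater.finished()) {
    val count = deflater.deflate(buffer)
    out.write(buffer, 0, count)
  }
  out.toByteArray
}

// Inflate back to the JSON text, as decompress() does before parsing.
def decompressSchema(compressed: Array[Byte]): String = {
  val inflater = new Inflater()
  inflater.setInput(compressed)
  val out = new ByteArrayOutputStream(compressed.length max 64)
  val tmpBuffer = new Array[Byte](1024)
  while (!inflater.finished()) {
    val count = inflater.inflate(tmpBuffer)
    out.write(tmpBuffer, 0, count)
  }
  inflater.end()
  new String(out.toByteArray, "UTF-8")
}
```

In the PR these calls sit behind `getOrElseUpdate` caches, so each distinct schema is deflated or inflated at most once per serializer instance.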


[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r34583682
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.util.zip.{Inflater, Deflater}
+
+import scala.collection.mutable
+
+import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.io._
+import org.apache.avro.{Schema, SchemaNormalization}
+
+object GenericAvroSerializer {
+  val avroSchemaNamespace = "avro.schema."
+  def avroSchemaKey(fingerprint: Long): String = avroSchemaNamespace + 
fingerprint
+}
+
+/**
+ * Custom serializer used for generic Avro records. If the user registers 
the schemas
+ * ahead of time, then the schema's fingerprint will be sent with each 
message instead of the actual
+ * schema, as to reduce network IO.
+ * Actions like parsing or compressing schemas are computationally 
expensive so the serializer
+ * caches all previously seen values as to reduce the amount of work 
needed to do.
+ */
+class GenericAvroSerializer(schemas: Map[Long, String]) extends 
KSerializer[GenericRecord] {
+
+  /** Used to reduce the amount of effort to compress the schema */
+  private val compressCache = new mutable.HashMap[Schema, Array[Byte]]()
+  private val decompressCache = new mutable.HashMap[ByteBuffer, Schema]()
+
+  /** Reuses the same datum reader/writer since the same schema will be 
used many times */
+  private val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]()
+  private val readerCache = new mutable.HashMap[Schema, DatumReader[_]]()
+
+  /** Fingerprinting is very expensive to this alleviates most of the work 
*/
+  private val fingerprintCache = new mutable.HashMap[Schema, Long]()
+  private val schemaCache = new mutable.HashMap[Long, Schema]()
+
+  private def getSchema(fingerprint: Long): Option[String] = 
schemas.get(fingerprint)
+
+  /**
+   * Used to compress Schemas when they are being sent over the wire.
+   * The compression results are memoized to reduce the compression time 
since the
+   * same schema is compressed many times over
+   */
+  def compress(schema: Schema): Array[Byte] = 
compressCache.getOrElseUpdate(schema, {
+val deflater = new Deflater(Deflater.BEST_COMPRESSION)
+val schemaBytes = schema.toString.getBytes("UTF-8")
+deflater.setInput(schemaBytes)
+deflater.finish()
+val buffer = Array.ofDim[Byte](schemaBytes.length)
+val outputStream = new ByteArrayOutputStream(schemaBytes.length)
+while(!deflater.finished()) {
+  val count = deflater.deflate(buffer)
+  outputStream.write(buffer, 0, count)
+}
+outputStream.close()
+outputStream.toByteArray
+  })
+
+
+  /**
+   * Decompresses the schema into the actual in-memory object. Keeps an 
internal cache of already
+   * seen values so to limit the number of times that decompression has to 
be done.
+   */
+  def decompress(schemaBytes: ByteBuffer): Schema = 
decompressCache.getOrElseUpdate(schemaBytes, {
+val inflater = new Inflater()
+val bytes = schemaBytes.array()
+inflater.setInput(bytes)
+val outputStream = new ByteArrayOutputStream(bytes.length)
+val tmpBuffer = Array.ofDim[Byte](1024)
+while (!inflater.finished()) {
+  val count = inflater.inflate(tmpBuffer)
+  outputStream.write(tmpBuffer, 0, count)
+}
+inflater.end()
+outputStream.close()
+new Schema.Parser().parse(new String(outputStream.toByteArray, "UTF-8"))
+  })
+
+  /**
+   * 

[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r34584076
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.util.zip.{Inflater, Deflater}
+
+import scala.collection.mutable
+
+import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.io._
+import org.apache.avro.{Schema, SchemaNormalization}
+
+object GenericAvroSerializer {
+  val avroSchemaNamespace = "avro.schema."
+  def avroSchemaKey(fingerprint: Long): String = avroSchemaNamespace + 
fingerprint
+}
+
+/**
+ * Custom serializer used for generic Avro records. If the user registers 
the schemas
+ * ahead of time, then the schema's fingerprint will be sent with each 
message instead of the actual
+ * schema, as to reduce network IO.
+ * Actions like parsing or compressing schemas are computationally 
expensive so the serializer
+ * caches all previously seen values as to reduce the amount of work 
needed to do.
+ */
+class GenericAvroSerializer(schemas: Map[Long, String]) extends 
KSerializer[GenericRecord] {
+
+  /** Used to reduce the amount of effort to compress the schema */
+  private val compressCache = new mutable.HashMap[Schema, Array[Byte]]()
+  private val decompressCache = new mutable.HashMap[ByteBuffer, Schema]()
+
+  /** Reuses the same datum reader/writer since the same schema will be 
used many times */
+  private val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]()
+  private val readerCache = new mutable.HashMap[Schema, DatumReader[_]]()
+
+  /** Fingerprinting is very expensive to this alleviates most of the work 
*/
+  private val fingerprintCache = new mutable.HashMap[Schema, Long]()
+  private val schemaCache = new mutable.HashMap[Long, Schema]()
+
+  private def getSchema(fingerprint: Long): Option[String] = 
schemas.get(fingerprint)
+
+  /**
+   * Used to compress Schemas when they are being sent over the wire.
+   * The compression results are memoized to reduce the compression time 
since the
+   * same schema is compressed many times over
+   */
+  def compress(schema: Schema): Array[Byte] = 
compressCache.getOrElseUpdate(schema, {
+val deflater = new Deflater(Deflater.BEST_COMPRESSION)
+val schemaBytes = schema.toString.getBytes("UTF-8")
+deflater.setInput(schemaBytes)
+deflater.finish()
+val buffer = Array.ofDim[Byte](schemaBytes.length)
+val outputStream = new ByteArrayOutputStream(schemaBytes.length)
+while(!deflater.finished()) {
+  val count = deflater.deflate(buffer)
+  outputStream.write(buffer, 0, count)
+}
+outputStream.close()
+outputStream.toByteArray
+  })
+
+
+  /**
+   * Decompresses the schema into the actual in-memory object. Keeps an internal
+   * cache of already seen values so as to limit the number of times that
+   * decompression has to be done.
+   */
+  def decompress(schemaBytes: ByteBuffer): Schema = 
decompressCache.getOrElseUpdate(schemaBytes, {
+val inflater = new Inflater()
+val bytes = schemaBytes.array()
+inflater.setInput(bytes)
+val outputStream = new ByteArrayOutputStream(bytes.length)
+val tmpBuffer = Array.ofDim[Byte](1024)
+while (!inflater.finished()) {
+  val count = inflater.inflate(tmpBuffer)
+  outputStream.write(tmpBuffer, 0, count)
+}
+inflater.end()
+outputStream.close()
+new Schema.Parser().parse(new String(outputStream.toByteArray, "UTF-8"))
+  })
+
+  /**
+   * 

[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r34584417
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.util.zip.{Inflater, Deflater}
+
+import scala.collection.mutable
+
+import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.io._
+import org.apache.avro.{Schema, SchemaNormalization}
+
+object GenericAvroSerializer {
+  val avroSchemaNamespace = "avro.schema."
+  def avroSchemaKey(fingerprint: Long): String = avroSchemaNamespace + fingerprint
+}
+
+/**
+ * Custom serializer used for generic Avro records. If the user registers the schemas
+ * ahead of time, then the schema's fingerprint will be sent with each message instead
+ * of the actual schema, so as to reduce network IO.
+ * Actions like parsing or compressing schemas are computationally expensive, so the
+ * serializer caches all previously seen values to reduce the amount of work needed.
+ */
+class GenericAvroSerializer(schemas: Map[Long, String]) extends 
KSerializer[GenericRecord] {
+
+  /** Used to reduce the amount of effort to compress the schema */
+  private val compressCache = new mutable.HashMap[Schema, Array[Byte]]()
+  private val decompressCache = new mutable.HashMap[ByteBuffer, Schema]()
+
+  /** Reuses the same datum reader/writer since the same schema will be 
used many times */
+  private val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]()
+  private val readerCache = new mutable.HashMap[Schema, DatumReader[_]]()
+
+  /** Fingerprinting is very expensive so this alleviates most of the work */
+  private val fingerprintCache = new mutable.HashMap[Schema, Long]()
+  private val schemaCache = new mutable.HashMap[Long, Schema]()
+
+  private def getSchema(fingerprint: Long): Option[String] = 
schemas.get(fingerprint)
+
+  /**
+   * Used to compress Schemas when they are being sent over the wire.
+   * The compression results are memoized to reduce the compression time 
since the
+   * same schema is compressed many times over
+   */
+  def compress(schema: Schema): Array[Byte] = 
compressCache.getOrElseUpdate(schema, {
+val deflater = new Deflater(Deflater.BEST_COMPRESSION)
+val schemaBytes = schema.toString.getBytes("UTF-8")
+deflater.setInput(schemaBytes)
+deflater.finish()
+val buffer = Array.ofDim[Byte](schemaBytes.length)
+val outputStream = new ByteArrayOutputStream(schemaBytes.length)
+while(!deflater.finished()) {
+  val count = deflater.deflate(buffer)
+  outputStream.write(buffer, 0, count)
+}
+outputStream.close()
+outputStream.toByteArray
+  })
+
+
+  /**
+   * Decompresses the schema into the actual in-memory object. Keeps an internal
+   * cache of already seen values so as to limit the number of times that
+   * decompression has to be done.
+   */
+  def decompress(schemaBytes: ByteBuffer): Schema = 
decompressCache.getOrElseUpdate(schemaBytes, {
+val inflater = new Inflater()
+val bytes = schemaBytes.array()
+inflater.setInput(bytes)
+val outputStream = new ByteArrayOutputStream(bytes.length)
+val tmpBuffer = Array.ofDim[Byte](1024)
+while (!inflater.finished()) {
+  val count = inflater.inflate(tmpBuffer)
+  outputStream.write(tmpBuffer, 0, count)
+}
+inflater.end()
+outputStream.close()
+new Schema.Parser().parse(new String(outputStream.toByteArray, "UTF-8"))
+  })
+
+  /**
+   * 

[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r34585605
  
--- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala ---
@@ -160,6 +162,21 @@ class SparkConf(loadDefaults: Boolean) extends 
Cloneable with Logging {
 set("spark.serializer", classOf[KryoSerializer].getName)
 this
   }
+  /**
+   * Use Kryo serialization and register the given set of Avro schemas so 
that the generic
+   * record serializer can decrease network IO
+   */
+  def registerAvroSchema(schemas: Array[Schema]): SparkConf =
--- End diff --

how about using varargs here? `def registerAvroSchema(schemas: Schema*): 
SparkConf`

then you could call using any of:
```
registerAvroSchema(schema)
registerAvroSchema(schemaOne, schemaTwo, schemaThree)
registerAvroSchema(schemaSeq: _*) // works w/ an array too
```

and maybe rename the method to plural, `registerAvroSchemas`
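The suggested signature can be sketched against a stripped-down stand-in for `SparkConf` (the `ConfSketch` class and its `avro.schema.<hash>` key format below are illustrative assumptions, not Spark's actual implementation, and `hashCode` stands in for the real Avro fingerprint):

```scala
import scala.collection.mutable

// Minimal stand-in for SparkConf, just enough to show the varargs call patterns.
class ConfSketch {
  private val settings = new mutable.HashMap[String, String]()

  def set(key: String, value: String): ConfSketch = {
    settings(key) = value
    this
  }

  def get(key: String): Option[String] = settings.get(key)

  // Varargs version of the method under review: one signature covers a single
  // schema, several schemas, or a whole Seq expanded with `: _*`.
  def registerAvroSchemas(schemas: String*): ConfSketch = {
    schemas.foreach(s => set("avro.schema." + s.hashCode, s))
    this
  }
}
```

All three call shapes from the comment then compile against this one method: `conf.registerAvroSchemas(one)`, `conf.registerAvroSchemas(one, two)`, and `conf.registerAvroSchemas(all: _*)` for a `Seq` or `Array`.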



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r34585666
  
--- Diff: 
core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, OutputStream}
+import java.nio.ByteBuffer
+
+import com.esotericsoftware.kryo.io.{Output, Input}
+import org.apache.avro.generic.GenericData.Record
+import org.apache.avro.{SchemaBuilder, Schema}
+import org.apache.spark.{SparkFunSuite, SharedSparkContext}
+
+class GenericAvroSerializerSuite extends SparkFunSuite with 
SharedSparkContext {
+  conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+
+  val schema: Schema = SchemaBuilder
+    .record("testRecord").fields()
+    .requiredString("data")
+    .endRecord()
+  val record = new Record(schema)
+  record.put("data", "test data")
+
+  test("schema compression and decompression") {
+val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
+    assert(schema === genericSer.decompress(ByteBuffer.wrap(genericSer.compress(schema))))
+  }
+
+  test("record serialization and deserialization") {
+val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
+
+val outputStream = new ByteArrayOutputStream()
+val output = new Output(outputStream)
+genericSer.serializeDatum(record, output)
+output.flush()
+output.close()
+
+val input = new Input(new 
ByteArrayInputStream(outputStream.toByteArray))
+assert(genericSer.deserializeDatum(input) === record)
+  }
+
+  test("uses schema fingerprint to decrease message size") {
+val genericSerFull = new GenericAvroSerializer(conf.getAvroSchema)
+
+val output = new Output(new ByteArrayOutputStream())
+
+val beginningNormalPosition = output.total()
+genericSerFull.serializeDatum(record, output)
+output.flush()
+val normalLength = output.total - beginningNormalPosition
+
+conf.registerAvroSchema(Array(schema))
--- End diff --

if you switch to varargs this could just be 
`conf.registerAvroSchema(schema)`





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r34581426
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.util.zip.{Inflater, Deflater}
+
+import scala.collection.mutable
+
+import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.io._
+import org.apache.avro.{Schema, SchemaNormalization}
--- End diff --

super nit: class imports go before more deeply nested packages (it's not just alphabetic), so it should be:

```
import org.apache.avro.{Schema, SchemaNormalization}
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.io._
```

I recommend using the **plugin** (not IntelliJ's builtin ordering) as 
described on the wiki: 
https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide#SparkCodeStyleGuide-Imports





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread squito
Github user squito commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121294001
  
Is it important to use ZLIB as opposed to one of the other compression 
libraries we already have?  Is that how we get better compression?  It's just a 
bit of a burden for code maintainers to learn, if say LZ4 works just as well, 
when we've already got it wrapped up in one of Spark's `CompressionCodec` so 
the usage is standard.  E.g., I'm not familiar w/ the `Deflater`, and your usage 
isn't exactly the same as the simple example in the javadoc: 
http://docs.oracle.com/javase/7/docs/api/java/util/zip/Deflater.html so it's a 
bit more work to make sure the usage is correct, that it's getting cleaned up, etc.  
If it provides some advantage, that is fine, but just want to make sure.
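For reference, a self-contained sketch of the `java.util.zip` round trip under discussion (it mirrors the deflate loop quoted from the patch, but the buffer sizes and the explicit `end()` calls are my additions, not taken from the PR):

```scala
import java.io.ByteArrayOutputStream
import java.util.zip.{Deflater, Inflater}

object ZlibRoundTrip {
  // Compress with zlib at the same level the patch uses.
  def deflate(bytes: Array[Byte]): Array[Byte] = {
    val deflater = new Deflater(Deflater.BEST_COMPRESSION)
    deflater.setInput(bytes)
    deflater.finish() // signal that all input has been provided
    val out = new ByteArrayOutputStream(bytes.length)
    val buf = new Array[Byte](1024)
    while (!deflater.finished()) {
      val n = deflater.deflate(buf)
      out.write(buf, 0, n)
    }
    deflater.end() // release the native zlib resources
    out.toByteArray
  }

  // Decompress; finished() terminates the loop once the stream end is reached.
  def inflate(compressed: Array[Byte]): Array[Byte] = {
    val inflater = new Inflater()
    inflater.setInput(compressed)
    val out = new ByteArrayOutputStream(compressed.length)
    val buf = new Array[Byte](1024)
    while (!inflater.finished()) {
      val n = inflater.inflate(buf)
      out.write(buf, 0, n)
    }
    inflater.end()
    out.toByteArray
  }
}
```

Relative to Spark's `CompressionCodec` wrappers, any advantage would have to come from zlib's ratio at `BEST_COMPRESSION`; the sketch is only meant to make the `Deflater` lifecycle (`setInput`, `finish`, the `deflate` loop, `end`) explicit.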





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r34581493
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.util.zip.{Inflater, Deflater}
+
+import scala.collection.mutable
+
+import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.io._
+import org.apache.avro.{Schema, SchemaNormalization}
+
+object GenericAvroSerializer {
+  val avroSchemaNamespace = "avro.schema."
+  def avroSchemaKey(fingerprint: Long): String = avroSchemaNamespace + fingerprint
+}
+
+/**
+ * Custom serializer used for generic Avro records. If the user registers the schemas
+ * ahead of time, then the schema's fingerprint will be sent with each message instead
+ * of the actual schema, so as to reduce network IO.
+ * Actions like parsing or compressing schemas are computationally expensive, so the
+ * serializer caches all previously seen values to reduce the amount of work needed.
+ */
+class GenericAvroSerializer(schemas: Map[Long, String]) extends 
KSerializer[GenericRecord] {
--- End diff --

`private[serializer]`





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r34581825
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.util.zip.{Inflater, Deflater}
+
+import scala.collection.mutable
+
+import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.io._
+import org.apache.avro.{Schema, SchemaNormalization}
+
+object GenericAvroSerializer {
+  val avroSchemaNamespace = "avro.schema."
+  def avroSchemaKey(fingerprint: Long): String = avroSchemaNamespace + fingerprint
+}
+
+/**
+ * Custom serializer used for generic Avro records. If the user registers the schemas
+ * ahead of time, then the schema's fingerprint will be sent with each message instead
+ * of the actual schema, so as to reduce network IO.
+ * Actions like parsing or compressing schemas are computationally expensive, so the
+ * serializer caches all previously seen values to reduce the amount of work needed.
+ */
+class GenericAvroSerializer(schemas: Map[Long, String]) extends 
KSerializer[GenericRecord] {
+
+  /** Used to reduce the amount of effort to compress the schema */
+  private val compressCache = new mutable.HashMap[Schema, Array[Byte]]()
+  private val decompressCache = new mutable.HashMap[ByteBuffer, Schema]()
+
+  /** Reuses the same datum reader/writer since the same schema will be 
used many times */
+  private val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]()
+  private val readerCache = new mutable.HashMap[Schema, DatumReader[_]]()
+
+  /** Fingerprinting is very expensive so this alleviates most of the work */
+  private val fingerprintCache = new mutable.HashMap[Schema, Long]()
+  private val schemaCache = new mutable.HashMap[Long, Schema]()
+
+  private def getSchema(fingerprint: Long): Option[String] = 
schemas.get(fingerprint)
--- End diff --

this method seems kinda pointless, same thing to inline everywhere
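For context, the caches in the quoted diff all lean on the same `getOrElseUpdate` memoization pattern, which is what makes the expensive fingerprinting cheap on repeat lookups. A minimal sketch (the `expensiveFingerprint` function below is a `hashCode` stand-in I introduced to keep it self-contained, not Avro's `SchemaNormalization`):

```scala
import scala.collection.mutable

object FingerprintCacheSketch {
  private val cache = new mutable.HashMap[String, Long]()
  var computations = 0 // counts how often the expensive step actually runs

  // Stand-in for an expensive fingerprint computation; hashCode keeps the
  // sketch dependency-free.
  private def expensiveFingerprint(schema: String): Long = {
    computations += 1
    schema.hashCode.toLong
  }

  // getOrElseUpdate evaluates its second argument only on a cache miss,
  // so repeated lookups of the same schema pay the cost exactly once.
  def fingerprint(schema: String): Long =
    cache.getOrElseUpdate(schema, expensiveFingerprint(schema))
}
```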





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r34584579
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -72,6 +74,9 @@ class KryoSerializer(conf: SparkConf)
  private val classesToRegister = conf.get("spark.kryo.classesToRegister", "")
 .split(',')
 .filter(!_.isEmpty)
+  conf.getExecutorEnv
--- End diff --

looks like a stray line





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121311564
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121312894
  
  [Test build #37238 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37238/consoleFull)
 for   PR 7004 at commit 
[`6d1925c`](https://github.com/apache/spark/commit/6d1925c5ea899e61103d7f3fa332db771b9616b2).





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121311604
  
Merged build started.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121363467
  
Merged build started.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121363416
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121349416
  
  [Test build #37238 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37238/console)
 for   PR 7004 at commit 
[`6d1925c`](https://github.com/apache/spark/commit/6d1925c5ea899e61103d7f3fa332db771b9616b2).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121349556
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121367942
  
Merged build finished. Test FAILed.




