[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121349416
  
[Test build #37238 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37238/console) for PR 7004 at commit [`6d1925c`](https://github.com/apache/spark/commit/6d1925c5ea899e61103d7f3fa332db771b9616b2).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121312894
  
[Test build #37238 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37238/consoleFull) for PR 7004 at commit [`6d1925c`](https://github.com/apache/spark/commit/6d1925c5ea899e61103d7f3fa332db771b9616b2).



[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121311564
  
 Merged build triggered.



[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121311604
  
Merged build started.



[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread squito
Github user squito commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-121294001
  
Is it important to use ZLIB as opposed to one of the other compression
libraries we already have? Is that how we get better compression? It's just a
bit of a burden for code maintainers to learn if, say, LZ4 works just as well,
when we've already got it wrapped up in one of Spark's `CompressionCodec`s, so
the usage is standard. E.g., I'm not familiar w/ the `Deflater`, and your usage
isn't exactly the same as the simple example in the javadoc:
http://docs.oracle.com/javase/7/docs/api/java/util/zip/Deflater.html so it's a
bit more work to make sure the usage is correct, that it's getting cleaned up, etc.
If it provides some advantage, that is fine, but I just want to make sure.
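
For reference, here is roughly what the javadoc-style `java.util.zip` round trip looks like when cleanup is made explicit. This is a minimal sketch, not the PR's code: `ZlibSketch` is a hypothetical name and the 1024-byte scratch buffer is an arbitrary choice. The relevant point for the cleanup question is that `end()` is called in a `finally` block, so the native zlib state is released even if compression or decompression throws.

```scala
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets
import java.util.zip.{DataFormatException, Deflater, Inflater}

// Hypothetical helper (not part of the PR): a zlib round trip that always
// releases the native Deflater/Inflater state via end() in a finally block.
object ZlibSketch {
  private val ChunkSize = 1024 // arbitrary scratch-buffer size

  def compress(input: Array[Byte]): Array[Byte] = {
    val deflater = new Deflater(Deflater.BEST_COMPRESSION)
    try {
      deflater.setInput(input)
      deflater.finish() // signal that no more input will follow
      val out = new ByteArrayOutputStream()
      val buffer = new Array[Byte](ChunkSize)
      while (!deflater.finished()) {
        val count = deflater.deflate(buffer)
        out.write(buffer, 0, count)
      }
      out.toByteArray
    } finally {
      deflater.end() // free native memory held by zlib
    }
  }

  def decompress(input: Array[Byte]): Array[Byte] = {
    val inflater = new Inflater()
    try {
      inflater.setInput(input)
      val out = new ByteArrayOutputStream()
      val buffer = new Array[Byte](ChunkSize)
      while (!inflater.finished()) {
        val count = inflater.inflate(buffer)
        // Guard against looping forever on truncated or dictionary-based input.
        if (count == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
          throw new DataFormatException("incomplete zlib stream")
        }
        out.write(buffer, 0, count)
      }
      out.toByteArray
    } finally {
      inflater.end()
    }
  }
}

// Usage: round-trip a schema-like JSON string.
// val raw = """{"type":"record","name":"testRecord"}""".getBytes(StandardCharsets.UTF_8)
// val back = ZlibSketch.decompress(ZlibSketch.compress(raw))
```

Whether this buys anything over an existing Spark `CompressionCodec` such as LZ4 for short schema strings is the open question raised in the comment; the sketch only shows what correct cleanup involves.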



[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r34585666
  
--- Diff: core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, OutputStream}
+import java.nio.ByteBuffer
+
+import com.esotericsoftware.kryo.io.{Output, Input}
+import org.apache.avro.generic.GenericData.Record
+import org.apache.avro.{SchemaBuilder, Schema}
+import org.apache.spark.{SparkFunSuite, SharedSparkContext}
+
+class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext {
+  conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+
+  val schema : Schema = SchemaBuilder
+    .record("testRecord").fields()
+    .requiredString("data")
+    .endRecord()
+  val record = new Record(schema)
+  record.put("data", "test data")
+
+  test("schema compression and decompression") {
+    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
+    assert(schema === genericSer.decompress(ByteBuffer.wrap(genericSer.compress(schema))))
+  }
+
+  test("record serialization and deserialization") {
+    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
+
+    val outputStream = new ByteArrayOutputStream()
+    val output = new Output(outputStream)
+    genericSer.serializeDatum(record, output)
+    output.flush()
+    output.close()
+
+    val input = new Input(new ByteArrayInputStream(outputStream.toByteArray))
+    assert(genericSer.deserializeDatum(input) === record)
+  }
+
+  test("uses schema fingerprint to decrease message size") {
+    val genericSerFull = new GenericAvroSerializer(conf.getAvroSchema)
+
+    val output = new Output(new ByteArrayOutputStream())
+
+    val beginningNormalPosition = output.total()
+    genericSerFull.serializeDatum(record, output)
+    output.flush()
+    val normalLength = output.total - beginningNormalPosition
+
+    conf.registerAvroSchema(Array(schema))
--- End diff --

If you switch to varargs, this could just be
`conf.registerAvroSchema(schema)`.



[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r34585605
  
--- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala ---
@@ -160,6 +162,21 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     set("spark.serializer", classOf[KryoSerializer].getName)
     this
   }
+  /**
+   * Use Kryo serialization and register the given set of Avro schemas so that the generic
+   * record serializer can decrease network IO
+   */
+  def registerAvroSchema(schemas: Array[Schema]): SparkConf =
--- End diff --

How about using varargs here? `def registerAvroSchema(schemas: Schema*): SparkConf`

Then you could call it using any of:
```
registerAvroSchema(schema)
registerAvroSchema(schemaOne, schemaTwo, schemaThree)
registerAvroSchema(schemaSeq: _*) // works w/ an array too
```

And maybe rename the method to the plural, `registerAvroSchemas`.
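
A self-contained sketch of the varargs signature being suggested. `FakeSchema` and `ConfSketch` are hypothetical stand-ins (not Avro's `Schema` and not Spark's `SparkConf`) so the example compiles without either library; only the `Schema*` parameter shape, the plural name, and the call forms mirror the suggestion.

```scala
import scala.collection.mutable

// Hypothetical stand-in for org.apache.avro.Schema so the sketch needs no Avro jar.
final case class FakeSchema(name: String)

// Hypothetical config holder, NOT Spark's SparkConf: it only demonstrates the
// varargs signature and the plural method name suggested in the review.
class ConfSketch {
  private val registered = mutable.LinkedHashMap.empty[String, FakeSchema]

  /** Accepts any number of schemas; returns `this` so calls can be chained. */
  def registerAvroSchemas(schemas: FakeSchema*): ConfSketch = {
    schemas.foreach(s => registered.update(s.name, s))
    this
  }

  /** Registered schema names, in registration order. */
  def registeredNames: List[String] = registered.keys.toList
}

object VarargsDemo {
  def main(args: Array[String]): Unit = {
    val conf = new ConfSketch()
      .registerAvroSchemas(FakeSchema("one"))                      // a single schema
      .registerAvroSchemas(FakeSchema("two"), FakeSchema("three")) // several schemas
      .registerAvroSchemas(Seq(FakeSchema("four")): _*)            // an existing Seq
    println(conf.registeredNames.mkString(", ")) // prints "one, two, three, four"
  }
}
```

The `: _*` ascription is what lets an existing collection be passed where repeated parameters are expected, which is why a single varargs signature covers all three call styles.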




[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r34584579
  
--- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -72,6 +74,9 @@ class KryoSerializer(conf: SparkConf)
   private val classesToRegister = conf.get("spark.kryo.classesToRegister", "")
     .split(',')
     .filter(!_.isEmpty)
+  conf.getExecutorEnv
--- End diff --

Looks like a stray line.



[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r34584417
  
--- Diff: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.util.zip.{Inflater, Deflater}
+
+import scala.collection.mutable
+
+import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.io._
+import org.apache.avro.{Schema, SchemaNormalization}
+
+object GenericAvroSerializer {
+  val avroSchemaNamespace = "avro.schema."
+  def avroSchemaKey(fingerprint: Long): String = avroSchemaNamespace + fingerprint
+}
+
+/**
+ * Custom serializer used for generic Avro records. If the user registers the schemas
+ * ahead of time, then the schema's fingerprint will be sent with each message instead of the actual
+ * schema, as to reduce network IO.
+ * Actions like parsing or compressing schemas are computationally expensive so the serializer
+ * caches all previously seen values as to reduce the amount of work needed to do.
+ */
+class GenericAvroSerializer(schemas: Map[Long, String]) extends KSerializer[GenericRecord] {
+
+  /** Used to reduce the amount of effort to compress the schema */
+  private val compressCache = new mutable.HashMap[Schema, Array[Byte]]()
+  private val decompressCache = new mutable.HashMap[ByteBuffer, Schema]()
+
+  /** Reuses the same datum reader/writer since the same schema will be used many times */
+  private val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]()
+  private val readerCache = new mutable.HashMap[Schema, DatumReader[_]]()
+
+  /** Fingerprinting is very expensive to this alleviates most of the work */
+  private val fingerprintCache = new mutable.HashMap[Schema, Long]()
+  private val schemaCache = new mutable.HashMap[Long, Schema]()
+
+  private def getSchema(fingerprint: Long): Option[String] = schemas.get(fingerprint)
+
+  /**
+   * Used to compress Schemas when they are being sent over the wire.
+   * The compression results are memoized to reduce the compression time since the
+   * same schema is compressed many times over
+   */
+  def compress(schema: Schema): Array[Byte] = compressCache.getOrElseUpdate(schema, {
+    val deflater = new Deflater(Deflater.BEST_COMPRESSION)
+    val schemaBytes = schema.toString.getBytes("UTF-8")
+    deflater.setInput(schemaBytes)
+    deflater.finish()
+    val buffer = Array.ofDim[Byte](schemaBytes.length)
+    val outputStream = new ByteArrayOutputStream(schemaBytes.length)
+    while(!deflater.finished()) {
+      val count = deflater.deflate(buffer)
+      outputStream.write(buffer, 0, count)
+    }
+    outputStream.close()
+    outputStream.toByteArray
+  })
+
+
+  /**
+   * Decompresses the schema into the actual in-memory object. Keeps an internal cache of already
+   * seen values so to limit the number of times that decompression has to be done.
+   */
+  def decompress(schemaBytes: ByteBuffer): Schema = decompressCache.getOrElseUpdate(schemaBytes, {
+    val inflater = new Inflater()
+    val bytes = schemaBytes.array()
+    inflater.setInput(bytes)
+    val outputStream = new ByteArrayOutputStream(bytes.length)
+    val tmpBuffer = Array.ofDim[Byte](1024)
+    while (!inflater.finished()) {
+      val count = inflater.inflate(tmpBuffer)
+      outputStream.write(tmpBuffer, 0, count)
+    }
+    inflater.end()
+    outputStream.close()
+    new Schema.Parser().parse(new String(outputStream.toByteArray, "UTF-8"))
+  })
+
+  /**
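
The class comment describes sending a schema fingerprint instead of the full schema when the schema was registered ahead of time. Below is a minimal, stdlib-only sketch of that idea under stated assumptions: schemas are plain JSON strings, `SchemaHeaderSketch` is a hypothetical name, the boolean-flag/length-prefix layout is invented for illustration, and CRC32 is only a stand-in for a real 64-bit schema fingerprint such as Avro's `SchemaNormalization.parsingFingerprint64`.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.nio.charset.StandardCharsets
import java.util.zip.CRC32
import scala.collection.mutable

// Sketch of a "fingerprint instead of full schema" header (hypothetical layout).
// ASSUMPTION: CRC32 stands in for a real 64-bit schema fingerprint.
class SchemaHeaderSketch(registered: Map[Long, String]) {
  // Memoize fingerprints, in the spirit of the PR's fingerprintCache.
  private val fingerprintCache = mutable.HashMap.empty[String, Long]

  def fingerprint(schema: String): Long = fingerprintCache.getOrElseUpdate(schema, {
    val crc = new CRC32()
    crc.update(schema.getBytes(StandardCharsets.UTF_8))
    crc.getValue
  })

  /** Writes true + 8-byte fingerprint if registered, else false + length-prefixed schema. */
  def writeHeader(schema: String, out: DataOutputStream): Unit = {
    val fp = fingerprint(schema)
    if (registered.contains(fp)) {
      out.writeBoolean(true)
      out.writeLong(fp)
    } else {
      val bytes = schema.getBytes(StandardCharsets.UTF_8)
      out.writeBoolean(false)
      out.writeInt(bytes.length)
      out.write(bytes)
    }
  }

  /** Inverse of writeHeader: resolves a fingerprint or reads the inline schema. */
  def readHeader(in: DataInputStream): String =
    if (in.readBoolean()) {
      val fp = in.readLong()
      registered.getOrElse(fp, throw new IllegalStateException(s"Unknown schema fingerprint $fp"))
    } else {
      val bytes = new Array[Byte](in.readInt())
      in.readFully(bytes)
      new String(bytes, StandardCharsets.UTF_8)
    }
}
```

With the schema registered, this header is 9 bytes (one flag byte plus a `Long`) no matter how large the schema is, which is presumably the kind of saving the `uses schema fingerprint to decrease message size` test measures.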

[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r34584076
  

[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r34583682
  

[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r34581825
  
--- Diff: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.util.zip.{Inflater, Deflater}
+
+import scala.collection.mutable
+
+import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.io._
+import org.apache.avro.{Schema, SchemaNormalization}
+
+object GenericAvroSerializer {
+  val avroSchemaNamespace = "avro.schema."
+  def avroSchemaKey(fingerprint: Long): String = avroSchemaNamespace + fingerprint
+}
+
+/**
+ * Custom serializer used for generic Avro records. If the user registers the schemas
+ * ahead of time, then the schema's fingerprint will be sent with each message instead of the actual
+ * schema, as to reduce network IO.
+ * Actions like parsing or compressing schemas are computationally expensive so the serializer
+ * caches all previously seen values as to reduce the amount of work needed to do.
+ */
+class GenericAvroSerializer(schemas: Map[Long, String]) extends KSerializer[GenericRecord] {
+
+  /** Used to reduce the amount of effort to compress the schema */
+  private val compressCache = new mutable.HashMap[Schema, Array[Byte]]()
+  private val decompressCache = new mutable.HashMap[ByteBuffer, Schema]()
+
+  /** Reuses the same datum reader/writer since the same schema will be used many times */
+  private val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]()
+  private val readerCache = new mutable.HashMap[Schema, DatumReader[_]]()
+
+  /** Fingerprinting is very expensive to this alleviates most of the work */
+  private val fingerprintCache = new mutable.HashMap[Schema, Long]()
+  private val schemaCache = new mutable.HashMap[Long, Schema]()
+
+  private def getSchema(fingerprint: Long): Option[String] = schemas.get(fingerprint)
--- End diff --

This method seems kind of pointless; you could just as well inline `schemas.get(fingerprint)` everywhere.



[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r34581606
  
--- Diff: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.util.zip.{Inflater, Deflater}
+
+import scala.collection.mutable
+
+import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.io._
+import org.apache.avro.{Schema, SchemaNormalization}
+
+object GenericAvroSerializer {
+  val avroSchemaNamespace = "avro.schema."
+  def avroSchemaKey(fingerprint: Long): String = avroSchemaNamespace + fingerprint
+}
+
+/**
+ * Custom serializer used for generic Avro records. If the user registers the schemas
+ * ahead of time, then the schema's fingerprint will be sent with each message instead of the actual
+ * schema, as to reduce network IO.
+ * Actions like parsing or compressing schemas are computationally expensive so the serializer
+ * caches all previously seen values as to reduce the amount of work needed to do.
+ */
+class GenericAvroSerializer(schemas: Map[Long, String]) extends KSerializer[GenericRecord] {
+
+  /** Used to reduce the amount of effort to compress the schema */
+  private val compressCache = new mutable.HashMap[Schema, Array[Byte]]()
+  private val decompressCache = new mutable.HashMap[ByteBuffer, Schema]()
+
+  /** Reuses the same datum reader/writer since the same schema will be used many times */
+  private val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]()
+  private val readerCache = new mutable.HashMap[Schema, DatumReader[_]]()
+
+  /** Fingerprinting is very expensive to this alleviates most of the work */
--- End diff --

Typo: "to" should be "so" ("Fingerprinting is very expensive so this alleviates most of the work").



[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r34581486
  
--- Diff: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.util.zip.{Inflater, Deflater}
+
+import scala.collection.mutable
+
+import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.io._
+import org.apache.avro.{Schema, SchemaNormalization}
+
+object GenericAvroSerializer {
--- End diff --

`private[serializer]`





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r34581493
  
--- Diff: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.util.zip.{Inflater, Deflater}
+
+import scala.collection.mutable
+
+import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.io._
+import org.apache.avro.{Schema, SchemaNormalization}
+
+object GenericAvroSerializer {
+  val avroSchemaNamespace = "avro.schema."
+  def avroSchemaKey(fingerprint: Long): String = avroSchemaNamespace + fingerprint
+}
+
+/**
+ * Custom serializer used for generic Avro records. If the user registers the schemas
+ * ahead of time, then the schema's fingerprint will be sent with each message instead of the actual
+ * schema, as to reduce network IO.
+ * Actions like parsing or compressing schemas are computationally expensive so the serializer
+ * caches all previously seen values as to reduce the amount of work needed to do.
+ */
+class GenericAvroSerializer(schemas: Map[Long, String]) extends KSerializer[GenericRecord] {
--- End diff --

`private[serializer]`
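The `GenericAvroSerializer` object in this diff builds config keys as `avro.schema.` plus a fingerprint, and the class takes a `Map[Long, String]` from fingerprint to schema. A hedged sketch of how such entries could be turned back into that map, and how a registered fingerprint could stand in for the full schema, is below. `SchemaRegistrySketch`, `registeredSchemas`, and `schemaRef` are hypothetical; a plain `Map[String, String]` stands in for `SparkConf`, and the sketch is not marked `private[serializer]` only because it does not live in that package.

```scala
// Hypothetical sketch; a plain Map[String, String] stands in for SparkConf.
object SchemaRegistrySketch {
  // Same prefix and key format as GenericAvroSerializer in the diff.
  val avroSchemaNamespace = "avro.schema."

  def avroSchemaKey(fingerprint: Long): String = avroSchemaNamespace + fingerprint

  // Keep only namespaced entries and parse the (possibly negative) fingerprint suffix.
  def registeredSchemas(conf: Map[String, String]): Map[Long, String] =
    conf.collect {
      case (key, schemaJson) if key.startsWith(avroSchemaNamespace) =>
        key.stripPrefix(avroSchemaNamespace).toLong -> schemaJson
    }

  // A registered schema is referenced by fingerprint alone;
  // an unregistered one has to travel with the record.
  def schemaRef(schemas: Map[Long, String], fingerprint: Long, schemaJson: String): Either[Long, String] =
    if (schemas.contains(fingerprint)) Left(fingerprint) else Right(schemaJson)
}
```

Unrelated config keys are ignored, so the serializer only ever sees the schemas the user registered.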





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r34581426
  
--- Diff: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.util.zip.{Inflater, Deflater}
+
+import scala.collection.mutable
+
+import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.io._
+import org.apache.avro.{Schema, SchemaNormalization}
--- End diff --

super nit: class imports go before more deeply nested packages (it's not just alphabetic), so it should be:

```
import org.apache.avro.{Schema, SchemaNormalization}
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.io._
```

I recommend using the **plugin** (not IntelliJ's built-in ordering) as described on the wiki:
https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide#SparkCodeStyleGuide-Imports





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r34581021
  
--- Diff: core/pom.xml ---
@@ -35,6 +35,16 @@
   http://spark.apache.org/
   
 
+  net.sf.py4j
+  py4j
+  0.8.2.1
+
--- End diff --

Is there any reason to move this? If not, leave it in the old location, so it's easier to understand where it got added with git blame etc.





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-07-07 Thread JDrit
Github user JDrit commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-119288485
  
@JoshRosen, does that satisfy your concerns, and are there other changes you would want me to make?





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-06-24 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-115048440
  
Can one of the admins verify this patch?





[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...

2015-06-24 Thread JDrit
GitHub user JDrit opened a pull request:

https://github.com/apache/spark/pull/7004

[SPARK-746][CORE] Added Avro Serialization to Kryo

Added a custom Kryo serializer for generic Avro records to reduce the network IO involved during a shuffle. This compresses the schema and allows users to register their schemas ahead of time to further reduce traffic.

Currently, Kryo tries to use its default serializer for generic records, which includes a lot of unneeded data in each record.
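The description says the serializer compresses the schema. Given the `Deflater`/`Inflater` imports and the `ByteBuffer`-keyed decompression cache in the diff, a minimal round-trip sketch could look like the following. It treats schemas as JSON strings, and `SchemaCompressionSketch` with its methods are hypothetical names; this is an illustration under those assumptions, not the PR's implementation.

```scala
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.zip.{Deflater, Inflater}

import scala.collection.mutable

// Hypothetical sketch of cached schema compression; schemas are plain JSON strings here.
object SchemaCompressionSketch {
  // Returned arrays are the cached instances, so callers must not mutate them.
  private val compressCache = new mutable.HashMap[String, Array[Byte]]()
  // ByteBuffer has content-based equals/hashCode; a raw Array[Byte] would compare by reference.
  private val decompressCache = new mutable.HashMap[ByteBuffer, String]()

  def compress(schemaJson: String): Array[Byte] =
    compressCache.getOrElseUpdate(schemaJson, deflate(schemaJson.getBytes(StandardCharsets.UTF_8)))

  // Copy the bytes so later mutation of the caller's array cannot corrupt the cache key.
  def decompress(bytes: Array[Byte]): String =
    decompressCache.getOrElseUpdate(ByteBuffer.wrap(bytes.clone()),
      new String(inflate(bytes), StandardCharsets.UTF_8))

  private def deflate(input: Array[Byte]): Array[Byte] = {
    val deflater = new Deflater(Deflater.BEST_COMPRESSION)
    try {
      deflater.setInput(input)
      deflater.finish()
      val out = new ByteArrayOutputStream()
      val buf = new Array[Byte](1024)
      while (!deflater.finished()) {
        val n = deflater.deflate(buf)
        out.write(buf, 0, n)
      }
      out.toByteArray
    } finally {
      deflater.end() // release native zlib resources
    }
  }

  private def inflate(input: Array[Byte]): Array[Byte] = {
    val inflater = new Inflater()
    try {
      inflater.setInput(input)
      val out = new ByteArrayOutputStream()
      val buf = new Array[Byte](1024)
      while (!inflater.finished()) {
        val n = inflater.inflate(buf)
        // Guard against looping forever on truncated or dictionary-compressed input.
        if (n == 0 && !inflater.finished() && (inflater.needsInput() || inflater.needsDictionary())) {
          throw new IllegalArgumentException("truncated or invalid compressed schema")
        }
        out.write(buf, 0, n)
      }
      out.toByteArray
    } finally {
      inflater.end()
    }
  }
}
```

Because both directions are cached, a schema seen on every record of a shuffle is compressed or decompressed only once per serializer instance.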


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/JDrit/spark Avro_serialization

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/7004.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #7004


commit 97fba623260f34fe7b8e928aa3a9c5c7ed4c
Author: Joseph Batchik 
Date:   2015-06-23T16:58:22Z

Added a custom Kryo serializer for generic Avro records to reduce the network IO involved during a shuffle. This compresses the schema and allows for users to register their schemas ahead of time to further reduce traffic.

Changed how the records were serialized to reduce both the time and memory used

removed unused variable






