[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121349416 [Test build #37238 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37238/console) for PR 7004 at commit [`6d1925c`](https://github.com/apache/spark/commit/6d1925c5ea899e61103d7f3fa332db771b9616b2). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121312894 [Test build #37238 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37238/consoleFull) for PR 7004 at commit [`6d1925c`](https://github.com/apache/spark/commit/6d1925c5ea899e61103d7f3fa332db771b9616b2).
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121311564 Merged build triggered.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121311604 Merged build started.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121294001 Is it important to use ZLIB as opposed to one of the other compression libraries we already have? Is that how we get better compression? It's just a bit of a burden for code maintainers to learn if, say, LZ4 works just as well, when we've already got it wrapped up in one of Spark's `CompressionCodec`s so the usage is standard. E.g., I'm not familiar with `Deflater`, and your usage isn't exactly the same as the simple example in the javadoc (http://docs.oracle.com/javase/7/docs/api/java/util/zip/Deflater.html), so it's a bit more work to make sure the usage is correct, that it's getting cleaned up, etc. If it provides some advantage, that's fine, but I just want to make sure.
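One part of the question above is whether the `Deflater` is being cleaned up. A minimal round-trip sketch using only `java.util.zip`, assuming plain byte arrays in and out: each `Deflater`/`Inflater` releases its native zlib state with `end()` in a `finally` block, and decompression fails on a truncated stream instead of looping forever. `DeflateRoundTrip` is a hypothetical name; this is not the PR's code, and it does not settle whether an existing `CompressionCodec` such as LZ4 would do just as well.

```scala
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets
import java.util.zip.{DataFormatException, Deflater, Inflater}

object DeflateRoundTrip {
  /** DEFLATE-compresses `input`, always releasing the native zlib state. */
  def compress(input: Array[Byte]): Array[Byte] = {
    val deflater = new Deflater(Deflater.BEST_COMPRESSION)
    try {
      deflater.setInput(input)
      deflater.finish()
      val out = new ByteArrayOutputStream()
      val buffer = new Array[Byte](1024)
      while (!deflater.finished()) {
        val count = deflater.deflate(buffer)
        out.write(buffer, 0, count)
      }
      out.toByteArray
    } finally {
      deflater.end() // the compress() in the diff under review does not call end()
    }
  }

  /** Inverse of `compress`; throws DataFormatException on truncated or corrupt input. */
  def decompress(input: Array[Byte]): Array[Byte] = {
    val inflater = new Inflater()
    try {
      inflater.setInput(input)
      val out = new ByteArrayOutputStream()
      val buffer = new Array[Byte](1024)
      while (!inflater.finished()) {
        val count = inflater.inflate(buffer)
        // No progress, not finished, and no more input: the stream was cut short.
        if (count == 0 && !inflater.finished() &&
            (inflater.needsInput() || inflater.needsDictionary())) {
          throw new DataFormatException("truncated or incomplete DEFLATE stream")
        }
        out.write(buffer, 0, count)
      }
      out.toByteArray
    } finally {
      inflater.end()
    }
  }
}
```

The `finally` blocks matter because `Deflater` and `Inflater` hold off-heap memory that the garbage collector may not reclaim promptly.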
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r34585666
--- Diff: core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, OutputStream}
+import java.nio.ByteBuffer
+
+import com.esotericsoftware.kryo.io.{Output, Input}
+import org.apache.avro.generic.GenericData.Record
+import org.apache.avro.{SchemaBuilder, Schema}
+import org.apache.spark.{SparkFunSuite, SharedSparkContext}
+
+class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext {
+  conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+
+  val schema : Schema = SchemaBuilder
+    .record("testRecord").fields()
+    .requiredString("data")
+    .endRecord()
+  val record = new Record(schema)
+  record.put("data", "test data")
+
+  test("schema compression and decompression") {
+    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
+    assert(schema === genericSer.decompress(ByteBuffer.wrap(genericSer.compress(schema))))
+  }
+
+  test("record serialization and deserialization") {
+    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
+
+    val outputStream = new ByteArrayOutputStream()
+    val output = new Output(outputStream)
+    genericSer.serializeDatum(record, output)
+    output.flush()
+    output.close()
+
+    val input = new Input(new ByteArrayInputStream(outputStream.toByteArray))
+    assert(genericSer.deserializeDatum(input) === record)
+  }
+
+  test("uses schema fingerprint to decrease message size") {
+    val genericSerFull = new GenericAvroSerializer(conf.getAvroSchema)
+
+    val output = new Output(new ByteArrayOutputStream())
+
+    val beginningNormalPosition = output.total()
+    genericSerFull.serializeDatum(record, output)
+    output.flush()
+    val normalLength = output.total - beginningNormalPosition
+
+    conf.registerAvroSchema(Array(schema))
--- End diff --
if you switch to varargs this could just be `conf.registerAvroSchema(schema)`
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r34585605
--- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala ---
@@ -160,6 +162,21 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     set("spark.serializer", classOf[KryoSerializer].getName)
     this
   }
+  /**
+   * Use Kryo serialization and register the given set of Avro schemas so that the generic
+   * record serializer can decrease network IO
+   */
+  def registerAvroSchema(schemas: Array[Schema]): SparkConf =
--- End diff --
how about using varargs here? `def registerAvroSchema(schemas: Schema*): SparkConf`
then you could call using any of:
```
registerAvroSchema(schema)
registerAvroSchema(schemaOne, schemaTwo, schemaThree)
registerAvroSchema(schemaSeq: _*) // works w/ an array too
```
and maybe rename the method to plural, `registerAvroSchemas`
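A minimal sketch of the varargs signature suggested above. To stay dependency-free, `FakeSchema` and `ConfSketch` are hypothetical stand-ins for Avro's `Schema` and `SparkConf`, and `fingerprint` is a placeholder hash, not Avro's `SchemaNormalization.parsingFingerprint64`. Only the `"avro.schema."` key prefix is taken from the PR's `GenericAvroSerializer.avroSchemaKey`.

```scala
import scala.collection.mutable

// Hypothetical stand-in for org.apache.avro.Schema, so the sketch needs no Avro jar.
final case class FakeSchema(json: String) {
  // Placeholder only; the real code would use a 64-bit Avro schema fingerprint.
  def fingerprint: Long = json.hashCode.toLong
}

// Hypothetical, trimmed-down stand-in for SparkConf.
class ConfSketch {
  private val settings = mutable.LinkedHashMap[String, String]()

  def set(key: String, value: String): ConfSketch = { settings(key) = value; this }
  def getAll: Map[String, String] = settings.toMap

  /** Varargs, plural-named variant of registerAvroSchema(schemas: Array[Schema]). */
  def registerAvroSchemas(schemas: FakeSchema*): ConfSketch = {
    schemas.foreach(s => set("avro.schema." + s.fingerprint, s.json))
    this
  }
}

// Usage, mirroring the three call forms from the review comment:
//   conf.registerAvroSchemas(schema)
//   conf.registerAvroSchemas(schemaOne, schemaTwo, schemaThree)
//   conf.registerAvroSchemas(schemaSeq: _*)   // an Array works the same way
```

A `Schema*` parameter is seen inside the method as a `Seq[Schema]`, so the body stays the same as with an `Array`, while callers no longer have to wrap a single schema.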
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r34584579
--- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -72,6 +74,9 @@ class KryoSerializer(conf: SparkConf)
   private val classesToRegister = conf.get("spark.kryo.classesToRegister", "")
     .split(',')
     .filter(!_.isEmpty)
+  conf.getExecutorEnv
--- End diff --
looks like a stray line
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r34584417
--- Diff: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.util.zip.{Inflater, Deflater}
+
+import scala.collection.mutable
+
+import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.io._
+import org.apache.avro.{Schema, SchemaNormalization}
+
+object GenericAvroSerializer {
+  val avroSchemaNamespace = "avro.schema."
+  def avroSchemaKey(fingerprint: Long): String = avroSchemaNamespace + fingerprint
+}
+
+/**
+ * Custom serializer used for generic Avro records. If the user registers the schemas
+ * ahead of time, then the schema's fingerprint will be sent with each message instead of the actual
+ * schema, as to reduce network IO.
+ * Actions like parsing or compressing schemas are computationally expensive so the serializer
+ * caches all previously seen values as to reduce the amount of work needed to do.
+ */
+class GenericAvroSerializer(schemas: Map[Long, String]) extends KSerializer[GenericRecord] {
+
+  /** Used to reduce the amount of effort to compress the schema */
+  private val compressCache = new mutable.HashMap[Schema, Array[Byte]]()
+  private val decompressCache = new mutable.HashMap[ByteBuffer, Schema]()
+
+  /** Reuses the same datum reader/writer since the same schema will be used many times */
+  private val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]()
+  private val readerCache = new mutable.HashMap[Schema, DatumReader[_]]()
+
+  /** Fingerprinting is very expensive to this alleviates most of the work */
+  private val fingerprintCache = new mutable.HashMap[Schema, Long]()
+  private val schemaCache = new mutable.HashMap[Long, Schema]()
+
+  private def getSchema(fingerprint: Long): Option[String] = schemas.get(fingerprint)
+
+  /**
+   * Used to compress Schemas when they are being sent over the wire.
+   * The compression results are memoized to reduce the compression time since the
+   * same schema is compressed many times over
+   */
+  def compress(schema: Schema): Array[Byte] = compressCache.getOrElseUpdate(schema, {
+    val deflater = new Deflater(Deflater.BEST_COMPRESSION)
+    val schemaBytes = schema.toString.getBytes("UTF-8")
+    deflater.setInput(schemaBytes)
+    deflater.finish()
+    val buffer = Array.ofDim[Byte](schemaBytes.length)
+    val outputStream = new ByteArrayOutputStream(schemaBytes.length)
+    while(!deflater.finished()) {
+      val count = deflater.deflate(buffer)
+      outputStream.write(buffer, 0, count)
+    }
+    outputStream.close()
+    outputStream.toByteArray
+  })
+
+
+  /**
+   * Decompresses the schema into the actual in-memory object. Keeps an internal cache of already
+   * seen values so to limit the number of times that decompression has to be done.
+   */
+  def decompress(schemaBytes: ByteBuffer): Schema = decompressCache.getOrElseUpdate(schemaBytes, {
+    val inflater = new Inflater()
+    val bytes = schemaBytes.array()
+    inflater.setInput(bytes)
+    val outputStream = new ByteArrayOutputStream(bytes.length)
+    val tmpBuffer = Array.ofDim[Byte](1024)
+    while (!inflater.finished()) {
+      val count = inflater.inflate(tmpBuffer)
+      outputStream.write(tmpBuffer, 0, count)
+    }
+    inflater.end()
+    outputStream.close()
+    new Schema.Parser().parse(new String(outputStream.toByteArray, "UTF-8"))
+  })
+
+  /**
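The class comment quoted above says that, for schemas registered ahead of time, only the schema's fingerprint travels with each message. A minimal sketch of that idea under stated assumptions: it writes to a plain `DataOutputStream` rather than Kryo's `Output`, takes the fingerprint as a parameter (the PR imports Avro's `SchemaNormalization` for that), and sends an unregistered schema as raw UTF-8 rather than Deflater-compressed. `SchemaHeaderSketch`, `writeHeader`, and `readHeader` are hypothetical names, not the PR's wire format.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.nio.charset.StandardCharsets

// Hypothetical per-record schema header: a fingerprint if registered, else the full schema.
object SchemaHeaderSketch {
  def writeHeader(out: DataOutputStream, schemaJson: String, fingerprint: Long,
                  registered: Set[Long]): Unit = {
    if (registered.contains(fingerprint)) {
      out.writeBoolean(true)     // flag: a fingerprint follows
      out.writeLong(fingerprint) // 8 bytes, independent of schema size
    } else {
      val bytes = schemaJson.getBytes(StandardCharsets.UTF_8)
      out.writeBoolean(false)    // flag: the full schema follows
      out.writeInt(bytes.length)
      out.write(bytes)
    }
  }

  def readHeader(in: DataInputStream, registered: Map[Long, String]): String = {
    if (in.readBoolean()) {
      val fp = in.readLong()
      registered.getOrElse(fp, throw new IllegalStateException(s"Unknown schema fingerprint $fp"))
    } else {
      val bytes = new Array[Byte](in.readInt())
      in.readFully(bytes)
      new String(bytes, StandardCharsets.UTF_8)
    }
  }
}
```

In this sketch a registered schema costs 9 bytes per record (a 1-byte flag plus an 8-byte `Long`) however large the schema is, which is the kind of saving the "uses schema fingerprint to decrease message size" test is meant to detect. The reader must be given the same fingerprint-to-schema map as the writer.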
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r34584076
--- Diff: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r34583682
--- Diff: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r34581825
--- Diff: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,161 @@
...
+  /** Fingerprinting is very expensive to this alleviates most of the work */
+  private val fingerprintCache = new mutable.HashMap[Schema, Long]()
+  private val schemaCache = new mutable.HashMap[Long, Schema]()
+
+  private def getSchema(fingerprint: Long): Option[String] = schemas.get(fingerprint)
--- End diff --
this method seems kinda pointless; might as well inline `schemas.get(fingerprint)` everywhere
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r34581606
--- Diff: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,161 @@
...
+  /** Reuses the same datum reader/writer since the same schema will be used many times */
+  private val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]()
+  private val readerCache = new mutable.HashMap[Schema, DatumReader[_]]()
+
+  /** Fingerprinting is very expensive to this alleviates most of the work */
--- End diff --
typo: so
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r34581486 --- Diff: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala --- @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.serializer + +import java.io.ByteArrayOutputStream +import java.nio.ByteBuffer +import java.util.zip.{Inflater, Deflater} + +import scala.collection.mutable + +import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer} +import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} +import org.apache.avro.generic.{GenericData, GenericRecord} +import org.apache.avro.io._ +import org.apache.avro.{Schema, SchemaNormalization} + +object GenericAvroSerializer { --- End diff -- `private[serializer]` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
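The object being commented on defines only a key convention: `avroSchemaNamespace = "avro.schema."`, with `avroSchemaKey(fingerprint)` appending the fingerprint to it. A minimal sketch of how such keys might be produced and parsed back into the `Map[Long, String]` that the serializer's constructor takes, assuming a plain `Map[String, String]` stands in for the Spark configuration; `SchemaConfKeys`, `toConfEntries`, and `fromConfEntries` are hypothetical names:

```scala
// Mirrors the companion object in the diff: registered schemas are stored under
// keys of the form "avro.schema.<fingerprint>".
object SchemaConfKeys {
  val avroSchemaNamespace = "avro.schema."
  def avroSchemaKey(fingerprint: Long): String = avroSchemaNamespace + fingerprint

  // Hypothetical helper: fingerprint -> schema pairs become conf-style entries.
  def toConfEntries(schemas: Map[Long, String]): Map[String, String] =
    schemas.map { case (fp, json) => avroSchemaKey(fp) -> json }

  // Hypothetical helper: keep only "avro.schema." keys and parse the suffix back into
  // the Long fingerprint, giving the Map[Long, String] the serializer is built with.
  def fromConfEntries(conf: Map[String, String]): Map[Long, String] =
    conf.collect {
      case (key, json) if key.startsWith(avroSchemaNamespace) =>
        key.stripPrefix(avroSchemaNamespace).toLong -> json
    }
}
```

Fingerprints are signed `Long`s, so a key such as `avro.schema.-7` is possible; `toLong` parses the minus sign back correctly, and unrelated configuration keys are simply skipped.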
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r34581493

--- Diff: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.util.zip.{Inflater, Deflater}
+
+import scala.collection.mutable
+
+import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.io._
+import org.apache.avro.{Schema, SchemaNormalization}
+
+object GenericAvroSerializer {
+  val avroSchemaNamespace = "avro.schema."
+  def avroSchemaKey(fingerprint: Long): String = avroSchemaNamespace + fingerprint
+}
+
+/**
+ * Custom serializer used for generic Avro records. If the user registers the schemas
+ * ahead of time, then the schema's fingerprint will be sent with each message instead of the actual
+ * schema, as to reduce network IO.
+ * Actions like parsing or compressing schemas are computationally expensive so the serializer
+ * caches all previously seen values as to reduce the amount of work needed to do.
+ */
+class GenericAvroSerializer(schemas: Map[Long, String]) extends KSerializer[GenericRecord] {
--- End diff --

`private[serializer]`
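The doc comment describes the core trick: when a schema was registered ahead of time, each record carries the schema's fingerprint instead of the schema itself. One way such a header could be laid out is sketched below. The flag-plus-fingerprint format, the `SchemaHeaderCodec` and `HeaderBytes` names, and the use of `DataOutputStream`/`DataInputStream` in place of Kryo's `Output`/`Input` are all assumptions for illustration, and schema compression is left out:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.nio.charset.StandardCharsets

// Hypothetical wire format (not necessarily the one in the PR): a boolean flag, then
// either the 8-byte fingerprint of a registered schema or the full schema text.
// DataOutputStream/DataInputStream stand in for Kryo's Output/Input, and schema
// JSON strings stand in for org.apache.avro.Schema.
class SchemaHeaderCodec(registered: Map[Long, String]) {
  private val fingerprintOf: Map[String, Long] = registered.map(_.swap)

  def writeSchema(schemaJson: String, out: DataOutputStream): Unit =
    fingerprintOf.get(schemaJson) match {
      case Some(fp) =>
        out.writeBoolean(true) // registered: 8 bytes instead of the whole schema
        out.writeLong(fp)
      case None =>
        out.writeBoolean(false) // unregistered: length-prefixed schema text
        val bytes = schemaJson.getBytes(StandardCharsets.UTF_8)
        out.writeInt(bytes.length)
        out.write(bytes)
    }

  def readSchema(in: DataInputStream): String =
    if (in.readBoolean()) {
      val fp = in.readLong()
      registered.getOrElse(fp,
        throw new IllegalStateException(s"No schema registered for fingerprint $fp"))
    } else {
      val bytes = new Array[Byte](in.readInt())
      in.readFully(bytes)
      new String(bytes, StandardCharsets.UTF_8)
    }
}

// Hypothetical helpers that run a header through an in-memory byte stream.
object HeaderBytes {
  def encode(codec: SchemaHeaderCodec, schemaJson: String): Array[Byte] = {
    val buf = new ByteArrayOutputStream()
    val out = new DataOutputStream(buf)
    codec.writeSchema(schemaJson, out)
    out.flush()
    buf.toByteArray
  }

  def decode(codec: SchemaHeaderCodec, bytes: Array[Byte]): String =
    codec.readSchema(new DataInputStream(new ByteArrayInputStream(bytes)))
}
```

With a schema registered under a made-up fingerprint such as `1234L`, its header shrinks to 9 bytes (a 1-byte flag plus an 8-byte `Long`); an unregistered schema costs 1 + 4 bytes plus its UTF-8 length, which is where compressing it would help.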
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r34581426

--- Diff: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.util.zip.{Inflater, Deflater}
+
+import scala.collection.mutable
+
+import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.io._
+import org.apache.avro.{Schema, SchemaNormalization}
--- End diff --

super nit: class imports go before more deeply nested packages (it's not just alphabetic), so it should be:
```
import org.apache.avro.{Schema, SchemaNormalization}
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.io._
```
I recommend using the **plugin** (not IntelliJ's built-in ordering) as described on the wiki: https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide#SparkCodeStyleGuide-Imports
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/7004#discussion_r34581021

--- Diff: core/pom.xml ---
@@ -35,6 +35,16 @@ http://spark.apache.org/
+ net.sf.py4j
+ py4j
+ 0.8.2.1
+
--- End diff --

Is there any reason to move this? If not, leave it in the old location, so it's easier to understand where it got added w/ git blame etc.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user JDrit commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-119288485

@JoshRosen does that satisfy your concerns, and are there any other changes you would want me to make?
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7004#issuecomment-115048440

Can one of the admins verify this patch?
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
GitHub user JDrit opened a pull request:

https://github.com/apache/spark/pull/7004

[SPARK-746][CORE] Added Avro Serialization to Kryo

Added a custom Kryo serializer for generic Avro records to reduce the network IO involved during a shuffle. This compresses the schema and allows users to register their schemas ahead of time to further reduce traffic.

Currently Kryo tries to use its default serializer for generic records, which includes a lot of unneeded data in each record.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/JDrit/spark Avro_serialization

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/7004.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #7004

commit 97fba623260f34fe7b8e928aa3a9c5c7ed4c
Author: Joseph Batchik
Date: 2015-06-23T16:58:22Z

Added a custom Kryo serializer for generic Avro records to reduce the network IO involved during a shuffle. This compresses the schema and allows for users to register their schemas ahead of time to further reduce traffic.

Changed how the records were serialized to reduce both the time and memory used

removed unused variable
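Registering schemas ahead of time only pays off if sender and receiver derive the same 64-bit key from a schema. The diff imports Avro's `SchemaNormalization`, whose Java `parsingFingerprint64(Schema)` computes the CRC-64-AVRO (Rabin) fingerprint described in the Avro specification over the schema's Parsing Canonical Form. A sketch that transcribes the specification's fingerprint loop into Scala; it hashes raw UTF-8 bytes instead of the canonical form, so it illustrates the technique and is not a drop-in replacement, and the `Crc64Avro` name is hypothetical:

```scala
import java.nio.charset.StandardCharsets

// Sketch of the CRC-64-AVRO (64-bit Rabin) fingerprint, transcribed from the
// reference code in the Avro specification. Avro applies it to the schema's
// Parsing Canonical Form; this sketch skips canonicalization and hashes raw bytes.
object Crc64Avro {
  // Fingerprint of the empty input; also the constant used to build the table.
  val Empty: Long = 0xc15d213aa4d7a795L

  // One precomputed entry per possible low byte of the running fingerprint.
  private val table: Array[Long] = Array.tabulate(256) { i =>
    var fp = i.toLong
    for (_ <- 0 until 8) fp = (fp >>> 1) ^ (Empty & -(fp & 1L))
    fp
  }

  def fingerprint64(bytes: Array[Byte]): Long = {
    var fp = Empty
    for (b <- bytes) fp = (fp >>> 8) ^ table(((fp ^ b) & 0xff).toInt)
    fp
  }

  def fingerprint64(text: String): Long =
    fingerprint64(text.getBytes(StandardCharsets.UTF_8))
}
```

Because any change to the hashed bytes changes the fingerprint, Avro canonicalizes first so that, for example, whitespace and `doc` strings in a schema do not affect the result.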