[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r35784695 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -23,6 +23,8 @@ import javax.annotation.Nullable import scala.reflect.ClassTag +import org.apache.avro.generic.{GenericData, GenericRecord} --- End diff -- nit: order --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user squito commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-126021604 Thanks @JDrit ! I will fix those final details as I merge.
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r35784052 --- Diff: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala --- @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.serializer + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.nio.ByteBuffer + +import scala.collection.mutable + +import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer} +import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} +import org.apache.avro.{Schema, SchemaNormalization} +import org.apache.avro.generic.{GenericData, GenericRecord} +import org.apache.avro.io._ +import org.apache.commons.io.IOUtils + +import org.apache.spark.SparkEnv +import org.apache.spark.io.CompressionCodec + +/** + * Custom serializer used for generic Avro records. If the user registers the schemas + * ahead of time, then the schema's fingerprint will be sent with each message instead of the actual + * schema, as to reduce network IO. 
+ * Actions like parsing or compressing schemas are computationally expensive so the serializer + * caches all previously seen values as to reduce the amount of work needed to do. + */ +private[serializer] class GenericAvroSerializer(schemas: Map[Long, String]) + extends KSerializer[GenericRecord] { + + /** Used to reduce the amount of effort to compress the schema */ + private val compressCache = new mutable.HashMap[Schema, Array[Byte]]() + private val decompressCache = new mutable.HashMap[ByteBuffer, Schema]() + + /** Reuses the same datum reader/writer since the same schema will be used many times */ + private val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]() + private val readerCache = new mutable.HashMap[Schema, DatumReader[_]]() + + /** Fingerprinting is very expensive so this alleviates most of the work */ + private val fingerprintCache = new mutable.HashMap[Schema, Long]() + private val schemaCache = new mutable.HashMap[Long, Schema]() + + /** This needs to be lazy since SparkEnv is not initialized yet sometimes when this is called */ + private lazy val codec = CompressionCodec.createCodec(SparkEnv.get.conf) --- End diff -- Joe and I talked about this a bit offline -- the reason for this is that `ShuffleRDD` lets you set a Serializer directly, which is used in some tests, and that is why the serializer itself needs to be serializable. I'll add a comment here explaining why it's necessary when I merge.
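The fingerprint-vs-full-schema idea described in the class comment above can be sketched in a few lines. This is a hypothetical illustration, not the PR's code: the object and method names are invented, and a toy 64-bit hash stands in for Avro's `SchemaNormalization.parsingFingerprint64`, so only the caching and dispatch shape is meaningful.

```scala
import scala.collection.mutable

object FingerprintSketch {
  // Toy stand-in for SchemaNormalization.parsingFingerprint64 (assumption:
  // any stable 64-bit digest of the schema text suffices for the sketch).
  def fingerprint(schemaJson: String): Long =
    schemaJson.foldLeft(1125899906842597L)((h, c) => 31 * h + c)

  // Mirrors the PR's fingerprintCache: each schema is fingerprinted once.
  private val fingerprintCache = new mutable.HashMap[String, Long]()

  /** Returns what would cross the wire: Left(fingerprint) for a
    * pre-registered schema, Right(full schema text) otherwise. */
  def wireForm(schemaJson: String, registered: Map[Long, String]): Either[Long, String] = {
    val fp = fingerprintCache.getOrElseUpdate(schemaJson, fingerprint(schemaJson))
    if (registered.contains(fp)) Left(fp) else Right(schemaJson)
  }
}
```

With a registered schema, each record then carries an 8-byte fingerprint instead of the full schema text; an unregistered schema falls back to shipping the text itself.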
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r35784682 --- Diff: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala --- @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.serializer + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.nio.ByteBuffer + +import scala.collection.mutable + +import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer} +import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} +import org.apache.avro.{Schema, SchemaNormalization} +import org.apache.avro.generic.{GenericData, GenericRecord} +import org.apache.avro.io._ +import org.apache.commons.io.IOUtils + +import org.apache.spark.{SparkException, SparkEnv} +import org.apache.spark.io.CompressionCodec + +/** + * Custom serializer used for generic Avro records. If the user registers the schemas + * ahead of time, then the schema's fingerprint will be sent with each message instead of the actual + * schema, as to reduce network IO. 
+ * Actions like parsing or compressing schemas are computationally expensive so the serializer + * caches all previously seen values as to reduce the amount of work needed to do. + * @param schemas a map where the keys are unique IDs for Avro schemas and the values are the + * string representation of the Avro schema, used to decrease the amount of data + * that needs to be serialized. + */ +private[serializer] class GenericAvroSerializer(schemas: Map[Long, String]) + extends KSerializer[GenericRecord] { + + /** Used to reduce the amount of effort to compress the schema */ + private val compressCache = new mutable.HashMap[Schema, Array[Byte]]() + private val decompressCache = new mutable.HashMap[ByteBuffer, Schema]() + + /** Reuses the same datum reader/writer since the same schema will be used many times */ + private val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]() + private val readerCache = new mutable.HashMap[Schema, DatumReader[_]]() + + /** Fingerprinting is very expensive so this alleviates most of the work */ + private val fingerprintCache = new mutable.HashMap[Schema, Long]() + private val schemaCache = new mutable.HashMap[Long, Schema]() + + /** This needs to be lazy since SparkEnv is not initialized yet sometimes when this is called */ + private lazy val codec = CompressionCodec.createCodec(SparkEnv.get.conf) + + /** + * Used to compress Schemas when they are being sent over the wire. + * The compression results are memoized to reduce the compression time since the + * same schema is compressed many times over + */ + def compress(schema: Schema): Array[Byte] = compressCache.getOrElseUpdate(schema, { +val bos = new ByteArrayOutputStream() +val out = codec.compressedOutputStream(bos) +out.write(schema.toString.getBytes("UTF-8")) +out.close() +bos.toByteArray + }) + + /** + * Decompresses the schema into the actual in-memory object. 
Keeps an internal cache of already + * seen values so to limit the number of times that decompression has to be done. + */ + def decompress(schemaBytes: ByteBuffer): Schema = decompressCache.getOrElseUpdate(schemaBytes, { +val bis = new ByteArrayInputStream(schemaBytes.array()) +val bytes = IOUtils.toByteArray(codec.compressedInputStream(bis)) +new Schema.Parser().parse(new String(bytes, "UTF-8")) + }) + + /** + * Serializes a record to the given output stream. It caches a lot of the internal data as + * to not redo work + */ + def serializeDatum[R <: GenericRecord](datum: R, output: KryoOutput): Unit = { +val encoder = EncoderFactory.get.binaryEncoder(output, null) +val schema = datum.getSchema +val fingerprint =
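The `getOrElseUpdate` memoization pattern used by `compress`/`decompress` in the diff above is easy to demonstrate standalone. A minimal sketch, with `java.util.zip` GZIP standing in for Spark's configurable `CompressionCodec` (the object name and the choice of GZIP are assumptions for illustration):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import scala.collection.mutable

object SchemaCompressionSketch {
  // Each distinct schema string pays the compression cost exactly once;
  // later calls return the cached byte array.
  private val compressCache = new mutable.HashMap[String, Array[Byte]]()

  def compress(schemaJson: String): Array[Byte] =
    compressCache.getOrElseUpdate(schemaJson, {
      val bos = new ByteArrayOutputStream()
      val out = new GZIPOutputStream(bos)
      out.write(schemaJson.getBytes("UTF-8"))
      out.close() // finishes the GZIP stream and flushes into bos
      bos.toByteArray
    })

  def decompress(bytes: Array[Byte]): String = {
    val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
    val buf = new ByteArrayOutputStream()
    val chunk = new Array[Byte](4096)
    var n = in.read(chunk)
    while (n != -1) { buf.write(chunk, 0, n); n = in.read(chunk) }
    new String(buf.toByteArray, "UTF-8")
  }
}
```

A second `compress` call with the same string returns the identical cached array rather than recompressing, which is the whole point of the cache.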
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r35796975 --- Diff: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala --- @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.serializer + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.nio.ByteBuffer + +import scala.collection.mutable + +import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer} +import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} +import org.apache.avro.{Schema, SchemaNormalization} +import org.apache.avro.generic.{GenericData, GenericRecord} +import org.apache.avro.io._ +import org.apache.commons.io.IOUtils + +import org.apache.spark.SparkEnv +import org.apache.spark.io.CompressionCodec + +/** + * Custom serializer used for generic Avro records. If the user registers the schemas + * ahead of time, then the schema's fingerprint will be sent with each message instead of the actual + * schema, as to reduce network IO. 
+ * Actions like parsing or compressing schemas are computationally expensive so the serializer + * caches all previously seen values as to reduce the amount of work needed to do. + */ +private[serializer] class GenericAvroSerializer(schemas: Map[Long, String]) + extends KSerializer[GenericRecord] { + + /** Used to reduce the amount of effort to compress the schema */ + private val compressCache = new mutable.HashMap[Schema, Array[Byte]]() + private val decompressCache = new mutable.HashMap[ByteBuffer, Schema]() + + /** Reuses the same datum reader/writer since the same schema will be used many times */ + private val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]() + private val readerCache = new mutable.HashMap[Schema, DatumReader[_]]() + + /** Fingerprinting is very expensive so this alleviates most of the work */ + private val fingerprintCache = new mutable.HashMap[Schema, Long]() + private val schemaCache = new mutable.HashMap[Long, Schema]() + + /** This needs to be lazy since SparkEnv is not initialized yet sometimes when this is called */ + private lazy val codec = CompressionCodec.createCodec(SparkEnv.get.conf) --- End diff -- for anybody that is curious -- I had gotten myself pretty confused about why this change would make the `SparkConf` get serialized. `KryoSerializer` already had a `conf` constructor argument; that wasn't changed. But the `conf` there is only accessed in field initialization, never in methods, so it wasn't stored. But through the wonders of scala, when you access that `conf` in a method, suddenly `conf` also becomes a member variable, and now you can no longer serialize the `KryoSerializer`. In practice this means the `lazy val codec` here is fine in actual use, but it could be very confusing in a unit test where the `SparkEnv` hasn't been set. So I'll add a comment explaining this a bit.
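The capture behavior described in that comment can be reproduced in a few lines. A hypothetical demo (class names invented, with a plain class standing in for `SparkConf`): a constructor parameter read only during field initialization is not retained as a field, but one read from a method body -- including a `lazy val` initializer, which compiles to a method -- is retained, and then gets dragged into serialization.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Deliberately NOT Serializable, playing the role of SparkConf here.
class Conf(val level: Int)

// `conf` is read only while initializing `level`, so scalac discards it
// after construction; the instance serializes fine.
class UsesConfInInit(conf: Conf) extends Serializable {
  val level: Int = conf.level
}

// The lazy val's initializer runs on first access, so scalac must retain
// `conf` as a member field -- and serialization now fails on it.
class UsesConfInMethod(conf: Conf) extends Serializable {
  lazy val level: Int = conf.level
}

object CaptureDemo {
  // Returns true if Java serialization of `obj` succeeds.
  def serializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

Serializing a `UsesConfInInit` succeeds, while a `UsesConfInMethod` fails with `NotSerializableException` on the retained `Conf` field, mirroring why the `lazy val codec` is harmless in real use but surprising in tests.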
Github user srowen commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-126088128 Re: the avro dependency, this is a net new dependency for core. Previously this came in via the hive module (the metastore dependency to be specific). I suppose it relies on the Hive profile therefore, but not the YARN profile. In any event the right thing to do is include the dependency if it's being used, of course. I suppose this is evidence that the Spark assembly -- the Hive flavors -- have had this dep and have been fine. Avro doesn't bring anything in that we didn't already have, except Avro: ``` [INFO] +- org.apache.avro:avro-mapred:jar:hadoop2:1.7.7:compile [INFO] | +- org.apache.avro:avro-ipc:jar:1.7.7:compile [INFO] | | +- (org.apache.avro:avro:jar:1.7.7:compile - version managed from 1.7.5; omitted for duplicate) [INFO] | | +- (org.codehaus.jackson:jackson-core-asl:jar:1.9.13:compile - version managed from 1.9.2; omitted for duplicate) [INFO] | | +- (org.codehaus.jackson:jackson-mapper-asl:jar:1.9.13:compile - version managed from 1.9.2; omitted for duplicate) [INFO] | | \- (org.slf4j:slf4j-api:jar:1.7.10:compile - version managed from 1.6.4; scope managed from runtime; omitted for duplicate) [INFO] | +- org.apache.avro:avro-ipc:jar:tests:1.7.7:compile [INFO] | | +- (org.apache.avro:avro:jar:1.7.7:compile - version managed from 1.7.5; omitted for duplicate) [INFO] | | +- (org.codehaus.jackson:jackson-core-asl:jar:1.9.13:compile - version managed from 1.9.2; omitted for duplicate) [INFO] | | +- (org.codehaus.jackson:jackson-mapper-asl:jar:1.9.13:compile - version managed from 1.9.2; omitted for duplicate) [INFO] | | \- (org.slf4j:slf4j-api:jar:1.7.10:compile - version managed from 1.6.4; scope managed from runtime; omitted for duplicate) [INFO] | +- (org.codehaus.jackson:jackson-core-asl:jar:1.9.13:compile - version managed from 1.9.2; omitted for duplicate) [INFO] | +- (org.codehaus.jackson:jackson-mapper-asl:jar:1.9.13:compile - version 
managed from 1.9.2; omitted for duplicate) [INFO] | \- (org.slf4j:slf4j-api:jar:1.7.10:compile - version managed from 1.6.4; scope managed from runtime; omitted for duplicate) ``` So, I think the net change here is only that Avro has been added to core. Unless there's an objection to adding Avro at all, I think this is OK from a build standpoint.
Github user pwendell commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-126112201 Okay sounds good. Thanks for looking at it Sean. - Patrick
Github user squito commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-126040715 sorry I am trying to understand this serialization thing a bit better ... something doesn't make sense to me, but mostly outside of these changes. Once I get a handle on that, I will merge this, just want to make sure I'm not missing something ...
Github user pwendell commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-126063730 Did @srowen look at the build change? Sean or I should be signing off on any dependency changes in the build.
Github user squito commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-126084533 @pwendell @JoshRosen ah sorry, I thought @vanzin @zsxwing gave it the thumbs up earlier -- just following the comments I forgot to get the approval of sean or you as well. Lemme know if you think this needs an immediate revert. I will also check w/ sean.
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-126065124 There was a lot of up-thread discussion regarding the dependency change; my first comment on this PR was asking whether this introduced a new dependency, etc. One concern which might have been overlooked is whether the required dependencies will be packaged if Spark is built with the YARN profile disabled. It looks like the rationale upthread is that this dependency is already transitively included by our other dependencies, but I'm not sure if that will always be the case if we're relying on the YARN profile to include it.
Github user squito commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-126060583 merged to master, thanks @JDrit
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/7004
Github user JDrit commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-125257800 @squito @JoshRosen Have I addressed all your issues, or is there anything else you would like me to do?
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-125279434 I'm going to defer to other reviewers who have been following this patch more closely.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-124537897 [Test build #1196 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/1196/consoleFull) for PR 7004 at commit [`8158d51`](https://github.com/apache/spark/commit/8158d5113a9084e44482ebfe0fbb73fe3d7bddd8).
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-124573048 [Test build #1196 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/1196/console) for PR 7004 at commit [`8158d51`](https://github.com/apache/spark/commit/8158d5113a9084e44482ebfe0fbb73fe3d7bddd8). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds no public classes.
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r35261143 --- Diff: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala --- @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.serializer + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.nio.ByteBuffer + +import scala.collection.mutable + +import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer} +import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} +import org.apache.avro.{Schema, SchemaNormalization} +import org.apache.avro.generic.{GenericData, GenericRecord} +import org.apache.avro.io._ +import org.apache.commons.io.IOUtils + +import org.apache.spark.SparkEnv +import org.apache.spark.io.CompressionCodec + +/** + * Custom serializer used for generic Avro records. If the user registers the schemas + * ahead of time, then the schema's fingerprint will be sent with each message instead of the actual + * schema, as to reduce network IO. 
+ * Actions like parsing or compressing schemas are computationally expensive so the serializer + * caches all previously seen values as to reduce the amount of work needed to do. + */ +private[serializer] class GenericAvroSerializer(schemas: Map[Long, String]) + extends KSerializer[GenericRecord] { + + /** Used to reduce the amount of effort to compress the schema */ + private val compressCache = new mutable.HashMap[Schema, Array[Byte]]() + private val decompressCache = new mutable.HashMap[ByteBuffer, Schema]() + + /** Reuses the same datum reader/writer since the same schema will be used many times */ + private val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]() + private val readerCache = new mutable.HashMap[Schema, DatumReader[_]]() + + /** Fingerprinting is very expensive so this alleviates most of the work */ + private val fingerprintCache = new mutable.HashMap[Schema, Long]() + private val schemaCache = new mutable.HashMap[Long, Schema]() + + /** This needs to be lazy since SparkEnv is not initialized yet sometimes when this is called */ + private lazy val codec = CompressionCodec.createCodec(SparkEnv.get.conf) --- End diff -- just curious, where does this get constructed when there isn't a SparkEnv?
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user JDrit commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r35266023 --- Diff: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala --- +private[serializer] class GenericAvroSerializer(schemas: Map[Long, String]) --- End diff -- See above, but I have also updated the comments.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-123875296 Merged build started.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-123875261 Merged build triggered.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-123940828 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-123940810 [Test build #38137 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/38137/console) for PR 7004 at commit [`8158d51`](https://github.com/apache/spark/commit/8158d5113a9084e44482ebfe0fbb73fe3d7bddd8). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-123956225 Merged build started.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-123956618 [Test build #38157 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/38157/consoleFull) for PR 7004 at commit [`8158d51`](https://github.com/apache/spark/commit/8158d5113a9084e44482ebfe0fbb73fe3d7bddd8).
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-123956172 Merged build triggered.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-123955693 Jenkins, retest this please
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-123958303 [Test build #38157 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/38157/console) for PR 7004 at commit [`8158d51`](https://github.com/apache/spark/commit/8158d5113a9084e44482ebfe0fbb73fe3d7bddd8). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-123958311 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r35262959 --- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala --- @@ -161,6 +162,25 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { this } + private final val avroNamespace = "avro.schema." + + /** + * Use Kryo serialization and register the given set of Avro schemas so that the generic + * record serializer can decrease network IO + */ + def registerAvroSchemas(schemas: Schema*): SparkConf = { + schemas.foldLeft(this) { (conf, schema) => + conf.set(avroNamespace + SchemaNormalization.parsingFingerprint64(schema), schema.toString) + } + } + + /** Gets all the avro schemas in the configuration used in the generic Avro record serializer */ + def getAvroSchema: Map[Long, String] = { --- End diff -- What do the keys and values of this map denote?
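For readers following along: per the `registerAvroSchemas` body quoted above, the keys are 64-bit Rabin fingerprints (from `SchemaNormalization.parsingFingerprint64`) and the values are the schemas' JSON text, stored under the `avro.schema.` config namespace. A minimal sketch (not Spark's implementation) of recovering that map from flat string settings:

```scala
// Sketch: rebuild the fingerprint -> schema-JSON map from flat
// "avro.schema.<fingerprint>" config entries, as getAvroSchema does.
object AvroSchemaConf {
  val avroNamespace = "avro.schema."

  def getAvroSchema(settings: Map[String, String]): Map[Long, String] =
    settings
      .filter { case (key, _) => key.startsWith(avroNamespace) }
      .map { case (key, json) => key.stripPrefix(avroNamespace).toLong -> json }
}
```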
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r35261569 --- Diff: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala --- @@ -0,0 +1,140 @@ +private[serializer] class GenericAvroSerializer(schemas: Map[Long, String]) + extends KSerializer[GenericRecord] { + + /** Used to reduce the amount of effort to compress the schema */ + private val compressCache = new mutable.HashMap[Schema, Array[Byte]]() + private val decompressCache = new mutable.HashMap[ByteBuffer, Schema]() + + /** Reuses the same datum reader/writer since the same schema will be used many times */ + private val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]() + private val readerCache = new mutable.HashMap[Schema, DatumReader[_]]() + + /** Fingerprinting is very expensive so this alleviates most of the work */ + private val fingerprintCache = new mutable.HashMap[Schema, Long]() + private val schemaCache = new mutable.HashMap[Long, Schema]() + + /** This needs to be lazy since SparkEnv is not initialized yet sometimes when this is called */ + private lazy val codec = CompressionCodec.createCodec(SparkEnv.get.conf) + + /** + * Used to compress Schemas when they are being sent over the wire. + * The compression results are memoized to reduce the compression time since the + * same schema is compressed many times over + */ + def compress(schema: Schema): Array[Byte] = compressCache.getOrElseUpdate(schema, { + val bos = new ByteArrayOutputStream() + val out = codec.compressedOutputStream(bos) + out.write(schema.toString.getBytes("UTF-8")) + out.close() + bos.toByteArray + }) + + /** + * Decompresses the schema into the actual in-memory object. Keeps an internal cache of already + * seen values so to limit the number of times that decompression has to be done. + */ + def decompress(schemaBytes: ByteBuffer): Schema = decompressCache.getOrElseUpdate(schemaBytes, { + val bis = new ByteArrayInputStream(schemaBytes.array()) + val bytes = IOUtils.toByteArray(codec.compressedInputStream(bis)) + new Schema.Parser().parse(new String(bytes, "UTF-8")) + }) + + /** + * Serializes a record to the given output stream. It caches a lot of the internal data as + * to not redo work + */ + def serializeDatum[R <: GenericRecord](datum: R, output: KryoOutput): Unit = { + val encoder = EncoderFactory.get.binaryEncoder(output, null) + val schema = datum.getSchema + val fingerprint = fingerprintCache.getOrElseUpdate(schema, { + SchemaNormalization.parsingFingerprint64(schema) + }) + schemas.get(fingerprint) match { + case Some(_) => + output.writeBoolean(true) + output.writeLong(fingerprint) +
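The `serializeDatum` branch quoted above writes a boolean flag and then either the registered fingerprint or the full schema. A dependency-free sketch of that framing, using plain `java.io` streams in place of Kryo's `Output`/`Input` (the helper names are ours, not the PR's):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object SchemaFramingSketch {
  // Write side: a flag, then either the 8-byte fingerprint or the schema text.
  def writeSchemaRef(out: DataOutputStream, schemaJson: String, fingerprint: Long,
                     registered: Map[Long, String]): Unit = {
    if (registered.contains(fingerprint)) {
      out.writeBoolean(true)
      out.writeLong(fingerprint)
    } else {
      out.writeBoolean(false)
      val bytes = schemaJson.getBytes("UTF-8")
      out.writeInt(bytes.length)
      out.write(bytes)
    }
  }

  // Read side mirrors the flag: look the fingerprint up, or read the inline text.
  def readSchemaRef(in: DataInputStream, registered: Map[Long, String]): String = {
    if (in.readBoolean()) {
      registered(in.readLong())
    } else {
      val bytes = new Array[Byte](in.readInt())
      in.readFully(bytes)
      new String(bytes, "UTF-8")
    }
  }
}
```

When the schema was registered up front, only 9 bytes (flag plus fingerprint) cross the wire instead of the full schema text, which is the network-IO saving the PR description claims.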
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-123857175 2 super minor comments, but otherwise lgtm! Also, as a bit of general house-keeping, can you put some of the benchmarks you have on the jira (since that serves as a better archive than github)? @JoshRosen want to take another look at this?
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r35262721 --- Diff: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala --- + /** This needs to be lazy since SparkEnv is not initialized yet sometimes when this is called */ + private lazy val codec = CompressionCodec.createCodec(SparkEnv.get.conf) --- End diff -- Why not just accept a `SparkConf` in the `GenericAvroSerializer` constructor instead of getting it from `SparkEnv`?
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r35263041 --- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala --- + def registerAvroSchemas(schemas: Schema*): SparkConf = { + schemas.foldLeft(this) { (conf, schema) => --- End diff -- Why do you need a fold here? This seems confusing. Why not just loop over the schemas and call `conf.set()` for each? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r35263138 --- Diff: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala --- +private[serializer] class GenericAvroSerializer(schemas: Map[Long, String]) --- End diff -- Similar comment here: what are the types in `schemas`?
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user JDrit commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r35263784 --- Diff: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user JDrit commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r35265166 --- Diff: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala --- + /** This needs to be lazy since SparkEnv is not initialized yet sometimes when this is called */ + private lazy val codec = CompressionCodec.createCodec(SparkEnv.get.conf) --- End diff -- I originally had it accept SparkConf but I was getting serialization errors since SparkConf is not serializable.
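The constraint JDrit ran into is easy to demonstrate: an immutable `Map[Long, String]` round-trips through Java serialization while `SparkConf` does not, which is why the serializer takes the extracted schema map rather than the conf. A self-contained sketch, with `SchemaCarrier` as a hypothetical stand-in for anything (such as a Kryo serializer) that captures the map:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical holder for the schema map; Scala case classes whose fields are
// serializable are themselves java.io.Serializable.
case class SchemaCarrier(schemas: Map[Long, String])

object SerializabilitySketch {
  // Serialize to bytes and back; throws NotSerializableException for a
  // non-serializable field, succeeds for a plain immutable Map.
  def roundTrip(c: SchemaCarrier): SchemaCarrier = {
    val bos = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bos)
    out.writeObject(c)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
    in.readObject().asInstanceOf[SchemaCarrier]
  }
}
```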
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-123876150 [Test build #38114 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/38114/consoleFull) for PR 7004 at commit [`8158d51`](https://github.com/apache/spark/commit/8158d5113a9084e44482ebfe0fbb73fe3d7bddd8).
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r35263272

--- Diff: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,140 @@
[license header, imports, scaladoc, class declaration, caches, and lazy codec identical to the diff quoted above]
+
+  /**
+   * Used to compress Schemas when they are being sent over the wire.
+   * The compression results are memoized to reduce the compression time since the
+   * same schema is compressed many times over
+   */
+  def compress(schema: Schema): Array[Byte] = compressCache.getOrElseUpdate(schema, {
+    val bos = new ByteArrayOutputStream()
+    val out = codec.compressedOutputStream(bos)
+    out.write(schema.toString.getBytes("UTF-8"))
+    out.close()
+    bos.toByteArray
+  })
+
+  /**
+   * Decompresses the schema into the actual in-memory object. Keeps an internal cache of already
+   * seen values to limit the number of times that decompression has to be done.
+   */
+  def decompress(schemaBytes: ByteBuffer): Schema = decompressCache.getOrElseUpdate(schemaBytes, {
+    val bis = new ByteArrayInputStream(schemaBytes.array())
+    val bytes = IOUtils.toByteArray(codec.compressedInputStream(bis))
+    new Schema.Parser().parse(new String(bytes, "UTF-8"))
+  })
+
+  /**
+   * Serializes a record to the given output stream. It caches a lot of the internal data
+   * to avoid redoing work.
+   */
+  def serializeDatum[R <: GenericRecord](datum: R, output: KryoOutput): Unit = {
+    val encoder = EncoderFactory.get.binaryEncoder(output, null)
+    val schema = datum.getSchema
+    val fingerprint = fingerprintCache.getOrElseUpdate(schema, {
+      SchemaNormalization.parsingFingerprint64(schema)
+    })
+    schemas.get(fingerprint) match {
+      case Some(_) =>
+        output.writeBoolean(true)
+        output.writeLong(fingerprint)
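The `serializeDatum` excerpt above writes a boolean flag followed by either the 64-bit fingerprint (when the schema was pre-registered) or the schema itself. A minimal sketch of that wire-format decision, with illustrative names and plain `java.io` streams rather than Kryo's `Output`:

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}

// Sketch of the header format suggested by the excerpt above (not Spark's
// actual API): registered schemas are identified by an 8-byte fingerprint;
// unregistered ones are sent in full as length-prefixed bytes.
def writeSchemaHeader(
    out: DataOutputStream,
    fingerprint: Long,
    registered: Set[Long],
    compressedSchema: Array[Byte]): Unit = {
  if (registered.contains(fingerprint)) {
    out.writeBoolean(true)        // receiver looks the schema up by fingerprint
    out.writeLong(fingerprint)
  } else {
    out.writeBoolean(false)       // receiver must decompress and parse the schema
    out.writeInt(compressedSchema.length)
    out.write(compressedSchema)
  }
}
```

For a registered schema the header is 9 bytes regardless of schema size, which is the network-IO saving the scaladoc describes.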
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user JDrit commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r35263954

--- Diff: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,140 @@
[license header, imports, scaladoc, class declaration, and caches identical to the diff quoted above]
+
+  /** This needs to be lazy since SparkEnv is sometimes not yet initialized when this is called */
+  private lazy val codec = CompressionCodec.createCodec(SparkEnv.get.conf)

--- End diff --

Several of the tests involving block replication fail when this value is not lazily defined.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user JDrit commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r35265438

--- Diff: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,140 @@
[quoted diff context identical to the previous comment: license header, imports, caches, `compress`, `decompress`, and the start of `serializeDatum`]
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user JDrit commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r35265806

--- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala ---
@@ -161,6 +162,25 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     this
   }

+  private final val avroNamespace = "avro.schema."
+
+  /**
+   * Use Kryo serialization and register the given set of Avro schemas so that the generic
+   * record serializer can decrease network IO
+   */
+  def registerAvroSchemas(schemas: Schema*): SparkConf = {
+    schemas.foldLeft(this) { (conf, schema) =>

--- End diff --

Got it, changed.
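For context, this is roughly how a user would call the API being reviewed, per the `registerAvroSchemas` signature in the diff above. This sketch assumes the `spark-core` and `avro` artifacts on the classpath; the record shape is made up for illustration:

```scala
import org.apache.avro.SchemaBuilder
import org.apache.spark.SparkConf

// Build an example Avro schema (illustrative record).
val userSchema = SchemaBuilder.record("User").fields()
  .requiredString("name")
  .requiredInt("age")
  .endRecord()

// Registering the schema up front means only its 64-bit fingerprint is
// sent with each record, instead of the full schema JSON.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerAvroSchemas(userSchema)
```

Since `registerAvroSchemas` returns the `SparkConf`, it chains with the other builder-style setters.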
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user JDrit commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r35265695

--- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala ---
@@ -161,6 +162,25 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     this
   }

+  private final val avroNamespace = "avro.schema."
+
+  /**
+   * Use Kryo serialization and register the given set of Avro schemas so that the generic
+   * record serializer can decrease network IO
+   */
+  def registerAvroSchemas(schemas: Schema*): SparkConf = {
+    schemas.foldLeft(this) { (conf, schema) =>
+      conf.set(avroNamespace + SchemaNormalization.parsingFingerprint64(schema), schema.toString)
+    }
+  }
+
+  /** Gets all the avro schemas in the configuration used in the generic Avro record serializer */
+  def getAvroSchema: Map[Long, String] = {

--- End diff --

The keys are longs, which represent a unique ID of the schema, and the values are the string representation of the schema.
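The key scheme discussed here stores each schema under `"avro.schema.<fingerprint64>"` and reads the entries back into a `Map[Long, String]`. A self-contained reconstruction of that round trip (the helper name `getAvroSchemas` and the plain-`Map` input are illustrative, not Spark's actual method):

```scala
// Prefix under which registered Avro schemas live in the configuration.
val avroNamespace = "avro.schema."

// Collect every "avro.schema.<fingerprint>" entry into fingerprint -> schema
// JSON, ignoring unrelated configuration keys.
def getAvroSchemas(confEntries: Map[String, String]): Map[Long, String] =
  confEntries.collect {
    case (key, json) if key.startsWith(avroNamespace) =>
      key.stripPrefix(avroNamespace).toLong -> json
  }
```

Encoding the fingerprint in the key keeps registration idempotent: re-registering the same schema overwrites the same entry rather than accumulating duplicates.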
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-123919144 [Test build #38137 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/38137/consoleFull) for PR 7004 at commit [`8158d51`](https://github.com/apache/spark/commit/8158d5113a9084e44482ebfe0fbb73fe3d7bddd8).
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-123909575 [Test build #38114 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/38114/console) for PR 7004 at commit [`8158d51`](https://github.com/apache/spark/commit/8158d5113a9084e44482ebfe0fbb73fe3d7bddd8).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-123909597 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-123919007 Merged build started.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-123918991 Merged build triggered.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-123918916 Jenkins, retest this please
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-122025940 Merged build triggered.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-122025951 Merged build started.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-122026253 [Test build #37523 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37523/consoleFull) for PR 7004 at commit [`c0cf329`](https://github.com/apache/spark/commit/c0cf32988d5c77655c09e9c798bbb49cb8b68250).
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-122059098 [Test build #37523 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37523/console) for PR 7004 at commit [`c0cf329`](https://github.com/apache/spark/commit/c0cf32988d5c77655c09e9c798bbb49cb8b68250).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-122059291 Merged build finished. Test PASSed.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user JDrit commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r34723675

--- Diff: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,151 @@
[license header identical to the diffs quoted above]
+
+package org.apache.spark.serializer
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.nio.ByteBuffer
+
+import org.apache.commons.io.IOUtils
+import org.apache.spark.io.CompressionCodec
+import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
+
+import scala.collection.mutable
+
+import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.apache.avro.{Schema, SchemaNormalization}
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.io._
+
+/**
+ *
+ */
+private[spark] object GenericAvroSerializer {

--- End diff --

Since those constants were only being used in SparkConf, I just moved the entire object there.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121745318 Merged build started.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121745295 Merged build triggered.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121743717 Merged build triggered.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121743738 Merged build started.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121745543 [Test build #37405 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37405/consoleFull) for PR 7004 at commit [`fa9298b`](https://github.com/apache/spark/commit/fa9298b70d2e07b6a0f3df51e758c0fe7e77ba3e).
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121751252 Merged build triggered.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121751318 Merged build started.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121750826 Jenkins, retest this please
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121751885 [Test build #37408 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37408/consoleFull) for PR 7004 at commit [`fa9298b`](https://github.com/apache/spark/commit/fa9298b70d2e07b6a0f3df51e758c0fe7e77ba3e).
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121743981 [Test build #37404 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37404/consoleFull) for PR 7004 at commit [`c5fe794`](https://github.com/apache/spark/commit/c5fe79416f2dfd040514f6ef7f0852cd174db466).
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121749529 [Test build #37405 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37405/console) for PR 7004 at commit [`fa9298b`](https://github.com/apache/spark/commit/fa9298b70d2e07b6a0f3df51e758c0fe7e77ba3e). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121749603 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121774720 [Test build #37408 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37408/console) for PR 7004 at commit [`fa9298b`](https://github.com/apache/spark/commit/fa9298b70d2e07b6a0f3df51e758c0fe7e77ba3e). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121774824 Merged build finished. Test PASSed.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121777631 **[Test build #37404 timed out](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37404/console)** for PR 7004 at commit [`c5fe794`](https://github.com/apache/spark/commit/c5fe79416f2dfd040514f6ef7f0852cd174db466) after a configured wait of `175m`.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-12135 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121789774 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121789744 [Test build #37417 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37417/console) for PR 7004 at commit [`dd71efe`](https://github.com/apache/spark/commit/dd71efeb302a2009c448e1c5a030daf38f08b322). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121764074 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121767183 Merged build started.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121767164 Merged build triggered.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121767711 [Test build #37417 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37417/consoleFull) for PR 7004 at commit [`dd71efe`](https://github.com/apache/spark/commit/dd71efeb302a2009c448e1c5a030daf38f08b322).
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121760803 Merged build started.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121760780 Merged build triggered.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r34581021 --- Diff: core/pom.xml --- @@ -35,6 +35,16 @@ <url>http://spark.apache.org/</url> <dependencies> <dependency> + <groupId>net.sf.py4j</groupId> + <artifactId>py4j</artifactId> + <version>0.8.2.1</version> + </dependency> --- End diff -- is there any reason to move this? if not, leave it in the old location, so it's easier to understand where it got added w/ git blame etc.
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r34581606 --- Diff: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala --- @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.serializer + +import java.io.ByteArrayOutputStream +import java.nio.ByteBuffer +import java.util.zip.{Inflater, Deflater} + +import scala.collection.mutable + +import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer} +import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} +import org.apache.avro.generic.{GenericData, GenericRecord} +import org.apache.avro.io._ +import org.apache.avro.{Schema, SchemaNormalization} + +object GenericAvroSerializer { + val avroSchemaNamespace = "avro.schema." + def avroSchemaKey(fingerprint: Long): String = avroSchemaNamespace + fingerprint +} + +/** + * Custom serializer used for generic Avro records. If the user registers the schemas + * ahead of time, then the schema's fingerprint will be sent with each message instead of the actual + * schema, as to reduce network IO.
+ * Actions like parsing or compressing schemas are computationally expensive so the serializer + * caches all previously seen values as to reduce the amount of work needed to do. + */ +class GenericAvroSerializer(schemas: Map[Long, String]) extends KSerializer[GenericRecord] { + + /** Used to reduce the amount of effort to compress the schema */ + private val compressCache = new mutable.HashMap[Schema, Array[Byte]]() + private val decompressCache = new mutable.HashMap[ByteBuffer, Schema]() + + /** Reuses the same datum reader/writer since the same schema will be used many times */ + private val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]() + private val readerCache = new mutable.HashMap[Schema, DatumReader[_]]() + + /** Fingerprinting is very expensive to this alleviates most of the work */ --- End diff -- typo: so
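The compress/decompress caches discussed in this diff follow a simple memoized deflate/inflate pattern. The sketch below is a hypothetical standalone reduction of that pattern, not Spark's actual `GenericAvroSerializer`: it compresses a schema's JSON with `java.util.zip.Deflater`, inflates it back, and uses `getOrElseUpdate` so repeated schemas are only compressed once. Names (`SchemaCompression`, `compress`, `decompress`) are illustrative.

```scala
import java.io.ByteArrayOutputStream
import java.util.zip.{Deflater, Inflater}
import scala.collection.mutable

object SchemaCompression {
  // Memoization cache: the same schema JSON is only deflated once.
  private val cache = new mutable.HashMap[String, Array[Byte]]()

  def compress(schemaJson: String): Array[Byte] = cache.getOrElseUpdate(schemaJson, {
    val deflater = new Deflater(Deflater.BEST_COMPRESSION)
    val input = schemaJson.getBytes("UTF-8")
    deflater.setInput(input)
    deflater.finish()
    val buffer = new Array[Byte](1024)
    val out = new ByteArrayOutputStream(input.length)
    // Drain the deflater until all compressed bytes have been produced.
    while (!deflater.finished()) {
      val count = deflater.deflate(buffer)
      out.write(buffer, 0, count)
    }
    deflater.end()
    out.toByteArray
  })

  def decompress(compressed: Array[Byte]): String = {
    val inflater = new Inflater()
    inflater.setInput(compressed)
    val buffer = new Array[Byte](1024)
    val out = new ByteArrayOutputStream(compressed.length)
    // Inflate until the original stream is fully reconstructed.
    while (!inflater.finished()) {
      val count = inflater.inflate(buffer)
      out.write(buffer, 0, count)
    }
    inflater.end()
    new String(out.toByteArray, "UTF-8")
  }
}
```

Because `getOrElseUpdate` returns the cached array on subsequent calls, compressing the same schema twice does no extra deflate work, which is the point of `compressCache` in the patch.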
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r34581486 --- Diff: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala --- @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.serializer + +import java.io.ByteArrayOutputStream +import java.nio.ByteBuffer +import java.util.zip.{Inflater, Deflater} + +import scala.collection.mutable + +import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer} +import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} +import org.apache.avro.generic.{GenericData, GenericRecord} +import org.apache.avro.io._ +import org.apache.avro.{Schema, SchemaNormalization} + +object GenericAvroSerializer { --- End diff -- `private[serializer]`
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r34583682 --- Diff: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala --- @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.serializer + +import java.io.ByteArrayOutputStream +import java.nio.ByteBuffer +import java.util.zip.{Inflater, Deflater} + +import scala.collection.mutable + +import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer} +import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} +import org.apache.avro.generic.{GenericData, GenericRecord} +import org.apache.avro.io._ +import org.apache.avro.{Schema, SchemaNormalization} + +object GenericAvroSerializer { + val avroSchemaNamespace = "avro.schema." + def avroSchemaKey(fingerprint: Long): String = avroSchemaNamespace + fingerprint +} + +/** + * Custom serializer used for generic Avro records. If the user registers the schemas + * ahead of time, then the schema's fingerprint will be sent with each message instead of the actual + * schema, as to reduce network IO.
+ * Actions like parsing or compressing schemas are computationally expensive so the serializer + * caches all previously seen values as to reduce the amount of work needed to do. + */ +class GenericAvroSerializer(schemas: Map[Long, String]) extends KSerializer[GenericRecord] { + + /** Used to reduce the amount of effort to compress the schema */ + private val compressCache = new mutable.HashMap[Schema, Array[Byte]]() + private val decompressCache = new mutable.HashMap[ByteBuffer, Schema]() + + /** Reuses the same datum reader/writer since the same schema will be used many times */ + private val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]() + private val readerCache = new mutable.HashMap[Schema, DatumReader[_]]() + + /** Fingerprinting is very expensive to this alleviates most of the work */ + private val fingerprintCache = new mutable.HashMap[Schema, Long]() + private val schemaCache = new mutable.HashMap[Long, Schema]() + + private def getSchema(fingerprint: Long): Option[String] = schemas.get(fingerprint) + + /** + * Used to compress Schemas when they are being sent over the wire. + * The compression results are memoized to reduce the compression time since the + * same schema is compressed many times over + */ + def compress(schema: Schema): Array[Byte] = compressCache.getOrElseUpdate(schema, { +val deflater = new Deflater(Deflater.BEST_COMPRESSION) +val schemaBytes = schema.toString.getBytes("UTF-8") +deflater.setInput(schemaBytes) +deflater.finish() +val buffer = Array.ofDim[Byte](schemaBytes.length) +val outputStream = new ByteArrayOutputStream(schemaBytes.length) +while(!deflater.finished()) { + val count = deflater.deflate(buffer) + outputStream.write(buffer, 0, count) +} +outputStream.close() +outputStream.toByteArray + }) + + + /** + * Decompresses the schema into the actual in-memory object. Keeps an internal cache of already + * seen values so to limit the number of times that decompression has to be done.
+ */ + def decompress(schemaBytes: ByteBuffer): Schema = decompressCache.getOrElseUpdate(schemaBytes, { +val inflater = new Inflater() +val bytes = schemaBytes.array() +inflater.setInput(bytes) +val outputStream = new ByteArrayOutputStream(bytes.length) +val tmpBuffer = Array.ofDim[Byte](1024) +while (!inflater.finished()) { + val count = inflater.inflate(tmpBuffer) + outputStream.write(tmpBuffer, 0, count) +} +inflater.end() +outputStream.close() +new Schema.Parser().parse(new String(outputStream.toByteArray, "UTF-8")) + }) + + /** + *
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r34584076 --- Diff: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r34584417 --- Diff: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
[GitHub] spark pull request: [SPARK-746][CORE] Added Avro Serialization to ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r34585605
--- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala ---
@@ -160,6 +162,21 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     set("spark.serializer", classOf[KryoSerializer].getName)
     this
   }
+  /**
+   * Use Kryo serialization and register the given set of Avro schemas so that the generic
+   * record serializer can decrease network IO
+   */
+  def registerAvroSchema(schemas: Array[Schema]): SparkConf =
--- End diff --
how about using varargs here? `def registerAvroSchema(schemas: Schema*): SparkConf` then you could call using any of:
```
registerAvroSchema(schema)
registerAvroSchema(schemaOne, schemaTwo, schemaThree)
registerAvroSchema(schemaSeq: _*) // works w/ an array too
```
and maybe rename the method to plural, `registerAvroSchemas`
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
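The chained, varargs-based builder style squito suggests works the same way on the JVM generally. A dependency-free sketch (the class and method names are invented for illustration, with plain strings standing in for Avro `Schema` objects):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ConfSketch {
    private final List<String> schemas = new ArrayList<>();

    // Varargs lets callers pass one schema, several, or a whole array,
    // mirroring the suggested Scala signature `def registerAvroSchemas(schemas: Schema*)`.
    public ConfSketch registerAvroSchemas(String... schemaJsons) {
        Collections.addAll(schemas, schemaJsons);
        return this;  // returning `this` preserves SparkConf-style call chaining
    }

    public int registeredCount() {
        return schemas.size();
    }

    public static void main(String[] args) {
        ConfSketch conf = new ConfSketch();
        conf.registerAvroSchemas("schemaOne")
            .registerAvroSchemas("schemaTwo", "schemaThree");
        String[] more = {"schemaFour"};
        conf.registerAvroSchemas(more);  // an array satisfies varargs directly
        System.out.println(conf.registeredCount());  // prints 4
    }
}
```

In Java an array satisfies a varargs parameter implicitly; Scala needs the explicit `: _*` ascription shown in the review comment.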
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r34585666
--- Diff: core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, OutputStream}
+import java.nio.ByteBuffer
+
+import com.esotericsoftware.kryo.io.{Output, Input}
+import org.apache.avro.generic.GenericData.Record
+import org.apache.avro.{SchemaBuilder, Schema}
+import org.apache.spark.{SparkFunSuite, SharedSparkContext}
+
+class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext {
+  conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+
+  val schema: Schema = SchemaBuilder
+    .record("testRecord").fields()
+    .requiredString("data")
+    .endRecord()
+  val record = new Record(schema)
+  record.put("data", "test data")
+
+  test("schema compression and decompression") {
+    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
+    assert(schema === genericSer.decompress(ByteBuffer.wrap(genericSer.compress(schema))))
+  }
+
+  test("record serialization and deserialization") {
+    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
+
+    val outputStream = new ByteArrayOutputStream()
+    val output = new Output(outputStream)
+    genericSer.serializeDatum(record, output)
+    output.flush()
+    output.close()
+
+    val input = new Input(new ByteArrayInputStream(outputStream.toByteArray))
+    assert(genericSer.deserializeDatum(input) === record)
+  }
+
+  test("uses schema fingerprint to decrease message size") {
+    val genericSerFull = new GenericAvroSerializer(conf.getAvroSchema)
+
+    val output = new Output(new ByteArrayOutputStream())
+
+    val beginningNormalPosition = output.total()
+    genericSerFull.serializeDatum(record, output)
+    output.flush()
+    val normalLength = output.total - beginningNormalPosition
+
+    conf.registerAvroSchema(Array(schema))
--- End diff --
if you switch to varargs this could just be `conf.registerAvroSchema(schema)`
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r34581426
--- Diff: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.util.zip.{Inflater, Deflater}
+
+import scala.collection.mutable
+
+import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.io._
+import org.apache.avro.{Schema, SchemaNormalization}
--- End diff --
super nit: class imports go before more deeply nested packages (it's not just alphabetic), so it should be:
```
import org.apache.avro.{Schema, SchemaNormalization}
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.io._
```
I recommend using the **plugin** (not IntelliJ's builtin ordering) as described on the wiki: https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide#SparkCodeStyleGuide-Imports
Github user squito commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121294001
Is it important to use ZLIB as opposed to one of the other compression libraries we already have? Is that how we get better compression? It's just a bit of a burden for code maintainers to learn, if say LZ4 works just as well, when we've already got it wrapped up in one of Spark's `CompressionCodec`s so the usage is standard. E.g., I'm not familiar with the `Deflater`, and your usage isn't exactly the same as the simple example in the javadoc: http://docs.oracle.com/javase/7/docs/api/java/util/zip/Deflater.html so it's a bit more work to make sure the usage is correct, that it's getting cleaned up, etc. If it provides some advantage, that is fine, but just want to make sure.
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r34581493
--- Diff: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.util.zip.{Inflater, Deflater}
+
+import scala.collection.mutable
+
+import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.io._
+import org.apache.avro.{Schema, SchemaNormalization}
+
+object GenericAvroSerializer {
+  val avroSchemaNamespace = "avro.schema."
+  def avroSchemaKey(fingerprint: Long): String = avroSchemaNamespace + fingerprint
+}
+
+/**
+ * Custom serializer used for generic Avro records. If the user registers the schemas
+ * ahead of time, then the schema's fingerprint will be sent with each message instead of the actual
+ * schema, to reduce network IO.
+ * Actions like parsing or compressing schemas are computationally expensive, so the serializer
+ * caches all previously seen values to reduce the amount of work needed.
+ */
+class GenericAvroSerializer(schemas: Map[Long, String]) extends KSerializer[GenericRecord] {
--- End diff --
`private[serializer]`
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r34581825
--- Diff: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+import java.util.zip.{Inflater, Deflater}
+
+import scala.collection.mutable
+
+import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.apache.avro.generic.{GenericData, GenericRecord}
+import org.apache.avro.io._
+import org.apache.avro.{Schema, SchemaNormalization}
+
+object GenericAvroSerializer {
+  val avroSchemaNamespace = "avro.schema."
+  def avroSchemaKey(fingerprint: Long): String = avroSchemaNamespace + fingerprint
+}
+
+/**
+ * Custom serializer used for generic Avro records. If the user registers the schemas
+ * ahead of time, then the schema's fingerprint will be sent with each message instead of the actual
+ * schema, to reduce network IO.
+ * Actions like parsing or compressing schemas are computationally expensive, so the serializer
+ * caches all previously seen values to reduce the amount of work needed.
+ */
+class GenericAvroSerializer(schemas: Map[Long, String]) extends KSerializer[GenericRecord] {
+
+  /** Used to reduce the amount of effort to compress the schema */
+  private val compressCache = new mutable.HashMap[Schema, Array[Byte]]()
+  private val decompressCache = new mutable.HashMap[ByteBuffer, Schema]()
+
+  /** Reuses the same datum reader/writer since the same schema will be used many times */
+  private val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]()
+  private val readerCache = new mutable.HashMap[Schema, DatumReader[_]]()
+
+  /** Fingerprinting is very expensive, so this alleviates most of the work */
+  private val fingerprintCache = new mutable.HashMap[Schema, Long]()
+  private val schemaCache = new mutable.HashMap[Long, Schema]()
+
+  private def getSchema(fingerprint: Long): Option[String] = schemas.get(fingerprint)
--- End diff --
this method seems kinda pointless, same thing to inline everywhere
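The memoization pattern under review (Scala's `HashMap.getOrElseUpdate`) maps directly to `Map.computeIfAbsent` on the JVM. A dependency-free sketch, with a stand-in for the expensive `SchemaNormalization.parsingFingerprint64` call (the class name and the hash-based placeholder are invented for illustration):

```java
import java.util.HashMap;
import java.util.Map;

public class FingerprintCache {
    private final Map<String, Long> cache = new HashMap<>();
    static int computations = 0;  // counts how often the expensive path actually runs

    // Stand-in for SchemaNormalization.parsingFingerprint64: any expensive, pure function.
    static long expensiveFingerprint(String schemaJson) {
        computations++;
        return schemaJson.hashCode();  // placeholder; the real code computes a 64-bit Rabin fingerprint
    }

    // Equivalent of Scala's fingerprintCache.getOrElseUpdate(schema, ...):
    // compute on first sight of a schema, then serve every later lookup from the map.
    long fingerprint(String schemaJson) {
        return cache.computeIfAbsent(schemaJson, FingerprintCache::expensiveFingerprint);
    }

    public static void main(String[] args) {
        FingerprintCache fc = new FingerprintCache();
        fc.fingerprint("{\"type\":\"string\"}");
        fc.fingerprint("{\"type\":\"string\"}");  // second call hits the cache
        System.out.println(computations);  // prints 1
    }
}
```

Since the same schema recurs for every record in a partition, the expensive computation runs once per distinct schema instead of once per record.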
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/7004#discussion_r34584579
--- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -72,6 +74,9 @@ class KryoSerializer(conf: SparkConf)
   private val classesToRegister = conf.get("spark.kryo.classesToRegister", "")
     .split(',')
     .filter(!_.isEmpty)
+  conf.getExecutorEnv
--- End diff --
looks like a stray line
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121311564 Merged build triggered.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121312894 [Test build #37238 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37238/consoleFull) for PR 7004 at commit [`6d1925c`](https://github.com/apache/spark/commit/6d1925c5ea899e61103d7f3fa332db771b9616b2).
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121311604 Merged build started.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121363467 Merged build started.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121363416 Merged build triggered.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121349416 [Test build #37238 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/37238/console) for PR 7004 at commit [`6d1925c`](https://github.com/apache/spark/commit/6d1925c5ea899e61103d7f3fa332db771b9616b2).
* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds no public classes.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121349556 Merged build finished. Test PASSed.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7004#issuecomment-121367942 Merged build finished. Test FAILed.