[GitHub] spark pull request: [SPARK-10807][SPARKR] Added as.data.frame as a...
Github user shivaram commented on the pull request: https://github.com/apache/spark/pull/8908#issuecomment-143873375 Jenkins, retest this please --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-10101] [SQL] Add maxlength to JDBC fiel...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8374#issuecomment-143872950 Merged build started.
[GitHub] spark pull request: [SPARK-10608][CORE] disable reduce locality as...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8765#issuecomment-143872949 Build started.
[GitHub] spark pull request: [SPARK-10608][CORE] disable reduce locality as...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8765#issuecomment-143872913 Build triggered.
[GitHub] spark pull request: [SPARK-10101] [SQL] Add maxlength to JDBC fiel...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8374#issuecomment-143872924 Merged build triggered.
[GitHub] spark pull request: Update dropDuplicates() documentation
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8930#issuecomment-143872898 Can one of the admins verify this patch?
[GitHub] spark pull request: [SPARK-10807][SPARKR] Added as.data.frame as a...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8908#issuecomment-143872026 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43072/ Test FAILed.
[GitHub] spark pull request: [SPARK-10807][SPARKR] Added as.data.frame as a...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8908#issuecomment-143872024 Merged build finished. Test FAILed.
[GitHub] spark pull request: Update dropDuplicates() documentation
GitHub user asokadiggs opened a pull request: https://github.com/apache/spark/pull/8930 Update dropDuplicates() documentation Documentation for dropDuplicates() and drop_duplicates() is one and the same. Resolved the error in the example for drop_duplicates using the same approach used for groupby and groupBy, by indicating that dropDuplicates and drop_duplicates are aliases. You can merge this pull request into a Git repository by running: $ git pull https://github.com/asokadiggs/spark jira-10782 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/8930.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #8930 commit 279d6207be25cead5ef5b1c04f1719bd15f2dc9a Author: asokadiggs Date: 2015-09-28T20:51:03Z Update dataframe.py Documentation for dropDuplicates() and drop_duplicates() is one and the same. Resolved the error in the example for drop_duplicates using the same approach used for groupby and groupBy, by indicating that dropDuplicates and drop_duplicates are aliases.
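The patch above documents `drop_duplicates` as an alias of `dropDuplicates`, the same approach PySpark uses for `groupby`/`groupBy`. A minimal pure-Python sketch of that alias pattern — the class here is a toy stand-in, not the real `pyspark.sql.DataFrame`:

```python
class DataFrame:
    """Toy stand-in for a DataFrame, holding rows as tuples."""

    def __init__(self, rows):
        self.rows = list(rows)

    def dropDuplicates(self):
        """Return a new DataFrame with duplicate rows removed, preserving order."""
        seen, out = set(), []
        for row in self.rows:
            if row not in seen:
                seen.add(row)
                out.append(row)
        return DataFrame(out)

    # Binding a second class attribute to the same function object creates the
    # alias; both names then share one implementation and one docstring.
    drop_duplicates = dropDuplicates


df = DataFrame([(1, "a"), (1, "a"), (2, "b")])
assert df.dropDuplicates().rows == df.drop_duplicates().rows == [(1, "a"), (2, "b")]
```

Because the alias is the same function object, documentation generated for one name applies verbatim to the other — which is why the PR fixes the example once and points both names at it.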
[GitHub] spark pull request: [SPARK-10822] Move contents of spark-unsafe su...
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/8912#issuecomment-143871541 For some context, the reason I want to move these classes is that it would be nice to integrate the TaskMemoryManager and ExecutorMemoryManager with other parts of Spark Core (as part of the unified memory management story). If we were to simply move the TaskMemoryManager class to Spark Core, then we couldn't use it in the classes in the Unsafe package (which doesn't depend on core). Now that I think about this more, though, I think there might be a different restructuring which would let us move those classes without having as broad an impact on the package. Therefore, let's hold off on merging this for now while I investigate whether there's an alternative that won't introduce this dependency problem.
[GitHub] spark pull request: [SPARK-10807][SPARKR] Added as.data.frame as a...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/8908#issuecomment-143871973 [Test build #43072 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43072/console) for PR 8908 at commit [`a346cc6`](https://github.com/apache/spark/commit/a346cc62834ef28246505483cb76957e8b8cba0a). * This patch **fails MiMa tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-10822] Move contents of spark-unsafe su...
Github user JoshRosen closed the pull request at: https://github.com/apache/spark/pull/8912
[GitHub] spark pull request: [SPARK-10395] [SQL] Simplifies CatalystReadSup...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/8553
[GitHub] spark pull request: [SPARK-10395] [SQL] Simplifies CatalystReadSup...
Github user davies commented on the pull request: https://github.com/apache/spark/pull/8553#issuecomment-143870853 Merged into master, thanks!
[GitHub] spark pull request: [SPARK-10395] [SQL] Simplifies CatalystReadSup...
Github user davies commented on the pull request: https://github.com/apache/spark/pull/8553#issuecomment-143870541 LGTM
[GitHub] spark pull request: [SPARK-10101] [SQL] Add maxlength to JDBC fiel...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8374#issuecomment-143868409 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-10807][SPARKR] Added as.data.frame as a...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/8908#issuecomment-143868525 [Test build #43072 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43072/consoleFull) for PR 8908 at commit [`a346cc6`](https://github.com/apache/spark/commit/a346cc62834ef28246505483cb76957e8b8cba0a).
[GitHub] spark pull request: [SPARK-10101] [SQL] Add maxlength to JDBC fiel...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/8374#issuecomment-143868403 [Test build #43071 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43071/console) for PR 8374 at commit [`44e1978`](https://github.com/apache/spark/commit/44e1978530a9703ecad9fae90a7ea9ebb0693fd3). * This patch **fails Scala style tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-10101] [SQL] Add maxlength to JDBC fiel...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8374#issuecomment-143868413 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43071/ Test FAILed.
[GitHub] spark pull request: [SPARK-10101] [SQL] Add maxlength to JDBC fiel...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/8374#issuecomment-143867712 [Test build #43071 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43071/consoleFull) for PR 8374 at commit [`44e1978`](https://github.com/apache/spark/commit/44e1978530a9703ecad9fae90a7ea9ebb0693fd3).
[GitHub] spark pull request: [SPARK-10807][SPARKR] Added as.data.frame as a...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8908#issuecomment-143866834 Merged build started.
[GitHub] spark pull request: [SPARK-10807][SPARKR] Added as.data.frame as a...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8908#issuecomment-143866812 Merged build triggered.
[GitHub] spark pull request: [SPARK-10807][SPARKR] Added as.data.frame as a...
Github user olarayej commented on a diff in the pull request: https://github.com/apache/spark/pull/8908#discussion_r40602070 --- Diff: R/pkg/R/DataFrame.R --- @@ -1848,3 +1848,28 @@ setMethod("crosstab", sct <- callJMethod(statFunctions, "crosstab", col1, col2) collect(dataFrame(sct)) }) + + +#' This function downloads the contents of a DataFrame into an R data.frame. +#' Since data.frames are held in memory, ensure that you have enough memory +#' in your system to accommodate the contents. +#' +#' @title Download data from a DataFrame into a data.frame +#' @param x a DataFrame +#' @return a data.frame +#' @rdname as.data.frame +#' @examples \dontrun{ +#' +#' irisDF <- createDataFrame(sqlContext, iris) +#' df <- as.data.frame(irisDF[irisDF$Species == "setosa", ]) +#' } +setMethod(f = "as.data.frame", --- End diff -- Sorry I missed that! I have fixed it now.
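The roxygen docs above warn that `as.data.frame` materializes the entire DataFrame in local memory, and their example filters *before* collecting. The same filter-then-collect discipline, sketched in plain Python with a lazy generator standing in for the distributed DataFrame (all names here are hypothetical illustrations, not Spark API):

```python
def distributed_rows():
    """Hypothetical stand-in for a distributed DataFrame: a lazy row source."""
    species = ["setosa", "versicolor", "virginica"]
    for i in range(30):
        yield {"id": i, "Species": species[i % 3]}

# Filter while the data is still lazy, then materialize only the subset --
# mirroring as.data.frame(irisDF[irisDF$Species == "setosa", ]) rather than
# collecting everything locally and filtering afterwards.
setosa = [row for row in distributed_rows() if row["Species"] == "setosa"]
assert len(setosa) == 10
```

Only the filtered subset ever occupies local memory, which is the point of the memory caveat in the docs.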
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r40601587 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala --- @@ -0,0 +1,1048 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history.yarn + +import java.net.{ConnectException, URI} +import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean, AtomicInteger} +import java.util.concurrent.{TimeUnit, LinkedBlockingDeque} + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelineEvent, TimelinePutResponse} +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId} +import org.apache.hadoop.yarn.client.api.TimelineClient +import org.apache.hadoop.yarn.conf.YarnConfiguration + +import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._ +import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster.{YarnExtensionService, YarnExtensionServiceBinding} +import org.apache.spark.util.{SystemClock, Utils} +import org.apache.spark.{Logging, SparkContext} + +/** + * A Yarn Extension Service to post lifecycle events to a registered + * YARN Timeline Server. + */ +private[spark] class YarnHistoryService extends YarnExtensionService with Logging { + + import org.apache.spark.deploy.history.yarn.YarnHistoryService._ + + /** Simple state model implemented in an atomic integer */ + private val _serviceState = new AtomicInteger(CreatedState) + + def serviceState: Int = { +_serviceState.get() + } + def enterState(state: Int): Int = { +logDebug(s"Entering state $state from $serviceState") +_serviceState.getAndSet(state) + } + + /** + * Spark context; valid once started + */ + private var sparkContext: SparkContext = _ + + /** YARN configuration from the spark context */ + private var config: YarnConfiguration = _ + + /** application ID. 
*/ + private var _applicationId: ApplicationId = _ + + /** attempt ID; this will be None if the service is started in yarn-client mode */ + private var _attemptId: Option[ApplicationAttemptId] = None + + /** YARN timeline client */ + private var _timelineClient: Option[TimelineClient] = None + + /** registered event listener */ + private var listener: Option[YarnEventListener] = None + + /** Application name from the spark start event */ + private var applicationName: String = _ + + /** Application ID */ + private var sparkApplicationId: Option[String] = None + + /** Optional Attempt ID from the spark start event */ + private var sparkApplicationAttemptId: Option[String] = None + + /** user name as derived from `SPARK_USER` env var or `UGI` */ + private var userName = Utils.getCurrentUserName() + + /** Clock for recording time */ + private val clock = new SystemClock() + + /** + * Start time of the application, as received in the start event. + */ + private var startTime: Long = _ + + /** + * End time of the application, as received in the end event. + */ + private var endTime: Long = _ + + /** number of events to batch up before posting */ + private var _batchSize = DEFAULT_BATCH_SIZE + + /** queue of entities to asynchronously post, plus the number of events in each entry */ + private var _entityQueue = new LinkedBlockingDeque[TimelineEntity]() + + /** limit on the total number of events permitted */ + private var _postQueueLimit = DEFAULT_POST_QUEUE_LIMIT + + /** + * List of events which will be pulled into a timeline + * entity when created + */ + private var pendingEvents = new
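The fields in the diff above outline the service's posting design: events are batched up to `_batchSize` into a timeline entity, full entities go onto a `LinkedBlockingDeque` for an asynchronous poster, and `_postQueueLimit` caps how much can queue up. A rough Python sketch of that batch-then-enqueue-with-a-limit shape — the class, constants, and the drop-on-overflow policy here are illustrative assumptions, not the actual YarnHistoryService logic:

```python
from collections import deque

class BatchingPoster:
    """Batch events into entities, enqueue them, and shed load past a queue limit."""

    def __init__(self, batch_size=10, queue_limit=1000):
        self.batch_size = batch_size
        self.queue_limit = queue_limit
        self.pending = []          # events waiting to be rolled into an entity
        self.queue = deque()       # entities waiting for the async poster thread
        self.dropped = 0           # events discarded because the queue was full

    def push_event(self, event):
        self.pending.append(event)
        if len(self.pending) >= self.batch_size:
            self.flush()

    def flush(self):
        if not self.pending:
            return
        if len(self.queue) >= self.queue_limit:
            # Over the limit: discard rather than block the listener thread.
            self.dropped += len(self.pending)
        else:
            self.queue.append(list(self.pending))
        self.pending.clear()


poster = BatchingPoster(batch_size=3, queue_limit=2)
for i in range(12):
    poster.push_event(i)
# 12 events flush in batches of 3; only 2 batches fit, so 6 events are dropped
```

The bounded queue is the key design choice: without a limit, a slow or unreachable Timeline Server would let queued entities grow without bound inside the application.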
[GitHub] spark pull request: [SPARK-10101] [SQL] Add maxlength to JDBC fiel...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8374#issuecomment-143865802 Merged build started.
[GitHub] spark pull request: [SPARK-10101] [SQL] Add maxlength to JDBC fiel...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8374#issuecomment-143865770 Merged build triggered.
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r40601372 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala ---
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r40601188 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala ---
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r40601106 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala --- @@ -0,0 +1,1048 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history.yarn + +import java.net.{ConnectException, URI} +import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean, AtomicInteger} +import java.util.concurrent.{TimeUnit, LinkedBlockingDeque} + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelineEvent, TimelinePutResponse} +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId} +import org.apache.hadoop.yarn.client.api.TimelineClient +import org.apache.hadoop.yarn.conf.YarnConfiguration + +import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._ +import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster.{YarnExtensionService, YarnExtensionServiceBinding} +import org.apache.spark.util.{SystemClock, Utils} +import org.apache.spark.{Logging, SparkContext} + +/** + * A Yarn Extension Service to post lifecycle events to a registered + * YARN Timeline Server. + */ +private[spark] class YarnHistoryService extends YarnExtensionService with Logging { + + import org.apache.spark.deploy.history.yarn.YarnHistoryService._ + + /** Simple state model implemented in an atomic integer */ + private val _serviceState = new AtomicInteger(CreatedState) + + def serviceState: Int = { +_serviceState.get() + } + def enterState(state: Int): Int = { +logDebug(s"Entering state $state from $serviceState") +_serviceState.getAndSet(state) + } + + /** + * Spark context; valid once started + */ + private var sparkContext: SparkContext = _ + + /** YARN configuration from the spark context */ + private var config: YarnConfiguration = _ + + /** application ID. 
*/ + private var _applicationId: ApplicationId = _ + + /** attempt ID this will be null if the service is started in yarn-client mode */ + private var _attemptId: Option[ApplicationAttemptId] = None + + /** YARN timeline client */ + private var _timelineClient: Option[TimelineClient] = None + + /** registered event listener */ + private var listener: Option[YarnEventListener] = None + + /** Application name from the spark start event */ + private var applicationName: String = _ + + /** Application ID*/ + private var sparkApplicationId: Option[String] = None + + /** Optional Attempt ID from the spark start event */ + private var sparkApplicationAttemptId: Option[String] = None + + /** user name as derived from `SPARK_USER` env var or `UGI` */ + private var userName = Utils.getCurrentUserName() + + /** Clock for recording time */ + private val clock = new SystemClock() + + /** --- End diff -- nit: please be consistent re: single line vs. multi-line docs, and starting with capital or lower case letters. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r40600962 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala --- @@ -0,0 +1,1048 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.history.yarn + +import java.net.{ConnectException, URI} +import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean, AtomicInteger} +import java.util.concurrent.{TimeUnit, LinkedBlockingDeque} + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelineEvent, TimelinePutResponse} +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId} +import org.apache.hadoop.yarn.client.api.TimelineClient +import org.apache.hadoop.yarn.conf.YarnConfiguration + +import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._ +import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster.{YarnExtensionService, YarnExtensionServiceBinding} +import org.apache.spark.util.{SystemClock, Utils} +import org.apache.spark.{Logging, SparkContext} + +/** + * A Yarn Extension Service to post lifecycle events to a registered + * YARN Timeline Server. + */ +private[spark] class YarnHistoryService extends YarnExtensionService with Logging { + + import org.apache.spark.deploy.history.yarn.YarnHistoryService._ + + /** Simple state model implemented in an atomic integer */ + private val _serviceState = new AtomicInteger(CreatedState) + + def serviceState: Int = { +_serviceState.get() + } + def enterState(state: Int): Int = { --- End diff -- nit: add empty line --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r40600885 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala --- @@ -0,0 +1,1048 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history.yarn + +import java.net.{ConnectException, URI} +import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean, AtomicInteger} --- End diff -- nit: ordering --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8744#discussion_r40600846 --- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnEventListener.scala --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history.yarn + +import org.apache.spark.scheduler._ +import org.apache.spark.util.{Clock, SystemClock} +import org.apache.spark.{Logging, SparkContext, SparkFirehoseListener} + +/** + * Spark listener which queues up all received events to the [[YarnHistoryService]] passed + * as a constructor. There's no attempt to filter event types at this point. + * + * @param sc context + * @param service service to forward events to + */ +private[spark] class YarnEventListener(sc: SparkContext, service: YarnHistoryService) + extends SparkFirehoseListener with Logging { + + private val clock: Clock = new SystemClock() --- End diff -- Not used? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] spark pull request: SPARK-10771: Implement the shuffle encryption ...
Github user vanzin commented on the pull request: https://github.com/apache/spark/pull/8880#issuecomment-143862548 Hi @winningsix, I started to review this but the style violations are too distracting; I suggest fixing all those up front before going through a more complete review. If in doubt, refer to: https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide
[GitHub] spark pull request: SPARK-10771: Implement the shuffle encryption ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8880#discussion_r40599619 --- Diff: core/src/main/scala/org/apache/spark/crypto/CryptoInputStream.scala --- @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.crypto + +import java.io.{IOException, InputStream, FilterInputStream} +import java.lang.UnsupportedOperationException +import java.nio.ByteBuffer +import java.nio.channels.ReadableByteChannel +import java.security.GeneralSecurityException +import java.util.Queue +import java.util.concurrent.ConcurrentLinkedQueue + +import com.google.common.base.Preconditions + +/** + * CryptoInputStream decrypts data. It is not thread-safe. AES CTR mode is + * required in order to ensure that the plain text and cipher text have a 1:1 + * mapping. The decryption is buffer based. The key points of the decryption + * are (1) calculating the counter and (2) padding through stream position: + * + * counter = base + pos/(algorithm blocksize); + * padding = pos%(algorithm blocksize); + * + * The underlying stream offset is maintained as state. 
+ */ +class CryptoInputStream(in: InputStream, codecVal: CryptoCodec, +bufferSizeVal: Integer, keyVal: Array[Byte], ivVal: Array[Byte], +streamOffsetVal: Long) extends FilterInputStream(in: InputStream) with +ReadableByteChannel { + var oneByteBuf: Array[Byte] = new Array[Byte](1) + var codec: CryptoCodec = codecVal + + var bufferSize: Integer = CryptoStreamUtils.checkBufferSize(codecVal, bufferSizeVal) + /** + * Input data buffer. The data starts at inBuffer.position() and ends at + * to inBuffer.limit(). + */ + var inBuffer: ByteBuffer = ByteBuffer.allocateDirect(bufferSizeVal) + + /** + * The decrypted data buffer. The data starts at outBuffer.position() and + * ends at outBuffer.limit() + */ + var outBuffer: ByteBuffer = ByteBuffer.allocateDirect(bufferSizeVal) + var streamOffset: Long = streamOffsetVal // Underlying stream offset. + + /** + * Whether the underlying stream supports + * {@link org.apache.hadoop.fs.ByteBufferReadable} + */ + var usingByteBufferRead: Boolean = false + var usingByteBufferReadInitialized: Boolean = false + /** + * Padding = pos%(algorithm blocksize) Padding is put into {@link #inBuffer} + * before any other data goes in. The purpose of padding is to put the input + * data at proper position. + */ + var padding: Byte = '0' + var closed: Boolean = false + var key: Array[Byte] = keyVal.clone() + var initIV: Array[Byte] = ivVal.clone() + var iv: Array[Byte] = ivVal.clone() + var isReadableByteChannel: Boolean = in.isInstanceOf[ReadableByteChannel] + + /** DirectBuffer pool */ + val bufferPool: Queue[ByteBuffer] = +new ConcurrentLinkedQueue[ByteBuffer]() --- End diff -- nit: fits in the previous line. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
[GitHub] spark pull request: SPARK-10771: Implement the shuffle encryption ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8880#discussion_r40599644 --- Diff: core/src/main/scala/org/apache/spark/crypto/CryptoInputStream.scala --- @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.crypto + +import java.io.{IOException, InputStream, FilterInputStream} +import java.lang.UnsupportedOperationException +import java.nio.ByteBuffer +import java.nio.channels.ReadableByteChannel +import java.security.GeneralSecurityException +import java.util.Queue +import java.util.concurrent.ConcurrentLinkedQueue + +import com.google.common.base.Preconditions + +/** + * CryptoInputStream decrypts data. It is not thread-safe. AES CTR mode is + * required in order to ensure that the plain text and cipher text have a 1:1 + * mapping. The decryption is buffer based. The key points of the decryption + * are (1) calculating the counter and (2) padding through stream position: + * + * counter = base + pos/(algorithm blocksize); + * padding = pos%(algorithm blocksize); + * + * The underlying stream offset is maintained as state. 
+ */ +class CryptoInputStream(in: InputStream, codecVal: CryptoCodec, +bufferSizeVal: Integer, keyVal: Array[Byte], ivVal: Array[Byte], +streamOffsetVal: Long) extends FilterInputStream(in: InputStream) with +ReadableByteChannel { + var oneByteBuf: Array[Byte] = new Array[Byte](1) + var codec: CryptoCodec = codecVal + + var bufferSize: Integer = CryptoStreamUtils.checkBufferSize(codecVal, bufferSizeVal) + /** + * Input data buffer. The data starts at inBuffer.position() and ends at + * to inBuffer.limit(). + */ + var inBuffer: ByteBuffer = ByteBuffer.allocateDirect(bufferSizeVal) + + /** + * The decrypted data buffer. The data starts at outBuffer.position() and + * ends at outBuffer.limit() + */ + var outBuffer: ByteBuffer = ByteBuffer.allocateDirect(bufferSizeVal) + var streamOffset: Long = streamOffsetVal // Underlying stream offset. + + /** + * Whether the underlying stream supports + * {@link org.apache.hadoop.fs.ByteBufferReadable} + */ + var usingByteBufferRead: Boolean = false + var usingByteBufferReadInitialized: Boolean = false + /** + * Padding = pos%(algorithm blocksize) Padding is put into {@link #inBuffer} + * before any other data goes in. The purpose of padding is to put the input + * data at proper position. 
+ */ + var padding: Byte = '0' + var closed: Boolean = false + var key: Array[Byte] = keyVal.clone() + var initIV: Array[Byte] = ivVal.clone() + var iv: Array[Byte] = ivVal.clone() + var isReadableByteChannel: Boolean = in.isInstanceOf[ReadableByteChannel] + + /** DirectBuffer pool */ + val bufferPool: Queue[ByteBuffer] = +new ConcurrentLinkedQueue[ByteBuffer]() + /** Decryptor pool */ + val decryptorPool: Queue[Decryptor] = +new ConcurrentLinkedQueue[Decryptor]() + + var tmpBuf: Array[Byte] = null + var decryptor: Decryptor = getDecryptor + CryptoStreamUtils.checkCodec(codecVal) + resetStreamOffset(streamOffset) + + + def this(in: InputStream, codec: CryptoCodec, + bufferSize: Integer, key: Array[Byte], iv: Array[Byte]) { --- End diff -- nit: indentation style. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
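The class-level comment quoted above defines the CTR offset arithmetic: `counter = base + pos/(algorithm blocksize)` and `padding = pos%(algorithm blocksize)`. A minimal worked example with AES's 16-byte block size (the object and method names here are illustrative, not part of the PR):

```scala
object CtrOffsetDemo {
  // AES block size in bytes; CTR mode derives the counter value and the
  // intra-block padding from the absolute stream position.
  val BlockSize = 16L

  def counter(base: Long, pos: Long): Long = base + pos / BlockSize
  def padding(pos: Long): Long = pos % BlockSize

  def main(args: Array[String]): Unit = {
    // Position 35 falls in block index 2, 3 bytes into that block.
    assert(counter(0L, 35L) == 2L)
    assert(padding(35L) == 3L)
  }
}
```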
[GitHub] spark pull request: SPARK-10771: Implement the shuffle encryption ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8880#discussion_r40599555 --- Diff: core/src/main/scala/org/apache/spark/crypto/CryptoInputStream.scala --- @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.crypto + +import java.io.{IOException, InputStream, FilterInputStream} +import java.lang.UnsupportedOperationException +import java.nio.ByteBuffer +import java.nio.channels.ReadableByteChannel +import java.security.GeneralSecurityException +import java.util.Queue +import java.util.concurrent.ConcurrentLinkedQueue + +import com.google.common.base.Preconditions + +/** + * CryptoInputStream decrypts data. It is not thread-safe. AES CTR mode is + * required in order to ensure that the plain text and cipher text have a 1:1 + * mapping. The decryption is buffer based. The key points of the decryption + * are (1) calculating the counter and (2) padding through stream position: + * + * counter = base + pos/(algorithm blocksize); + * padding = pos%(algorithm blocksize); + * + * The underlying stream offset is maintained as state. 
+ */ +class CryptoInputStream(in: InputStream, codecVal: CryptoCodec, +bufferSizeVal: Integer, keyVal: Array[Byte], ivVal: Array[Byte], +streamOffsetVal: Long) extends FilterInputStream(in: InputStream) with +ReadableByteChannel { + var oneByteBuf: Array[Byte] = new Array[Byte](1) + var codec: CryptoCodec = codecVal + + var bufferSize: Integer = CryptoStreamUtils.checkBufferSize(codecVal, bufferSizeVal) --- End diff -- Feels like all these should be `val`s. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
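The suggestion above — fields that are assigned once should be `val`s — can be sketched like this (a hypothetical reduction of the constructor, not the PR's actual signature):

```scala
// `val` makes single-assignment explicit and compiler-enforced; a later
// reassignment becomes a compile error instead of a latent bug.
class CryptoStreamFields(codecName: String, bufferSizeIn: Int) {
  val codec: String = codecName
  val bufferSize: Int = bufferSizeIn
}
```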
[GitHub] spark pull request: SPARK-10771: Implement the shuffle encryption ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8880#discussion_r40599501 --- Diff: core/src/main/scala/org/apache/spark/crypto/CryptoInputStream.scala --- @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.crypto + +import java.io.{IOException, InputStream, FilterInputStream} +import java.lang.UnsupportedOperationException +import java.nio.ByteBuffer +import java.nio.channels.ReadableByteChannel +import java.security.GeneralSecurityException +import java.util.Queue +import java.util.concurrent.ConcurrentLinkedQueue + +import com.google.common.base.Preconditions + +/** + * CryptoInputStream decrypts data. It is not thread-safe. AES CTR mode is + * required in order to ensure that the plain text and cipher text have a 1:1 + * mapping. The decryption is buffer based. The key points of the decryption + * are (1) calculating the counter and (2) padding through stream position: + * + * counter = base + pos/(algorithm blocksize); + * padding = pos%(algorithm blocksize); + * + * The underlying stream offset is maintained as state. 
+ */ +class CryptoInputStream(in: InputStream, codecVal: CryptoCodec, +bufferSizeVal: Integer, keyVal: Array[Byte], ivVal: Array[Byte], --- End diff -- Indentation doesn't follow Spark style. class Foo(p1: Type, p2: Type, p3: Type) with ReadableByteChannel { --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
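The layout the reviewer sketches is Spark's multi-line constructor style: parameters indented four spaces, the `extends` clause on its own line, and the body at the normal two-space indent. A compilable sketch with a hypothetical class name and a reduced parameter list:

```scala
import java.io.{FilterInputStream, InputStream}

// Constructor parameters at four-space indent, `extends` on its own
// line, class body at two spaces.
class CryptoInputStreamSketch(
    in: InputStream,
    bufferSize: Int,
    key: Array[Byte],
    iv: Array[Byte])
  extends FilterInputStream(in) {
  // class body
}
```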
[GitHub] spark pull request: SPARK-10771: Implement the shuffle encryption ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8880#discussion_r40599401 --- Diff: core/src/main/scala/org/apache/spark/crypto/CryptoInputStream.scala --- @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.crypto + +import java.io.{IOException, InputStream, FilterInputStream} +import java.lang.UnsupportedOperationException --- End diff -- No need to import from `java.lang`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: SPARK-10771: Implement the shuffle encryption ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8880#discussion_r40599106 --- Diff: core/src/main/scala/org/apache/spark/crypto/CryptoConf.scala --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.crypto + +import java.security.NoSuchAlgorithmException +import javax.crypto.{KeyGenerator, SecretKey} + +import org.apache.hadoop.security.Credentials + +import org.apache.spark.crypto.CommonConfigurationKeys._ +import org.apache.spark.SparkConf + +/** + * CryptoConf is a class for Crypto configuration + */ +private[spark] case class CryptoConf(enabled: Boolean = false) { + +} + +private[spark] object CryptoConf { + def parse(sparkConf: SparkConf): CryptoConf = { +val enabled = if (sparkConf != null) { + sparkConf.getBoolean("spark.encrypted.shuffle", false) +} else { + false +} +new CryptoConf(enabled) + } + + def initSparkShuffleCredentials(conf: SparkConf, credentials: Credentials) { +if (credentials.getSecretKey(SPARK_SHUFFLE_TOKEN) == null) { + var keyGen: KeyGenerator = null + try { +val SHUFFLE_KEY_LENGTH: Int = 64 +var keyLen: Int = if (conf.getBoolean(SPARK_ENCRYPTED_INTERMEDIATE_DATA, + DEFAULT_SPARK_ENCRYPTED_INTERMEDIATE_DATA) == true) { + conf.getInt(SPARK_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS, +DEFAULT_SPARK_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS) +} else { + SHUFFLE_KEY_LENGTH +} +val SHUFFLE_KEYGEN_ALGORITHM = "HmacSHA1"; +keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM) +keyGen.init(keyLen) + } catch { +case e: NoSuchAlgorithmException => throw new RuntimeException("Error generating " + --- End diff -- It would be nice to at least stash the original exception as the cause of the one you're throwing. `RuntimeException` is also a very generic exception, it would be better to have a more specific one. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
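A hedged sketch of what the reviewer is asking for above (the exception type and message are illustrative choices, not the PR's; inside Spark a `SparkException` would be a natural fit, but `IllegalStateException` keeps this snippet self-contained):

```scala
import java.security.NoSuchAlgorithmException
import javax.crypto.KeyGenerator

object KeyGenSketch {
  // Illustrative helper, not the PR's code: create and initialize a
  // KeyGenerator, chaining the original exception as the cause so the
  // stack trace is not lost, and throwing something more specific than
  // a bare RuntimeException.
  def createKeyGenerator(algorithm: String, keyLenBits: Int): KeyGenerator = {
    try {
      val keyGen = KeyGenerator.getInstance(algorithm)
      keyGen.init(keyLenBits)
      keyGen
    } catch {
      case e: NoSuchAlgorithmException =>
        // The second constructor argument preserves `e` as the cause.
        throw new IllegalStateException(
          s"Error generating shuffle secret key with algorithm $algorithm", e)
    }
  }
}
```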
[GitHub] spark pull request: SPARK-10771: Implement the shuffle encryption ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8880#discussion_r40599017 --- Diff: core/src/main/scala/org/apache/spark/crypto/CryptoConf.scala --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.crypto + +import java.security.NoSuchAlgorithmException +import javax.crypto.{KeyGenerator, SecretKey} + +import org.apache.hadoop.security.Credentials + +import org.apache.spark.crypto.CommonConfigurationKeys._ +import org.apache.spark.SparkConf + +/** + * CryptoConf is a class for Crypto configuration + */ +private[spark] case class CryptoConf(enabled: Boolean = false) { + +} + +private[spark] object CryptoConf { + def parse(sparkConf: SparkConf): CryptoConf = { +val enabled = if (sparkConf != null) { + sparkConf.getBoolean("spark.encrypted.shuffle", false) +} else { + false +} +new CryptoConf(enabled) + } + + def initSparkShuffleCredentials(conf: SparkConf, credentials: Credentials) { +if (credentials.getSecretKey(SPARK_SHUFFLE_TOKEN) == null) { + var keyGen: KeyGenerator = null + try { +val SHUFFLE_KEY_LENGTH: Int = 64 +var keyLen: Int = if (conf.getBoolean(SPARK_ENCRYPTED_INTERMEDIATE_DATA, + DEFAULT_SPARK_ENCRYPTED_INTERMEDIATE_DATA) == true) { + conf.getInt(SPARK_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS, +DEFAULT_SPARK_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS) +} else { + SHUFFLE_KEY_LENGTH +} +val SHUFFLE_KEYGEN_ALGORITHM = "HmacSHA1"; --- End diff -- Config option for this maybe? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: SPARK-10771: Implement the shuffle encryption ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8880#discussion_r40598980 --- Diff: core/src/main/scala/org/apache/spark/crypto/CryptoConf.scala --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.crypto + +import java.security.NoSuchAlgorithmException +import javax.crypto.{KeyGenerator, SecretKey} + +import org.apache.hadoop.security.Credentials + +import org.apache.spark.crypto.CommonConfigurationKeys._ +import org.apache.spark.SparkConf + +/** + * CryptoConf is a class for Crypto configuration + */ +private[spark] case class CryptoConf(enabled: Boolean = false) { + +} + +private[spark] object CryptoConf { + def parse(sparkConf: SparkConf): CryptoConf = { +val enabled = if (sparkConf != null) { + sparkConf.getBoolean("spark.encrypted.shuffle", false) +} else { + false +} +new CryptoConf(enabled) + } + + def initSparkShuffleCredentials(conf: SparkConf, credentials: Credentials) { +if (credentials.getSecretKey(SPARK_SHUFFLE_TOKEN) == null) { + var keyGen: KeyGenerator = null + try { +val SHUFFLE_KEY_LENGTH: Int = 64 --- End diff -- This feels like something that would need a config option? 
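The two review comments above (the hard-coded `"HmacSHA1"` and the hard-coded 64-bit `SHUFFLE_KEY_LENGTH`) point in the same direction. A sketch of reading both from `SparkConf` instead, with hypothetical key names (Spark does not define these) and the patch's current values as defaults:

```scala
import org.apache.spark.SparkConf

object ShuffleKeyConfSketch {
  // Hypothetical configuration keys, shown only to illustrate the idea.
  val KeygenAlgorithmKey = "spark.shuffle.crypto.keygen.algorithm"
  val KeyLengthBitsKey = "spark.shuffle.crypto.key.length.bits"

  def keygenSettings(conf: SparkConf): (String, Int) = {
    // Defaults mirror the constants currently hard-coded in the patch.
    val algorithm = conf.get(KeygenAlgorithmKey, "HmacSHA1")
    val keyLenBits = conf.getInt(KeyLengthBitsKey, 64)
    (algorithm, keyLenBits)
  }
}
```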
[GitHub] spark pull request: SPARK-10771: Implement the shuffle encryption ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8880#discussion_r40598890 --- Diff: core/src/main/scala/org/apache/spark/crypto/CryptoConf.scala --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.crypto + +import java.security.NoSuchAlgorithmException +import javax.crypto.{KeyGenerator, SecretKey} + +import org.apache.hadoop.security.Credentials + +import org.apache.spark.crypto.CommonConfigurationKeys._ +import org.apache.spark.SparkConf + +/** + * CryptoConf is a class for Crypto configuration + */ +private[spark] case class CryptoConf(enabled: Boolean = false) { --- End diff -- Feels a little overkill to have a class dedicated to holding a single boolean value. Is there more config information that you can put here, maybe consolidating some of the other config stuff in this patch? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
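One way to act on this comment, as a hedged sketch, is to fold the patch's scattered crypto settings into `CryptoConf` itself. Field names, keys, and defaults below are illustrative (only `spark.encrypted.shuffle` comes from the patch):

```scala
import org.apache.spark.SparkConf

// Illustrative consolidation: CryptoConf carries all shuffle-encryption
// settings rather than a single boolean.
private[spark] case class CryptoConf(
    enabled: Boolean = false,
    cipherSuite: String = "AES/CTR/NoPadding",
    keygenAlgorithm: String = "HmacSHA1",
    keySizeBits: Int = 64)

private[spark] object CryptoConf {
  def parse(sparkConf: SparkConf): CryptoConf = {
    if (sparkConf == null) {
      CryptoConf()
    } else {
      CryptoConf(
        enabled = sparkConf.getBoolean("spark.encrypted.shuffle", false),
        // The remaining keys are hypothetical, shown only to illustrate
        // the consolidation the reviewer suggests.
        cipherSuite = sparkConf.get("spark.shuffle.crypto.cipher.suite", "AES/CTR/NoPadding"),
        keygenAlgorithm = sparkConf.get("spark.shuffle.crypto.keygen.algorithm", "HmacSHA1"),
        keySizeBits = sparkConf.getInt("spark.shuffle.crypto.key.size.bits", 64))
    }
  }
}
```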
[GitHub] spark pull request: SPARK-10771: Implement the shuffle encryption ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8880#discussion_r40598481 --- Diff: core/src/main/scala/org/apache/spark/crypto/CryptoConf.scala --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.crypto + +import java.security.NoSuchAlgorithmException +import javax.crypto.{KeyGenerator, SecretKey} + +import org.apache.hadoop.security.Credentials + +import org.apache.spark.crypto.CommonConfigurationKeys._ +import org.apache.spark.SparkConf --- End diff -- nit: ordering, should come before previous line. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-10807][SPARKR] Added as.data.frame as a...
Github user shivaram commented on a diff in the pull request: https://github.com/apache/spark/pull/8908#discussion_r40598461 --- Diff: R/pkg/R/DataFrame.R --- @@ -1848,3 +1848,28 @@ setMethod("crosstab", sct <- callJMethod(statFunctions, "crosstab", col1, col2) collect(dataFrame(sct)) }) + + +#' This function downloads the contents of a DataFrame into an R's data.frame. +#' Since data.frames are held in memory, ensure that you have enough memory +#' in your system to accommodate the contents. +#' +#' @title Download data from a DataFrame into a data.frame +#' @param x a DataFrame +#' @return a data.frame +#' @rdname as.data.frame +#' @examples \dontrun{ +#' +#' irisDF <- createDataFrame(sqlContext, iris) +#' df <- as.data.frame(irisDF[irisDF$Species == "setosa", ]) +#' } +setMethod(f = "as.data.frame", --- End diff -- sorry a couple of more style nits. Based on existing code, this should be ``` setMethod("as.data.frame", signature(x = "DataFrame"), ``` (i.e. having `x = ` in the signature line and no `f=` in the name line) Again, just to restate this isn't for functionality but just for matching the style with all the existing code --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: SPARK-10771: Implement the shuffle encryption ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8880#discussion_r40598429 --- Diff: core/src/main/scala/org/apache/spark/crypto/CryptoCodec.scala --- @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.crypto + +import java.lang.ClassCastException + +import scala.reflect.runtime.universe + +import org.apache.spark.crypto.CommonConfigurationKeys.SPARK_SECURITY_CRYPTO_CIPHER_SUITE_DEFAULT +import org.apache.spark.crypto.CommonConfigurationKeys.SPARK_SECURITY_CRYPTO_CIPHER_SUITE_KEY +import org.apache.spark.crypto.CommonConfigurationKeys.SPARK_SECURITY_CRYPTO_CODEC_CLASSES_AES_CTR_NOPADDING_KEY +import org.apache.spark.{SparkConf, Logging} + +/** + * Crypto codec class, encapsulates encryptor/decryptor pair. + */ +abstract case class CryptoCodec() { + /** + * + * @return the CipherSuite for this codec. + */ + def getCipherSuite(): CipherSuite + + /** + * This interface is only for Counter (CTR) mode. Generally the Encryptor + * or Decryptor calculates the IV and maintain encryption context internally. 
+ * For example a {@link javax.crypto.Cipher} will maintain its encryption + * context internally when we do encryption/decryption using the + * Cipher#update interface. + * + * The IV can be calculated by combining the initial IV and the counter with + * a lossless operation (concatenation, addition, or XOR). + * @see http://en.wikipedia.org/wiki/Block_cipher_mode_of_operation#Counter_.28CTR.29 + * @param initIV + * @param counter + * @param IV + */ + def calculateIV(initIV: Array[Byte], counter: Long, IV: Array[Byte]) + + /** + * @return Encryptor the encryptor + */ + def createEncryptor: Encryptor + + /** + * @return Decryptor the decryptor + */ + def createDecryptor: Decryptor + + /** + * Generate a number of secure, random bytes suitable for cryptographic use. + * This method needs to be thread-safe. + * @param bytes byte array to populate with random data + */ + def generateSecureRandom(bytes: Array[Byte]) +} + +object CryptoCodec extends Logging { + def getInstance(conf: SparkConf): CryptoCodec = { +var name: String = conf.get(SPARK_SECURITY_CRYPTO_CIPHER_SUITE_KEY, + SPARK_SECURITY_CRYPTO_CIPHER_SUITE_DEFAULT) +getInstance(conf, CipherSuite.convert(name)) + } + + def getInstance(conf: SparkConf, cipherSuite: CipherSuite): CryptoCodec = { +var klasses: List[String] = getCodecClasses(conf, cipherSuite) +var codec: CryptoCodec = null +for (klass <- klasses) { + try { +val m = universe.runtimeMirror(getClass.getClassLoader) +var c: CryptoCodec = null +if (klass.equals("org.apache.spark.crypto.JceAesCtrCryptoCodec")) { + val classCryptoCodec = universe.typeOf[org.apache.spark.crypto.JceAesCtrCryptoCodec] +.typeSymbol.asClass + val cm = m.reflectClass(classCryptoCodec) + val ctor = universe.typeOf[org.apache.spark.crypto.JceAesCtrCryptoCodec].declaration( +universe.nme.CONSTRUCTOR).asMethod + val ctorm = cm.reflectConstructor(ctor) + val p = ctorm(conf) + c = p.asInstanceOf[org.apache.spark.crypto.CryptoCodec] +} else { + // TODO add Openssl crypto codec +} + +if 
(c.getCipherSuite.name.equals(cipherSuite.name)) { + if (codec == null) { +logDebug(s"Using crypto codec $klass.getName.") +codec = c + } +} +else { + logDebug(s"Crypto codec $klass.getName doesn't meet the cipher suite $cipherSuite" + +s".getName.") +} + } + catch { +case e: Exception => { + logDebug(s"Crypto
[GitHub] spark pull request: SPARK-10771: Implement the shuffle encryption ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8880#discussion_r40598211 --- Diff: core/src/main/scala/org/apache/spark/crypto/CryptoCodec.scala --- @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.crypto + +import java.lang.ClassCastException + +import scala.reflect.runtime.universe + +import org.apache.spark.crypto.CommonConfigurationKeys.SPARK_SECURITY_CRYPTO_CIPHER_SUITE_DEFAULT +import org.apache.spark.crypto.CommonConfigurationKeys.SPARK_SECURITY_CRYPTO_CIPHER_SUITE_KEY +import org.apache.spark.crypto.CommonConfigurationKeys.SPARK_SECURITY_CRYPTO_CODEC_CLASSES_AES_CTR_NOPADDING_KEY +import org.apache.spark.{SparkConf, Logging} + +/** + * Crypto codec class, encapsulates encryptor/decryptor pair. + */ +abstract case class CryptoCodec() { + /** + * + * @return the CipherSuite for this codec. + */ + def getCipherSuite(): CipherSuite + + /** + * This interface is only for Counter (CTR) mode. Generally the Encryptor + * or Decryptor calculates the IV and maintain encryption context internally. 
+ * For example a {@link javax.crypto.Cipher} will maintain its encryption + * context internally when we do encryption/decryption using the + * Cipher#update interface. + * + * The IV can be calculated by combining the initial IV and the counter with + * a lossless operation (concatenation, addition, or XOR). + * @see http://en.wikipedia.org/wiki/Block_cipher_mode_of_operation#Counter_.28CTR.29 + * @param initIV + * @param counter + * @param IV + */ + def calculateIV(initIV: Array[Byte], counter: Long, IV: Array[Byte]) + + /** + * @return Encryptor the encryptor + */ + def createEncryptor: Encryptor + + /** + * @return Decryptor the decryptor + */ + def createDecryptor: Decryptor + + /** + * Generate a number of secure, random bytes suitable for cryptographic use. + * This method needs to be thread-safe. + * @param bytes byte array to populate with random data + */ + def generateSecureRandom(bytes: Array[Byte]) +} + +object CryptoCodec extends Logging { + def getInstance(conf: SparkConf): CryptoCodec = { +var name: String = conf.get(SPARK_SECURITY_CRYPTO_CIPHER_SUITE_KEY, + SPARK_SECURITY_CRYPTO_CIPHER_SUITE_DEFAULT) +getInstance(conf, CipherSuite.convert(name)) + } + + def getInstance(conf: SparkConf, cipherSuite: CipherSuite): CryptoCodec = { +var klasses: List[String] = getCodecClasses(conf, cipherSuite) +var codec: CryptoCodec = null +for (klass <- klasses) { + try { +val m = universe.runtimeMirror(getClass.getClassLoader) +var c: CryptoCodec = null +if (klass.equals("org.apache.spark.crypto.JceAesCtrCryptoCodec")) { + val classCryptoCodec = universe.typeOf[org.apache.spark.crypto.JceAesCtrCryptoCodec] +.typeSymbol.asClass + val cm = m.reflectClass(classCryptoCodec) + val ctor = universe.typeOf[org.apache.spark.crypto.JceAesCtrCryptoCodec].declaration( +universe.nme.CONSTRUCTOR).asMethod + val ctorm = cm.reflectConstructor(ctor) + val p = ctorm(conf) + c = p.asInstanceOf[org.apache.spark.crypto.CryptoCodec] +} else { + // TODO add Openssl crypto codec +} + +if 
(c.getCipherSuite.name.equals(cipherSuite.name)) { + if (codec == null) { --- End diff -- If I understand the semantics of this method, you could achieve this more cleanly by using `klasses.view.collectFirst`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact
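The `klasses.view.collectFirst` idea can be shown standalone. `.view` makes the traversal lazy, so candidates after the first match are never evaluated, and the mutable `var codec` plus for-loop disappears:

```scala
object CollectFirstSketch {
  def main(args: Array[String]): Unit = {
    val klasses = List(
      "org.example.UnsupportedCodec",
      "org.apache.spark.crypto.JceAesCtrCryptoCodec",
      "org.example.AnotherCodec")

    // Lazily pick the first class name satisfying the predicate; in the
    // real method the predicate would be the cipher-suite match and the
    // mapped value the instantiated codec.
    val chosen: Option[String] = klasses.view.collectFirst {
      case k if k.endsWith("JceAesCtrCryptoCodec") => k
    }

    println(chosen) // Some(org.apache.spark.crypto.JceAesCtrCryptoCodec)
  }
}
```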
[GitHub] spark pull request: SPARK-10771: Implement the shuffle encryption ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8880#discussion_r40597474 --- Diff: core/src/main/scala/org/apache/spark/crypto/CryptoCodec.scala --- @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.crypto + +import java.lang.ClassCastException + +import scala.reflect.runtime.universe + +import org.apache.spark.crypto.CommonConfigurationKeys.SPARK_SECURITY_CRYPTO_CIPHER_SUITE_DEFAULT +import org.apache.spark.crypto.CommonConfigurationKeys.SPARK_SECURITY_CRYPTO_CIPHER_SUITE_KEY +import org.apache.spark.crypto.CommonConfigurationKeys.SPARK_SECURITY_CRYPTO_CODEC_CLASSES_AES_CTR_NOPADDING_KEY +import org.apache.spark.{SparkConf, Logging} + +/** + * Crypto codec class, encapsulates encryptor/decryptor pair. + */ +abstract case class CryptoCodec() { + /** + * + * @return the CipherSuite for this codec. + */ + def getCipherSuite(): CipherSuite + + /** + * This interface is only for Counter (CTR) mode. Generally the Encryptor + * or Decryptor calculates the IV and maintain encryption context internally. 
+ * For example a {@link javax.crypto.Cipher} will maintain its encryption + * context internally when we do encryption/decryption using the + * Cipher#update interface. + * + * The IV can be calculated by combining the initial IV and the counter with + * a lossless operation (concatenation, addition, or XOR). + * @see http://en.wikipedia.org/wiki/Block_cipher_mode_of_operation#Counter_.28CTR.29 + * @param initIV + * @param counter + * @param IV + */ + def calculateIV(initIV: Array[Byte], counter: Long, IV: Array[Byte]) + + /** + * @return Encryptor the encryptor + */ + def createEncryptor: Encryptor + + /** + * @return Decryptor the decryptor + */ + def createDecryptor: Decryptor + + /** + * Generate a number of secure, random bytes suitable for cryptographic use. + * This method needs to be thread-safe. + * @param bytes byte array to populate with random data + */ + def generateSecureRandom(bytes: Array[Byte]) +} + +object CryptoCodec extends Logging { + def getInstance(conf: SparkConf): CryptoCodec = { +var name: String = conf.get(SPARK_SECURITY_CRYPTO_CIPHER_SUITE_KEY, + SPARK_SECURITY_CRYPTO_CIPHER_SUITE_DEFAULT) +getInstance(conf, CipherSuite.convert(name)) + } + + def getInstance(conf: SparkConf, cipherSuite: CipherSuite): CryptoCodec = { +var klasses: List[String] = getCodecClasses(conf, cipherSuite) +var codec: CryptoCodec = null +for (klass <- klasses) { + try { +val m = universe.runtimeMirror(getClass.getClassLoader) +var c: CryptoCodec = null +if (klass.equals("org.apache.spark.crypto.JceAesCtrCryptoCodec")) { + val classCryptoCodec = universe.typeOf[org.apache.spark.crypto.JceAesCtrCryptoCodec] +.typeSymbol.asClass + val cm = m.reflectClass(classCryptoCodec) + val ctor = universe.typeOf[org.apache.spark.crypto.JceAesCtrCryptoCodec].declaration( +universe.nme.CONSTRUCTOR).asMethod + val ctorm = cm.reflectConstructor(ctor) + val p = ctorm(conf) + c = p.asInstanceOf[org.apache.spark.crypto.CryptoCodec] +} else { + // TODO add Openssl crypto codec --- End 
diff -- Should throw an exception. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For
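A minimal sketch of failing fast in the unimplemented branch instead of leaving `c` as `null` (the exception type is a suggestion, and the types are stand-ins, not the PR's):

```scala
object CodecDispatchSketch {
  // Stand-ins for the real codec hierarchy; illustrative only.
  sealed trait Codec
  final class JceCodec extends Codec

  def instantiate(klass: String): Codec = klass match {
    case "org.apache.spark.crypto.JceAesCtrCryptoCodec" => new JceCodec
    case other =>
      // Throwing here surfaces a misconfiguration immediately, rather
      // than the NullPointerException the silent TODO branch would cause
      // later when c.getCipherSuite is called on null.
      throw new UnsupportedOperationException(
        s"Crypto codec class $other is not supported")
  }
}
```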
[GitHub] spark pull request: SPARK-10771: Implement the shuffle encryption ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8880#discussion_r40597512 --- Diff: core/src/main/scala/org/apache/spark/crypto/CryptoCodec.scala --- @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.crypto + +import java.lang.ClassCastException + +import scala.reflect.runtime.universe + +import org.apache.spark.crypto.CommonConfigurationKeys.SPARK_SECURITY_CRYPTO_CIPHER_SUITE_DEFAULT +import org.apache.spark.crypto.CommonConfigurationKeys.SPARK_SECURITY_CRYPTO_CIPHER_SUITE_KEY +import org.apache.spark.crypto.CommonConfigurationKeys.SPARK_SECURITY_CRYPTO_CODEC_CLASSES_AES_CTR_NOPADDING_KEY +import org.apache.spark.{SparkConf, Logging} + +/** + * Crypto codec class, encapsulates encryptor/decryptor pair. + */ +abstract case class CryptoCodec() { + /** + * + * @return the CipherSuite for this codec. + */ + def getCipherSuite(): CipherSuite + + /** + * This interface is only for Counter (CTR) mode. Generally the Encryptor + * or Decryptor calculates the IV and maintain encryption context internally. 
+ * For example a {@link javax.crypto.Cipher} will maintain its encryption + * context internally when we do encryption/decryption using the + * Cipher#update interface. + * + * The IV can be calculated by combining the initial IV and the counter with + * a lossless operation (concatenation, addition, or XOR). + * @see http://en.wikipedia.org/wiki/Block_cipher_mode_of_operation#Counter_.28CTR.29 + * @param initIV + * @param counter + * @param IV + */ + def calculateIV(initIV: Array[Byte], counter: Long, IV: Array[Byte]) + + /** + * @return Encryptor the encryptor + */ + def createEncryptor: Encryptor + + /** + * @return Decryptor the decryptor + */ + def createDecryptor: Decryptor + + /** + * Generate a number of secure, random bytes suitable for cryptographic use. + * This method needs to be thread-safe. + * @param bytes byte array to populate with random data + */ + def generateSecureRandom(bytes: Array[Byte]) +} + +object CryptoCodec extends Logging { + def getInstance(conf: SparkConf): CryptoCodec = { +var name: String = conf.get(SPARK_SECURITY_CRYPTO_CIPHER_SUITE_KEY, + SPARK_SECURITY_CRYPTO_CIPHER_SUITE_DEFAULT) +getInstance(conf, CipherSuite.convert(name)) + } + + def getInstance(conf: SparkConf, cipherSuite: CipherSuite): CryptoCodec = { +var klasses: List[String] = getCodecClasses(conf, cipherSuite) +var codec: CryptoCodec = null +for (klass <- klasses) { + try { +val m = universe.runtimeMirror(getClass.getClassLoader) +var c: CryptoCodec = null +if (klass.equals("org.apache.spark.crypto.JceAesCtrCryptoCodec")) { --- End diff -- Class in in Spark, you can use `classOf[JceAesCtrCryptoCodec].getName()` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
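The `classOf` suggestion, sketched with a stand-in class so the snippet compiles on its own:

```scala
// Stand-in for org.apache.spark.crypto.JceAesCtrCryptoCodec.
class JceAesCtrCryptoCodecStub

object ClassNameSketch {
  def main(args: Array[String]): Unit = {
    // classOf[...].getName stays correct under renames and refactors,
    // unlike a hand-maintained string literal.
    val expected = classOf[JceAesCtrCryptoCodecStub].getName
    println(expected.endsWith("JceAesCtrCryptoCodecStub")) // true
  }
}
```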
[GitHub] spark pull request: SPARK-10771: Implement the shuffle encryption ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8880#discussion_r40597441 --- Diff: core/src/main/scala/org/apache/spark/crypto/CryptoCodec.scala --- @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.crypto + +import java.lang.ClassCastException + +import scala.reflect.runtime.universe + +import org.apache.spark.crypto.CommonConfigurationKeys.SPARK_SECURITY_CRYPTO_CIPHER_SUITE_DEFAULT +import org.apache.spark.crypto.CommonConfigurationKeys.SPARK_SECURITY_CRYPTO_CIPHER_SUITE_KEY +import org.apache.spark.crypto.CommonConfigurationKeys.SPARK_SECURITY_CRYPTO_CODEC_CLASSES_AES_CTR_NOPADDING_KEY +import org.apache.spark.{SparkConf, Logging} + +/** + * Crypto codec class, encapsulates encryptor/decryptor pair. + */ +abstract case class CryptoCodec() { + /** + * + * @return the CipherSuite for this codec. + */ + def getCipherSuite(): CipherSuite + + /** + * This interface is only for Counter (CTR) mode. Generally the Encryptor + * or Decryptor calculates the IV and maintain encryption context internally. 
+ * For example a {@link javax.crypto.Cipher} will maintain its encryption + * context internally when we do encryption/decryption using the + * Cipher#update interface. + * + * The IV can be calculated by combining the initial IV and the counter with + * a lossless operation (concatenation, addition, or XOR). + * @see http://en.wikipedia.org/wiki/Block_cipher_mode_of_operation#Counter_.28CTR.29 + * @param initIV + * @param counter + * @param IV + */ + def calculateIV(initIV: Array[Byte], counter: Long, IV: Array[Byte]) + + /** + * @return Encryptor the encryptor + */ + def createEncryptor: Encryptor + + /** + * @return Decryptor the decryptor + */ + def createDecryptor: Decryptor + + /** + * Generate a number of secure, random bytes suitable for cryptographic use. + * This method needs to be thread-safe. + * @param bytes byte array to populate with random data + */ + def generateSecureRandom(bytes: Array[Byte]) +} + +object CryptoCodec extends Logging { + def getInstance(conf: SparkConf): CryptoCodec = { +var name: String = conf.get(SPARK_SECURITY_CRYPTO_CIPHER_SUITE_KEY, + SPARK_SECURITY_CRYPTO_CIPHER_SUITE_DEFAULT) +getInstance(conf, CipherSuite.convert(name)) + } + + def getInstance(conf: SparkConf, cipherSuite: CipherSuite): CryptoCodec = { +var klasses: List[String] = getCodecClasses(conf, cipherSuite) +var codec: CryptoCodec = null +for (klass <- klasses) { + try { +val m = universe.runtimeMirror(getClass.getClassLoader) +var c: CryptoCodec = null +if (klass.equals("org.apache.spark.crypto.JceAesCtrCryptoCodec")) { + val classCryptoCodec = universe.typeOf[org.apache.spark.crypto.JceAesCtrCryptoCodec] --- End diff -- Why do you need to use reflection here? Why can't you just instantiate the class directly, since it's part of Spark already? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
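The direct-instantiation alternative the reviewer suggests could look roughly like the following self-contained sketch. The stub classes here stand in for Spark's real ones and are assumptions for illustration, not the actual API; the point is that a class shipped inside Spark can be constructed with plain `new` behind a `match`, with no runtime mirrors.

```scala
// Simplified stand-ins for the classes in the quoted diff (assumptions, not Spark's code).
class SparkConf
abstract class CryptoCodec { def cipherSuiteName: String }
class JceAesCtrCryptoCodec(conf: SparkConf) extends CryptoCodec {
  val cipherSuiteName = "AES/CTR/NoPadding"
}

// Direct `new` instead of scala.reflect.runtime mirrors: the codec class is
// known at compile time, so reflection buys nothing here.
def getInstance(conf: SparkConf, suiteName: String): CryptoCodec = suiteName match {
  case "AES/CTR/NoPadding" => new JceAesCtrCryptoCodec(conf)
  case other => throw new IllegalArgumentException(s"Unsupported cipher suite: $other")
}
```

Reflection would only be needed if the codec class name came from configuration and might name a class outside Spark's own tree.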
[GitHub] spark pull request: SPARK-10771: Implement the shuffle encryption ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8880#discussion_r40597245 --- Diff: core/src/main/scala/org/apache/spark/crypto/CryptoCodec.scala --- [... same CryptoCodec.scala excerpt as quoted above ...] + def getInstance(conf: SparkConf, cipherSuite: CipherSuite): CryptoCodec = { +var klasses: List[String] = getCodecClasses(conf, cipherSuite) --- End diff -- `val`, no need to specify the type.
[GitHub] spark pull request: SPARK-10771: Implement the shuffle encryption ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8880#discussion_r40597300 --- Diff: core/src/main/scala/org/apache/spark/crypto/CryptoCodec.scala --- [... same CryptoCodec.scala excerpt as quoted above ...] +if (c.getCipherSuite.name.equals(cipherSuite.name)) { + if (codec == null) { +logDebug(s"Using crypto codec $klass.getName.") +codec = c + } +} +else { --- End diff -- nit: should go in previous line
[GitHub] spark pull request: SPARK-10771: Implement the shuffle encryption ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8880#discussion_r40597121 --- Diff: core/src/main/scala/org/apache/spark/crypto/CryptoCodec.scala --- [... same license header and imports as quoted above ...] +/** + * Crypto codec class, encapsulates encryptor/decryptor pair. + */ +abstract case class CryptoCodec() { --- End diff -- Does this need to be a case class? It doesn't have any fields.
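The alternative the review points toward could be sketched as follows. `abstract case class` is deprecated Scala, and with no constructor fields the generated case-class machinery (`equals`/`hashCode`/`copy`/`apply`) buys nothing; a trait (or plain abstract class) expresses the same contract. The simplified `CipherSuite`/`Encryptor` stubs below are stand-ins for illustration, not Spark's classes.

```scala
// Simplified stand-ins (assumptions, not the PR's actual types).
case class CipherSuite(name: String, algoBlockSize: Int)
trait Encryptor

// A trait carries the abstract members; no case-class machinery is generated.
trait CryptoCodec {
  def cipherSuite: CipherSuite
  def createEncryptor(): Encryptor
  // ... calculateIV, createDecryptor, generateSecureRandom elided
}

// A concrete codec simply extends the trait.
class DummyCodec extends CryptoCodec {
  val cipherSuite = CipherSuite("AES/CTR/NoPadding", 16)
  def createEncryptor(): Encryptor = new Encryptor {}
}
```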
[GitHub] spark pull request: SPARK-10771: Implement the shuffle encryption ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8880#discussion_r40597034 --- Diff: core/src/main/scala/org/apache/spark/crypto/CryptoCodec.scala --- [... same license header as quoted above ...] +package org.apache.spark.crypto + +import java.lang.ClassCastException --- End diff -- No need to import `java.lang`.
[GitHub] spark pull request: SPARK-10771: Implement the shuffle encryption ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8880#discussion_r40597016 --- Diff: core/src/main/scala/org/apache/spark/crypto/CryptoCodec.scala --- [... same license header and imports as quoted above ...] +import org.apache.spark.{SparkConf, Logging} --- End diff -- nit: import order. This line should come before the others, and `Logging` before `SparkConf`.
[GitHub] spark pull request: SPARK-10771: Implement the shuffle encryption ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8880#discussion_r40596818 --- Diff: core/src/main/scala/org/apache/spark/crypto/CipherSuite.scala --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.crypto + +/** + * Defines properties of a CipherSuite. Modeled after the ciphers in + * {@link javax.crypto.Cipher}. + * @param namename of cipher suite, as in {@link javax.crypto.Cipher} + * @param algoBlockSize size of an algorithm block in bytes + */ +case class CipherSuite(name: String, algoBlockSize: Int) { + var unknownValue: Integer = null + + def setUnknownValue(unknown: Integer) { +this.unknownValue = unknown + } + + def getUnknownValue(): Integer = unknownValue + + + override def toString(): String = { +var builder: StringBuilder = new StringBuilder("{") +builder.append("name: " + name) +builder.append(", algorithmBlockSize: " + algoBlockSize) +if (unknownValue != null) { + builder.append(", unknownValue: " + unknownValue) +} +builder.append("}") +builder.toString() + } + + /** + * Returns suffix of cipher suite configuration. 
+ * @return String configuration suffix + */ + def getConfigSuffix(): String = { +val parts = name.split("/") +var suffix: StringBuilder = new StringBuilder() +parts.foreach(part => + suffix.append(".").append(part.toLowerCase()) +) +suffix.toString() + } +} + + +object UNKNOWN extends CipherSuite("Unknown", 0) + +object AES_CTR_NOPADDING extends CipherSuite("AES/CTR/NoPadding", 16) + +object CipherSuite { + /** + * Convert to CipherSuite from name, {@link #algoBlockSize} is fixed for + * certain cipher suite, just need to compare the name. + * @param name cipher suite name + * @return CipherSuite cipher suite + */ + def convert(name: String): CipherSuite = { +if (name.equals(AES_CTR_NOPADDING.name)) { --- End diff -- use `name match { ... }` instead of cascading ifs. Also, if you call this method `apply`, you can just say `CipherSuite("foo")` instead of `CipherSuite.convert("foo")`.
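Both suggestions together (`name match` plus a custom `apply` on the companion) could look like this sketch. Returning `UNKNOWN` for unrecognized names mirrors the `UNKNOWN` object in the diff, but the exact fallback behavior of the PR's `convert` is an assumption here.

```scala
case class CipherSuite(name: String, algoBlockSize: Int)

object CipherSuite {
  val AES_CTR_NOPADDING = CipherSuite("AES/CTR/NoPadding", 16)
  val UNKNOWN = CipherSuite("Unknown", 0)

  // `apply` lets callers write CipherSuite("AES/CTR/NoPadding") instead of
  // CipherSuite.convert("AES/CTR/NoPadding"); `match` replaces cascading ifs.
  def apply(name: String): CipherSuite = name match {
    case AES_CTR_NOPADDING.name => AES_CTR_NOPADDING
    case _ => UNKNOWN
  }
}
```

`AES_CTR_NOPADDING.name` is a stable identifier, so it works directly as a match pattern; the custom `apply(String)` overloads, rather than clashes with, the synthetic case-class `apply(String, Int)`.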
[GitHub] spark pull request: SPARK-10771: Implement the shuffle encryption ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8880#discussion_r40596446 --- Diff: core/src/main/scala/org/apache/spark/crypto/CipherSuite.scala --- [... same CipherSuite.scala excerpt as quoted above ...] + def getConfigSuffix(): String = { +val parts = name.split("/") --- End diff -- This whole method can be written as: "." + name.split("/").map(_.toLowerCase()).mkString(".") (Assuming `name` cannot be empty.)
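The suggested one-liner, as a standalone method with a concrete suite name (the method name `configSuffix` is just for this sketch):

```scala
// Splits "AES/CTR/NoPadding" into parts, lowercases each, and joins with dots.
// Assumes `name` is non-empty, as the review notes.
def configSuffix(name: String): String =
  "." + name.split("/").map(_.toLowerCase).mkString(".")

configSuffix("AES/CTR/NoPadding")  // ".aes.ctr.nopadding"
```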
[GitHub] spark pull request: SPARK-10771: Implement the shuffle encryption ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8880#discussion_r40596195 --- Diff: core/src/main/scala/org/apache/spark/crypto/CipherSuite.scala --- [... same CipherSuite.scala excerpt as quoted above ...] + def getConfigSuffix(): String = { +val parts = name.split("/") +var suffix: StringBuilder = new StringBuilder() --- End diff -- nit: no need to specify the variable's type in these cases. This should also be a `val`.
[GitHub] spark pull request: SPARK-10771: Implement the shuffle encryption ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8880#discussion_r40596221 --- Diff: core/src/main/scala/org/apache/spark/crypto/CipherSuite.scala --- [... same CipherSuite.scala excerpt as quoted above ...] +var suffix: StringBuilder = new StringBuilder() +parts.foreach(part => --- End diff -- nit: `.foreach { part =>`
[GitHub] spark pull request: SPARK-10771: Implement the shuffle encryption ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8880#discussion_r40596080 --- Diff: core/src/main/scala/org/apache/spark/crypto/CipherSuite.scala --- [... same CipherSuite.scala excerpt as quoted above ...] + override def toString(): String = { +var builder: StringBuilder = new StringBuilder("{") --- End diff -- You could use Guava's `Objects.toStringHelper()`.
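An idiomatic alternative to the mutable `StringBuilder` version could look like the sketch below. Guava's `Objects.toStringHelper`, as suggested, produces `CipherSuite{name=..., ...}`-style output with similar brevity but needs Guava on the classpath; this pure-Scala version keeps the diff's original `{name: ...}` format. Modeling `unknownValue` as an `Option[Int]` constructor parameter is an assumption of this sketch, not the PR's design.

```scala
case class CipherSuite(name: String, algoBlockSize: Int, unknownValue: Option[Int] = None) {
  // String interpolation replaces the StringBuilder; the Option handles the
  // "append only if set" logic that the null check did in the original.
  override def toString: String = {
    val unknown = unknownValue.map(v => s", unknownValue: $v").getOrElse("")
    s"{name: $name, algorithmBlockSize: $algoBlockSize$unknown}"
  }
}
```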
[GitHub] spark pull request: SPARK-10771: Implement the shuffle encryption ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8880#discussion_r40595862 --- Diff: core/src/main/scala/org/apache/spark/crypto/AesCtrCryptoCodec.scala --- @@ -0,0 +1,55 @@ [... same Apache license header as quoted above ...] +package org.apache.spark.crypto + +import com.google.common.base.Preconditions +import scala.Byte + +/** + * For AES, the algorithm block is fixed size of 128 bits. + * @see http://en.wikipedia.org/wiki/Advanced_Encryption_Standard + */ +abstract class AesCtrCryptoCodec extends CryptoCodec { + val CTR_OFFSET: Integer = 8 + val SUITE: CipherSuite = AES_CTR_NOPADDING + val AES_BLOCK_SIZE: Integer = SUITE.algoBlockSize + + override def getCipherSuite(): CipherSuite = { +SUITE + } + + override def calculateIV(initIV: Array[Byte], counterVal: Long, IV: Array[Byte]): Unit = { --- End diff -- Can you add a scaladoc describing what this method is doing?
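The requested scaladoc could describe CTR IV derivation along these lines. The implementation below is a self-contained sketch modeled on Hadoop's `AesCtrCryptoCodec` (an assumption for illustration, not necessarily what this PR's method does): the 16-byte AES-CTR IV is an 8-byte initial-IV prefix followed by a 64-bit big-endian block counter, and adding the counter into the low 8 bytes with carry yields the IV for a given block offset.

```scala
/**
 * Computes the AES-CTR IV for a given block counter.
 *
 * The 16-byte IV consists of the first 8 bytes of `initIV` unchanged,
 * followed by the low 8 bytes of `initIV` treated as a big-endian 64-bit
 * integer with `counter` added to it (overflow into the high half is dropped).
 */
def calculateIV(initIV: Array[Byte], counter: Long, iv: Array[Byte]): Unit = {
  require(initIV.length == 16 && iv.length == 16)
  // High 8 bytes are copied unchanged.
  System.arraycopy(initIV, 0, iv, 0, 8)
  // Add the counter into the low 8 bytes, big-endian, propagating the carry.
  var sum = counter
  var i = 15
  while (i >= 8) {
    sum += initIV(i) & 0xFF
    iv(i) = sum.toByte
    sum = sum >>> 8
    i -= 1
  }
}
```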
[GitHub] spark pull request: SPARK-10771: Implement the shuffle encryption ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8880#discussion_r40595780 --- Diff: core/src/main/scala/org/apache/spark/crypto/AesCtrCryptoCodec.scala --- [... same AesCtrCryptoCodec.scala excerpt as quoted above ...] + override def calculateIV(initIV: Array[Byte], counterVal: Long, IV: Array[Byte]): Unit = { --- End diff -- nit: don't use capital letters only for parameter names (`IV`).
[GitHub] spark pull request: SPARK-10771: Implement the shuffle encryption ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8880#discussion_r40595711

--- Diff: core/src/main/scala/org/apache/spark/crypto/AesCtrCryptoCodec.scala ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.crypto
+
+import com.google.common.base.Preconditions
+import scala.Byte
+
+/**
+ * For AES, the algorithm block is fixed size of 128 bits.
+ * @see http://en.wikipedia.org/wiki/Advanced_Encryption_Standard
+ */
+abstract class AesCtrCryptoCodec extends CryptoCodec {
--- End diff --

All these classes should be `private[spark]`, at least.
[GitHub] spark pull request: SPARK-10771: Implement the shuffle encryption ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/8880#discussion_r40595673

--- Diff: core/src/main/scala/org/apache/spark/crypto/AesCtrCryptoCodec.scala ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.crypto
+
+import com.google.common.base.Preconditions
+import scala.Byte
--- End diff --

no need to import `Byte`
[GitHub] spark pull request: [SPARK-10859] [SQL] fix stats of StringType in...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/8929#issuecomment-143852249 [Test build #43070 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43070/consoleFull) for PR 8929 at commit [`0ea9d09`](https://github.com/apache/spark/commit/0ea9d09a72dd11136e82aec7b7246da64ac2d81e).
[GitHub] spark pull request: [SPARK-10859] [SQL] fix stats of StringType in...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8929#issuecomment-143850579 Merged build started.
[GitHub] spark pull request: [SPARK-10859] [SQL] fix stats of StringType in...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8929#issuecomment-143850517 Merged build triggered.
[GitHub] spark pull request: [SPARK-10807][SPARKR] Added as.data.frame as a...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/8908#issuecomment-143849927 [Test build #43069 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43069/consoleFull) for PR 8908 at commit [`de6d164`](https://github.com/apache/spark/commit/de6d164c0c882143f949cc811023974bdb798507).
[GitHub] spark pull request: [SPARK-10807][SPARKR] Added as.data.frame as a...
Github user olarayej commented on the pull request: https://github.com/apache/spark/pull/8908#issuecomment-143849747 @shivaram Thanks a lot for your comments! I have addressed them. -Oscar
[GitHub] spark pull request: [SPARK-10859] [SQL] fix stats of StringType in...
GitHub user davies opened a pull request: https://github.com/apache/spark/pull/8929

[SPARK-10859] [SQL] fix stats of StringType in columnar cache

The UTF8String may come from an UnsafeRow, in which case its underlying buffer is not copied, so we should clone it in order to hold it in Stats.

cc @yhuai

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/davies/spark pushdown_string

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/8929.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #8929

commit 0ea9d09a72dd11136e82aec7b7246da64ac2d81e
Author: Davies Liu
Date: 2015-09-28T19:20:53Z

    fix stats of StringType in columnar cache
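To make the aliasing problem behind this fix concrete, here is a small Scala sketch; `StringView` is a hypothetical stand-in for Spark's `UTF8String`, not Spark code. A string view over a reused byte buffer silently changes out from under whoever holds it, unless the bytes are copied first.

```scala
// Simplified illustration of the bug this PR fixes; all names hypothetical.
class StringView(val buf: Array[Byte], val offset: Int, val len: Int) {
  // copy() materializes the bytes into a fresh buffer, like cloning
  // the string before holding it in column stats
  def copy(): StringView = new StringView(buf.slice(offset, offset + len), 0, len)
  override def toString: String = new String(buf, offset, len, "UTF-8")
}

object StatsAliasingDemo {
  def main(args: Array[String]): Unit = {
    val buffer = "aaa".getBytes("UTF-8")
    val view = new StringView(buffer, 0, 3)
    val held = view          // stats holding the view directly: unsafe
    val copied = view.copy() // what the fix does: clone before holding

    buffer(0) = 'b'.toByte   // the underlying row buffer gets reused
    assert(held.toString == "baa")   // the held view silently changed
    assert(copied.toString == "aaa") // the copy kept the original value
  }
}
```

The same hazard applies to any zero-copy view type: it is only safe to retain past the lifetime of its backing buffer if it is cloned.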
[GitHub] spark pull request: [SPARK-9384] [core] Easier setting of executor...
Github user vanzin commented on the pull request: https://github.com/apache/spark/pull/7739#issuecomment-143849420 Looks sane but there's some room for cleanup; also, need to resolve conflicts.
[GitHub] spark pull request: [SPARK-9384] [core] Easier setting of executor...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/7739#discussion_r40593778

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -143,6 +144,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       .orElse(sparkProperties.get("spark.master"))
       .orElse(env.get("MASTER"))
       .orNull
+    commonExtraClassPath = Option(commonExtraClassPath)
--- End diff --

The change in this file is not technically necessary, unless you add a specific command line option for this config (which I don't think you should). See how it doesn't have a field for the executor's extra class path (because there's no command line option for it).
[GitHub] spark pull request: [SPARK-10807][SPARKR] Added as.data.frame as a...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8908#issuecomment-143849225 Merged build triggered.
[GitHub] spark pull request: [SPARK-10807][SPARKR] Added as.data.frame as a...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8908#issuecomment-143849259 Merged build started.
[GitHub] spark pull request: [SPARK-9384] [core] Easier setting of executor...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/7739#discussion_r40593517

--- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -624,3 +639,15 @@ object UserClasspathFirstTest {
     }
   }
 }
+
+object CommonClasspathAddedTest {
+  def main(args: Array[String]) {
+    val ccl = Thread.currentThread().getContextClassLoader()
+    val resource = ccl.getResourceAsStream("test.resource")
+    val bytes = ByteStreams.toByteArray(resource)
+    val contents = new String(bytes, 0, bytes.length, UTF_8)
+    if (contents != "COMMON") {
--- End diff --

minor, but you could just use `assert` here.
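As a sketch of the reviewer's suggestion (object and method names hypothetical, not the actual suite code): Scala's built-in `assert` throws an `AssertionError` carrying the given message when the condition is false, replacing the explicit if/throw.

```scala
// Hypothetical sketch of the suggested simplification.
object CommonClasspathCheckSketch {
  def check(contents: String): Unit = {
    // assert replaces: if (contents != "COMMON") { throw new Exception(...) }
    assert(contents == "COMMON", s"unexpected resource contents: $contents")
  }

  def main(args: Array[String]): Unit = {
    check("COMMON") // passes silently
    val failed =
      try { check("OTHER"); false }
      catch { case _: AssertionError => true }
    assert(failed) // a mismatch raises AssertionError
  }
}
```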
[GitHub] spark pull request: [SPARK-9384] [core] Easier setting of executor...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/7739#discussion_r40593338

--- Diff: core/src/main/scala/org/apache/spark/deploy/Client.scala ---
@@ -67,8 +67,15 @@ private class ClientEndpoint(
     // people call `addJar` assuming the jar is in the same directory.
     val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"

-    val classPathConf = "spark.driver.extraClassPath"
-    val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
+    val driverClassPathConf = "spark.driver.extraClassPath"
+    val commonClassPathConf = "spark.common.extraClassPath"
+
+    val driverClassPath = Some(
+      (sys.props.get(commonClassPathConf) ++ sys.props.get(driverClassPathConf))
--- End diff --

Not your fault; but it's slightly weird that this code is using `sys.props` instead of the `SparkConf` object provided in the constructor.
[GitHub] spark pull request: [SPARK-9384] [core] Easier setting of executor...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/7739#discussion_r40593175

--- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala ---
@@ -505,6 +510,23 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     getAll.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
   }

+  def getDriverExtraClassPath() : Option[String] = {
+    // Concatenate common and driver classpath in that order
+    val commonExtraClassPath = getOption("spark.common.extraClassPath")
+    val driverExtraClassPath = getOption("spark.driver.extraClassPath")
+    Some((commonExtraClassPath ++ driverExtraClassPath).mkString(File.pathSeparator))
+      .filter(_.nonEmpty)
--- End diff --

To be pedantic, the filtering should be done earlier, otherwise:

    scala> Seq("", "").mkString(":")
    res0: String = :
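The reviewer's point can be reproduced in a couple of lines: `mkString` on a sequence of empty strings still emits the separator, so empty entries have to be filtered out before joining. A sketch of the safer ordering (object and helper names hypothetical):

```scala
object ClasspathJoinSketch {
  val sep = ":" // File.pathSeparator on Unix-like systems

  // filter first, then join -- the ordering the reviewer asks for
  def join(parts: Seq[String]): Option[String] = {
    val nonEmpty = parts.filter(_.nonEmpty)
    if (nonEmpty.isEmpty) None else Some(nonEmpty.mkString(sep))
  }

  def main(args: Array[String]): Unit = {
    // the pitfall: two empty strings still produce a bare separator
    assert(Seq("", "").mkString(sep) == ":")
    // filtering first avoids ever producing ":" from empty inputs
    assert(join(Seq("", "")).isEmpty)
    assert(join(Seq("/a/lib.jar", "", "/b/lib.jar")).contains("/a/lib.jar:/b/lib.jar"))
  }
}
```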
[GitHub] spark pull request: [SPARK-8673] [launcher] API and infrastructure...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7052#issuecomment-143847775 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43066/ Test PASSed.
[GitHub] spark pull request: [SPARK-8673] [launcher] API and infrastructure...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7052#issuecomment-143847771 Merged build finished. Test PASSed.
[GitHub] spark pull request: [SPARK-8673] [launcher] API and infrastructure...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7052#issuecomment-143847626 [Test build #43066 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43066/console) for PR 7052 at commit [`a7e4d17`](https://github.com/apache/spark/commit/a7e4d17cf34ea5977f27456d0b8d7fc954463823).

* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `class ChildProcAppHandle implements SparkAppHandle`
  * `abstract class LauncherConnection implements Closeable, Runnable`
  * `final class LauncherProtocol`
  * `static class Message implements Serializable`
  * `static class Hello extends Message`
  * `static class SetAppId extends Message`
  * `static class SetState extends Message`
  * `static class Stop extends Message`
  * `class LauncherServer implements Closeable`
  * `class NamedThreadFactory implements ThreadFactory`
  * `class OutputRedirector`
[GitHub] spark pull request: [SPARK-10622] [core] Differentiate dead from "...
Github user mccheah commented on the pull request: https://github.com/apache/spark/pull/8887#issuecomment-143843978 The concept looks sound to me; however, I'm actually less familiar with the logic of where tasks get allocated. Someone else should also review!
[GitHub] spark pull request: [SPARK-10101] [SQL] Add maxlength to JDBC fiel...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/8374#issuecomment-143843129 [Test build #43068 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43068/console) for PR 8374 at commit [`27f118b`](https://github.com/apache/spark/commit/27f118be2b6c4bf1ce34b40d79691f17856e0bba).

* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request: [SPARK-10101] [SQL] Add maxlength to JDBC fiel...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8374#issuecomment-143843204 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-10101] [SQL] Add maxlength to JDBC fiel...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8374#issuecomment-143843209 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43068/ Test FAILed.
[GitHub] spark pull request: [SPARK-10833] [BUILD] Inline, organize BSD/MIT...
Github user pwendell commented on the pull request: https://github.com/apache/spark/pull/8919#issuecomment-143837144 Hey Sean - looks good to me, but can't claim to be nearly as deep as you are on this stuff!
[GitHub] spark pull request: [SPARK-10622] [core] Differentiate dead from "...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/8887#discussion_r40588094

--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -362,8 +395,8 @@ private[spark] class TaskSchedulerImpl(
     }
     // Update the DAGScheduler without holding a lock on this, since that can deadlock
     if (failedExecutor.isDefined) {
--- End diff --

Could use the `failedExecutor.foreach` syntactic sugar here.
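For readers unfamiliar with the idiom, `Option.foreach` runs its body only when the `Option` is defined, so an `isDefined` check followed by a call can collapse into one expression. A minimal sketch (the handler and object names are hypothetical, not the scheduler code):

```scala
object ForeachSugarSketch {
  var notified: List[String] = Nil
  // hypothetical stand-in for the DAGScheduler notification
  def notifyDagScheduler(execId: String): Unit = { notified = execId :: notified }

  def main(args: Array[String]): Unit = {
    val failedExecutor: Option[String] = Some("exec-1")
    val noFailure: Option[String] = None

    // instead of: if (failedExecutor.isDefined) notifyDagScheduler(failedExecutor.get)
    failedExecutor.foreach(notifyDagScheduler)
    noFailure.foreach(notifyDagScheduler) // body never runs for None

    assert(notified == List("exec-1"))
  }
}
```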
[GitHub] spark pull request: [SPARK-10101] [SQL] Add maxlength to JDBC fiel...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/8374#issuecomment-143836713 [Test build #43068 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43068/consoleFull) for PR 8374 at commit [`27f118b`](https://github.com/apache/spark/commit/27f118be2b6c4bf1ce34b40d79691f17856e0bba).
[GitHub] spark pull request: [SPARK-10807][SPARKR]
Github user shivaram commented on the pull request: https://github.com/apache/spark/pull/8908#issuecomment-143835492 @olarayej Sorry I wasn't clear the last time around but the title should be `[SPARK-10807][SPARKR] Added as.data.frame as a synonym for collect`
[GitHub] spark pull request: [SPARK-10101] [SQL] Add maxlength to JDBC fiel...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8374#issuecomment-143834967 Merged build triggered.
[GitHub] spark pull request: [SPARK-10101] [SQL] Add maxlength to JDBC fiel...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8374#issuecomment-143834992 Merged build started.
[GitHub] spark pull request: [SPARK-10622] [core] Differentiate dead from "...
Github user andrewor14 commented on the pull request: https://github.com/apache/spark/pull/8887#issuecomment-143832319 can you add `[YARN]` to the title?
[GitHub] spark pull request: [SPARK-10101] [SQL] Add maxlength to JDBC fiel...
Github user marmbrus commented on the pull request: https://github.com/apache/spark/pull/8374#issuecomment-143832037 Just to let you know, we are busy wrapping up 1.5.1, but I have put reviewing this PR on our schedule for the next two-week sprint.
[GitHub] spark pull request: [SPARK-10752][SPARKR] Implement corr() and cov...
Github user NarineK commented on a diff in the pull request: https://github.com/apache/spark/pull/8869#discussion_r40581218 --- Diff: R/pkg/R/DataFrameStatFunctions.R --- @@ -0,0 +1,102 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# DataFrameStatFunctions.R - Statistic functions for DataFrames. + +setOldClass("jobj") + +#' crosstab +#' +#' Computes a pair-wise frequency table of the given columns. Also known as a contingency +#' table. The number of distinct values for each column should be less than 1e4. At most 1e6 +#' non-zero pair frequencies will be returned. +#' +#' @param col1 name of the first column. Distinct items will make the first item of each row. +#' @param col2 name of the second column. Distinct items will make the column names of the output. +#' @return a local R data.frame representing the contingency table. The first column of each row +#' will be the distinct values of `col1` and the column names will be the distinct values +#' of `col2`. The name of the first column will be `$col1_$col2`. Pairs that have no +#' occurrences will have zero as their counts. 
+#' +#' @rdname statfunctions +#' @name crosstab +#' @export +#' @examples +#' \dontrun{ +#' df <- jsonFile(sqlCtx, "/path/to/file.json") +#' ct <- crosstab(df, "title", "gender") +#' } +setMethod("crosstab", + signature(x = "DataFrame", col1 = "character", col2 = "character"), + function(x, col1, col2) { +statFunctions <- callJMethod(x@sdf, "stat") +sct <- callJMethod(statFunctions, "crosstab", col1, col2) +collect(dataFrame(sct)) + }) + +#' cov +#' +#' Calculate the sample covariance of two numerical columns of a DataFrame. +#' +#' @param x A SparkSQL DataFrame +#' @param col1 the name of the first column +#' @param col2 the name of the second column +#' @return the covariance of the two columns. +#' +#' @rdname statfunctions +#' @name cov +#' @export +#' @examples +#'\dontrun{ +#' df <- jsonFile(sqlCtx, "/path/to/file.json") +#' cov <- cov(df, "title", "gender") +#' } +setMethod("cov", + signature(x = "DataFrame", col1 = "character", col2 = "character"), --- End diff -- Hi there, I have some points about correlation and covariance. 1. R calls the method 'cor' and not 'corr', so if we want to have the same syntax as R, we might want to use the 'cor'. 2. The actual syntax for cor (cov has a similar one) is : cor(x, y = NULL, use = "everything", method = c("pearson", "kendall", "spearman")) where X is a dataframe and y can be another dataframe, a vector or matrix and in R I can get smth like this: cor(longley) GNP.deflator GNP Unemployed GNP.deflator1.000 0.9915892 GNP 0.9915892 1.000 Unemployed 0.6206334 0.6042609 Armed.Forces0.4647442 0.4464368 Population 0.9791634 0.9910901 Year0.9911492 0.9952735 Employed0.9708985 0.9835516 I wonder if we can get this in SparkR too. I see at least 2 options here: 1. we make K number of calls to dataframe api for each column pair or 2. we extend scala dataframe api so that it also accepts a list of columns ... I can help you with this if you think that it makes sense and we want to add it. 
Thanks, Narine --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
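Option 1 above (one pairwise call per column pair, assembled into a full matrix) can be sketched in plain Python. This is only an illustration of the idea, not SparkR or Spark code: the `pearson` helper stands in for whatever single-pair call the DataFrame API exposes, and the column data are arbitrary illustrative values, not the `longley` dataset.

```python
from math import sqrt

def pearson(xs, ys):
    # Stand-in for a single pairwise API call: Pearson correlation
    # of two equal-length numeric columns.
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((a - mx) * (b - my) for a, b in zip(xs, ys))
    sx = sqrt(sum((a - mx) ** 2 for a in xs))
    sy = sqrt(sum((b - my) ** 2 for b in ys))
    return cov / (sx * sy)

def corr_matrix(columns):
    # Build the full K x K correlation matrix from K*(K-1)/2 pairwise
    # calls, exploiting symmetry and cor(x, x) == 1 on the diagonal.
    names = list(columns)
    mat = {a: {b: 1.0 for b in names} for a in names}
    for i, a in enumerate(names):
        for b in names[i + 1:]:
            r = pearson(columns[a], columns[b])
            mat[a][b] = mat[b][a] = r  # matrix is symmetric
    return mat

# Hypothetical column data for illustration only.
data = {
    "GNP":  [234.3, 259.4, 258.1, 284.6],
    "Year": [1947, 1948, 1949, 1950],
}
m = corr_matrix(data)
```

The same loop structure would apply if SparkR issued one `callJMethod` per pair; the cost is K*(K-1)/2 round trips, which is what option 2 (a list-of-columns API on the Scala side) would avoid.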
[GitHub] spark pull request: [SPARK-10079][SPARKR] Make 'column' and 'col' ...
Github user shivaram commented on a diff in the pull request: https://github.com/apache/spark/pull/8864#discussion_r40580851

--- Diff: R/pkg/R/DataFrame.R ---
@@ -946,15 +946,36 @@ setMethod("foreachPartition",

 ## SELECT ##

-getColumn <- function(x, c) {
-  column(callJMethod(x@sdf, "col", c))
-}
+#' Select a column
+#'
+#' Selects column based on the column name and return it as a Column.
+#' Note that the column name can also reference to a nested column like `a.b`.
+#'
+#' @param x A SparkSQL DataFrame
+#'
+#' @rdname select
+#' @name col
+#' @aliases col
+#' @export
+#' @examples
+#' \dontrun{
+#' sc <- sparkR.init()
+#' sqlContext <- sparkRSQL.init(sc)
+#' df <- createDataFrame(sqlContext, list(50, 60, 70), "age")
+#' col(df, "age")
+#' }
+setMethod("col",
--- End diff --

Yeah, but in terms of exporting this I'm not sure we need another method for the users?
[GitHub] spark pull request: [SPARK-10836] [SparkR] Added sort(x, decreasin...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8920#issuecomment-143813691 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43067/ Test FAILed.
[GitHub] spark pull request: [SPARK-10836] [SparkR] Added sort(x, decreasin...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/8920#issuecomment-143813677 [Test build #43067 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43067/console) for PR 8920 at commit [`202d231`](https://github.com/apache/spark/commit/202d23164993bd3e62e987fd757aa8dbeb85882b).

* This patch **fails R style tests**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request: [SPARK-10836] [SparkR] Added sort(x, decreasin...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8920#issuecomment-143813685 Merged build finished. Test FAILed.
[GitHub] spark pull request: [SPARK-10836] [SparkR] Added sort(x, decreasin...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/8920#issuecomment-143813135 [Test build #43067 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43067/consoleFull) for PR 8920 at commit [`202d231`](https://github.com/apache/spark/commit/202d23164993bd3e62e987fd757aa8dbeb85882b).
[GitHub] spark pull request: [SPARK-10844] [SparkR] Added correlation of tw...
Github user shivaram commented on the pull request: https://github.com/apache/spark/pull/8924#issuecomment-143812424 @NarineK As I commented on the JIRA, could we close this PR as we have https://github.com/apache/spark/pull/8869 in progress ? If there is something missing in #8869 please add your comments to that !
[GitHub] spark pull request: Introduce config constants object
Github user jaceklaskowski closed the pull request at: https://github.com/apache/spark/pull/8753
[GitHub] spark pull request: [SPARK-10836] [SparkR] Added sort(x, decreasin...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/8920#issuecomment-143810610 Merged build started.