[GitHub] spark pull request: [SPARK-7155] [CORE] Allow newAPIHadoopFile to ...
Github user yongtang commented on the pull request: https://github.com/apache/spark/pull/5708#issuecomment-96524002 @srowen Thanks for the comment. I updated the pull request so that setInputPaths is used instead of addInputPath. In addition to newAPIHadoopFile(), the calls to addInputPath inside wholeTextFiles() and binaryFiles() have also been replaced with setInputPaths. This should make the behavior consistent across SparkContext.scala. The unit test for this issue has also been updated to cover every method involved. Please let me know if there is anything else that needs to be taken care of. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
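Why the switch matters for comma-separated input can be shown with a small model. In Hadoop's new API, `FileInputFormat.addInputPath(Job, Path)` appends a single path, while `FileInputFormat.setInputPaths(Job, String)` accepts a comma-separated list and replaces any paths already configured. The `InputPathsSketch` class below is a hypothetical stand-in that models only that difference (it also assumes, as a simplification, that commas inside `{...}` glob braces are not separators); it does not call Hadoop.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of how a job's input-path list is filled in.
// It is NOT Hadoop's FileInputFormat; it only mimics the behavior that matters
// for comma-separated paths.
public class InputPathsSketch {
  private final List<String> inputPaths = new ArrayList<>();

  // "add" style: the whole argument is ONE path, appended to the list.
  public void addInputPath(String path) {
    inputPaths.add(path);
  }

  // "set" style: split on commas outside {...} glob braces, then replace
  // whatever paths were configured before.
  public void setInputPaths(String commaSeparatedPaths) {
    inputPaths.clear();
    inputPaths.addAll(splitOutsideBraces(commaSeparatedPaths));
  }

  public List<String> getInputPaths() {
    return new ArrayList<>(inputPaths);
  }

  static List<String> splitOutsideBraces(String s) {
    List<String> parts = new ArrayList<>();
    int depth = 0;
    int start = 0;
    for (int i = 0; i < s.length(); i++) {
      char c = s.charAt(i);
      if (c == '{') {
        depth++;
      } else if (c == '}' && depth > 0) {
        depth--;
      } else if (c == ',' && depth == 0) {
        parts.add(s.substring(start, i));
        start = i + 1;
      }
    }
    parts.add(s.substring(start));
    return parts;
  }

  public static void main(String[] args) {
    InputPathsSketch viaAdd = new InputPathsSketch();
    viaAdd.addInputPath("/data/a,/data/b");
    System.out.println(viaAdd.getInputPaths()); // [/data/a,/data/b]  (a single entry)

    InputPathsSketch viaSet = new InputPathsSketch();
    viaSet.setInputPaths("/data/a,/data/{b,c}");
    System.out.println(viaSet.getInputPaths()); // [/data/a, /data/{b,c}]
  }
}
```

With the "add" style, a string such as `/data/a,/data/b` stays one (nonexistent) path, which is the behavior the pull request moves away from.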
[GitHub] spark pull request: SPARK-1556: bump jets3t version to 0.9.0
Github user LuqmanSahaf commented on the pull request: https://github.com/apache/spark/pull/468#issuecomment-96522017 @darose I am facing the VerifyError you mentioned in one of the comments. Can you tell me how you solved that error?
[GitHub] spark pull request: [SPARK-7160][SQL] Support converting DataFrame...
GitHub user rayortigas opened a pull request: https://github.com/apache/spark/pull/5713 [SPARK-7160][SQL] Support converting DataFrames to typed RDDs. https://issues.apache.org/jira/browse/SPARK-7160 https://github.com/databricks/spark-csv/pull/52 cc: @rxin (who made the original suggestion) @vlyubin #5279 @punya #5578 @davies #5350 @marmbrus (ScalaReflection and more) You can merge this pull request into a Git repository by running: $ git pull https://github.com/rayortigas/spark df-to-typed-rdd Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/5713.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5713 commit add51b6ad8f0ffe0ed600917d4339a531da07750 Author: Ray Ortigas Date: 2015-04-27T06:27:50Z [SPARK-7160][SQL] Support converting DataFrames to typed RDDs.
[GitHub] spark pull request: [SPARK-4943][SPARK-5251][SQL] Allow table name...
Github user scwf commented on the pull request: https://github.com/apache/spark/pull/4062#issuecomment-96519116 @liancheng, rebased and updated. Any comments here?
[GitHub] spark pull request: [SPARK-1406] Mllib pmml model export
Github user selvinsource commented on the pull request: https://github.com/apache/spark/pull/3062#issuecomment-96516889 @mengxr For SVM, I manually tried what you suggested and it looks good. I loaded the example below into JPMML and evaluated it as a classification map; indeed, the intercept on the NO category acts as the threshold when `normalizationMethod = none`. Here is the example: http://www.dmg.org/PMML-4_2 … 2015-04-27T06:58:22 However, I noticed that if the SVM model threshold is set to None, the model simply outputs the margin (which is how the PMML exporter currently implements it). My question is: should we support both? If `threshold = None`, export as regression (as currently implemented); if a threshold is set, export as binary classification (as you suggested). What do you think?
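The two export modes correspond to the two ways MLlib's `SVMModel` predicts: with the threshold cleared it returns the raw margin `w . x + intercept`, and with a threshold `t` it returns 1.0 when the margin exceeds `t` and 0.0 otherwise. The hypothetical sketch below models both, plus a simplified "classification map" view in which YES scores the margin and NO scores a constant equal to the threshold, so picking the higher score reproduces thresholding. Names are illustrative; this is not the PMML exporter's code, and ties going to NO is an assumption chosen to match `margin > t`.

```java
import java.util.Optional;

// Hypothetical sketch: linear SVM output with and without a threshold, and the
// equivalent "highest-scoring category wins" view used for PMML classification.
public class SvmThresholdSketch {

  // Raw margin: w . x + intercept.
  static double margin(double[] weights, double intercept, double[] x) {
    double m = intercept;
    for (int i = 0; i < weights.length; i++) {
      m += weights[i] * x[i];
    }
    return m;
  }

  // No threshold: regression-style output (the margin itself).
  // Threshold t: binary output, 1.0 if margin > t, else 0.0.
  static double predict(double[] w, double b, Optional<Double> threshold, double[] x) {
    double m = margin(w, b, x);
    if (!threshold.isPresent()) {
      return m;
    }
    return m > threshold.get() ? 1.0 : 0.0;
  }

  // Classification-map view: YES scores the margin, NO scores the threshold.
  static String classify(double[] w, double b, double threshold, double[] x) {
    double yesScore = margin(w, b, x);
    double noScore = threshold;
    return yesScore > noScore ? "YES" : "NO";
  }

  public static void main(String[] args) {
    double[] w = {2.0, -1.0};
    double b = 0.5;
    double[] x = {1.0, 1.0}; // margin = 0.5 + 2.0 - 1.0 = 1.5
    System.out.println(predict(w, b, Optional.empty(), x)); // 1.5
    System.out.println(predict(w, b, Optional.of(2.0), x)); // 0.0
    System.out.println(classify(w, b, 2.0, x));             // NO
  }
}
```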
[GitHub] spark pull request: [SQL][Minor] fix java doc for DataFrame.agg
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/5712#issuecomment-96516809 Jenkins, test this please.
[GitHub] spark pull request: [SPARK-6505][SQL]Remove the reflection call in...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/5660
[GitHub] spark pull request: [SPARK-5213] [SQL] Pluggable SQL Parser Suppor...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/4015#issuecomment-96516751 Jenkins, retest this please.
[GitHub] spark pull request: [SPARK-5213] [SQL] Pluggable SQL Parser Suppor...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/4015#issuecomment-96516744 I think Jenkins is having some trouble right now.
[GitHub] spark pull request: [SPARK-6734] [SQL] Add UDTF.close support in G...
Github user chenghao-intel commented on the pull request: https://github.com/apache/spark/pull/5383#issuecomment-96516549 @liancheng @marmbrus Any more comments?
[GitHub] spark pull request: [SPARK-5213] [SQL] Pluggable SQL Parser Suppor...
Github user chenghao-intel commented on the pull request: https://github.com/apache/spark/pull/4015#issuecomment-96516483 @liancheng @rxin @marmbrus Can you trigger the unit tests for me? Thanks.
[GitHub] spark pull request: SPARK-4705:[core] Write event logs of differen...
Github user twinkle-sachdeva closed the pull request at: https://github.com/apache/spark/pull/4845
[GitHub] spark pull request: SPARK-6735:[YARN] Adding properties to disable...
Github user twinkle-sachdeva commented on the pull request: https://github.com/apache/spark/pull/5449#issuecomment-96515946 Hi @srowen, please review the changes. Thanks,
[GitHub] spark pull request: [SPARK-6505][SQL]Remove the reflection call in...
Github user liancheng commented on the pull request: https://github.com/apache/spark/pull/5660#issuecomment-96515766 Thanks for working on this! I'm merging this to master.
[GitHub] spark pull request: [SPARK-6865][SQL] DataFrame column names shoul...
Github user liancheng commented on a diff in the pull request: https://github.com/apache/spark/pull/5505#discussion_r29122071
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala ---
@@ -688,7 +688,7 @@ class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
     sql("DROP TABLE alwaysNullable")
   }
 
-  test("Aggregation attribute names can't contain special chars \" ,;{}()\\n\\t=\"") {
+  ignore("Aggregation attribute names can't contain special chars \" ,;{}()\\n\\t=\"") {
--- End diff --
I guess it should be OK to disable or even remove this test now, since we now check for invalid field names explicitly and suggest that users add aliases. See #5263.
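The explicit check referenced here (#5263) is not shown in this thread, so the following is only a hypothetical sketch of the idea: reject any attribute name containing one of the characters listed in the test name (` ,;{}()\n\t=`) and tell the user to alias the column. The class name, method names, and error text are illustrative assumptions, not Spark's implementation.

```java
// Hypothetical sketch of an explicit field-name check. The invalid character
// set is taken from the test name above; names and message are illustrative.
public class FieldNameCheckSketch {
  static final String INVALID_CHARS = " ,;{}()\n\t=";

  static void checkFieldName(String name) {
    for (int i = 0; i < name.length(); i++) {
      if (INVALID_CHARS.indexOf(name.charAt(i)) >= 0) {
        throw new IllegalArgumentException(
            "Attribute name \"" + name + "\" contains invalid character(s) among \" ,;{}()\\n\\t=\". "
                + "Please use an alias to rename it.");
      }
    }
  }

  static boolean isValid(String name) {
    try {
      checkFieldName(name);
      return true;
    } catch (IllegalArgumentException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(isValid("avg_price"));  // true
    System.out.println(isValid("AVG(price)")); // false: '(' and ')' are rejected
  }
}
```

An aggregate such as `AVG(price)` gets a default name with parentheses, which is why aliasing it (for example to `avg_price`) avoids the error.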
[GitHub] spark pull request: [SPARK-6352] [SQL] Custom parquet output commi...
Github user liancheng commented on the pull request: https://github.com/apache/spark/pull/5525#issuecomment-96514382 @ypcat Sorry for the late reply. This LGTM except for a minor issue.
[GitHub] spark pull request: [SPARK-6352] [SQL] Custom parquet output commi...
Github user liancheng commented on a diff in the pull request: https://github.com/apache/spark/pull/5525#discussion_r29121990
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+
+import parquet.Log
+import parquet.hadoop.util.ContextUtil
+import parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetOutputCommitter, ParquetOutputFormat}
+
+private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
+  extends ParquetOutputCommitter(outputPath, context) {
+  val LOG = Log.getLog(classOf[ParquetOutputCommitter])
+
+  override def getWorkPath(): Path = outputPath
+  override def abortTask(taskContext: TaskAttemptContext): Unit = {}
+  override def commitTask(taskContext: TaskAttemptContext): Unit = {}
+  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = true
+  override def setupJob(jobContext: JobContext): Unit = {}
+  override def setupTask(taskContext: TaskAttemptContext): Unit = {}
+
+  override def commitJob(jobContext: JobContext) {
+    try {
+      val configuration = ContextUtil.getConfiguration(jobContext)
+      val fileSystem = outputPath.getFileSystem(configuration)
+      if (configuration.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, true)) {
+        val outputStatus = fileSystem.getFileStatus(outputPath)
+        val footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus)
+        try {
+          ParquetFileWriter.writeMetadataFile(configuration, outputPath, footers)
+        } catch {
+          case e: Exception => {
+            LOG.warn("could not write summary file for " + outputPath, e)
+            val metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE)
+            if (fileSystem.exists(metadataPath)) {
+              fileSystem.delete(metadataPath, true)
+            }
+          }
+        }
+      }
+      if (configuration.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)) {
+        val successPath = new Path(outputPath, FileOutputCommitter.SUCCEEDED_FILE_NAME)
+        fileSystem.create(successPath).close()
+      }
+    } catch {
+      case e: Exception => LOG.warn("could not write summary file for " + outputPath, e)
--- End diff --
The exception caught here may also be caused by writing the `_SUCCESS` marker file. Can we move the outermost `try` into the first `if` block and add a separate `try` in the second `if` block for writing the `_SUCCESS` marker file?
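The restructuring suggested above can be sketched in a language-neutral way: each optional step of `commitJob` gets its own `try`, so a failure is reported against the step that actually failed and a summary-file failure no longer prevents the marker from being written. In this hypothetical Java sketch the Parquet summary write and the `_SUCCESS` marker write are replaced by plain callbacks, warnings are collected in a list instead of a logger, and only `IOException` is caught for simplicity; none of these names come from the pull request.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: one try per optional commit step, so each failure is
// attributed to the right step. Callbacks stand in for the real file writes.
public class CommitJobSketch {

  interface IoStep {
    void run() throws IOException;
  }

  final List<String> warnings = new ArrayList<>();

  void commitJob(boolean writeSummary, IoStep summaryStep, IoStep cleanupStep,
                 boolean markSuccess, IoStep markerStep) {
    if (writeSummary) {
      // Covers reading footers and writing the summary file.
      try {
        summaryStep.run();
      } catch (IOException e) {
        warnings.add("could not write summary file: " + e.getMessage());
        try {
          cleanupStep.run(); // remove a partially written summary file
        } catch (IOException cleanupError) {
          warnings.add("could not delete partial summary file: " + cleanupError.getMessage());
        }
      }
    }
    if (markSuccess) {
      // Separate try: a marker failure is reported as a marker failure.
      try {
        markerStep.run();
      } catch (IOException e) {
        warnings.add("could not write _SUCCESS marker: " + e.getMessage());
      }
    }
  }

  public static void main(String[] args) {
    CommitJobSketch committer = new CommitJobSketch();
    committer.commitJob(
        true, () -> { throw new IOException("disk full"); }, () -> {},
        true, () -> {});
    System.out.println(committer.warnings); // [could not write summary file: disk full]
  }
}
```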
[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...
Github user aarondav commented on the pull request: https://github.com/apache/spark/pull/5377#issuecomment-96514035 LGTM, only minor comments. The tests look good. Apologies for taking so long to review!
[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/5377#discussion_r29121814
--- Diff: network/common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java ---
@@ -86,4 +117,237 @@ public void testNonMatching() {
       assertFalse(server.isComplete());
     }
   }
+
+  @Test
+  public void testSaslAuthentication() throws Exception {
+    testBasicSasl(false);
+  }
+
+  @Test
+  public void testSaslEncryption() throws Exception {
--- End diff --
I think these methods should be right next to testBasicSasl's definition.
[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/5377#discussion_r29121766
--- Diff: network/common/src/main/java/org/apache/spark/network/TransportContext.java ---
@@ -99,9 +108,9 @@ public TransportServer createServer() {
    * be used to communicate on this channel. The TransportClient is directly associated with a
    * ChannelHandler to ensure all users of the same channel get the same TransportClient object.
    */
-  public TransportChannelHandler initializePipeline(SocketChannel channel) {
+  public TransportChannelHandler initializePipeline(SocketChannel channel, RpcHandler rpcHandler) {
--- End diff --
The distinction between rpcHandler and appRpcHandler is definitely confusing to someone reading this for the first time; please add a comment here or in TransportContext explaining the difference.
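One plausible reading of the two handlers, based on the SaslRpcHandler diff later in this thread (a per-channel handler that takes a `delegate`), is a wrapper relationship: the application's handler is shared, and a per-channel handler wraps it and holds that channel's authentication state. The sketch below illustrates only that pattern. The `RpcHandler` interface, the string messages, and the token comparison standing in for a real SASL exchange are all hypothetical simplifications, not Spark's API.

```java
// Hypothetical sketch of the wrapping relationship: one shared application
// handler (appRpcHandler) and one wrapper per channel (rpcHandler) that keeps
// per-channel state and delegates once the channel is authenticated.
public class HandlerWrappingSketch {

  interface RpcHandler {
    String receive(String message);
  }

  // Application-level handler: shared by every channel.
  static class EchoAppHandler implements RpcHandler {
    public String receive(String message) {
      return "app:" + message;
    }
  }

  // Per-channel wrapper: a token check stands in for the SASL handshake.
  static class AuthenticatingHandler implements RpcHandler {
    private final RpcHandler delegate;
    private final String expectedToken;
    private boolean isComplete = false; // authentication state for this channel only

    AuthenticatingHandler(RpcHandler delegate, String expectedToken) {
      this.delegate = delegate;
      this.expectedToken = expectedToken;
    }

    public String receive(String message) {
      if (!isComplete) {
        if (message.equals(expectedToken)) {
          isComplete = true;
          return "auth-ok";
        }
        return "auth-failed";
      }
      return delegate.receive(message);
    }
  }

  public static void main(String[] args) {
    RpcHandler appRpcHandler = new EchoAppHandler();
    // One wrapper per channel, all sharing the same application handler.
    RpcHandler channelA = new AuthenticatingHandler(appRpcHandler, "secret");
    RpcHandler channelB = new AuthenticatingHandler(appRpcHandler, "secret");
    System.out.println(channelA.receive("secret")); // auth-ok
    System.out.println(channelA.receive("ping"));   // app:ping
    System.out.println(channelB.receive("ping"));   // auth-failed
  }
}
```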
[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/5377#discussion_r29121679
--- Diff: network/common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java ---
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.sasl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.FileRegion;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.util.AbstractReferenceCounted;
+import io.netty.util.ReferenceCountUtil;
+
+import org.apache.spark.network.util.ByteArrayWritableChannel;
+import org.apache.spark.network.util.NettyUtils;
+
+class SaslEncryption {
--- End diff --
Please add a class comment.
[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/5377#discussion_r29121639
--- Diff: network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java ---
@@ -58,10 +60,15 @@ public ExternalShuffleClient(
       TransportConf conf,
       SecretKeyHolder secretKeyHolder,
-      boolean saslEnabled) {
+      boolean saslEnabled,
+      boolean saslEncryptionEnabled) {
+    Preconditions.checkArgument(
+        !saslEncryptionEnabled || saslEnabled,
--- End diff --
nit: I think a 2-space indent is appropriate here.
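The condition `!saslEncryptionEnabled || saslEnabled` encodes the implication "encryption enabled implies SASL enabled": encryption is only rejected when it is requested without SASL. Guava's `Preconditions.checkArgument` throws `IllegalArgumentException` when its condition is false; the hypothetical sketch below does the same check in plain Java (class name and message are illustrative) and prints the truth table.

```java
// Hypothetical sketch: plain-Java equivalent of
// Preconditions.checkArgument(!saslEncryptionEnabled || saslEnabled, ...).
public class SaslOptionsSketch {
  final boolean saslEnabled;
  final boolean saslEncryptionEnabled;

  SaslOptionsSketch(boolean saslEnabled, boolean saslEncryptionEnabled) {
    // Encryption without SASL is the only rejected combination.
    if (!(!saslEncryptionEnabled || saslEnabled)) {
      throw new IllegalArgumentException("SASL encryption requires SASL to be enabled.");
    }
    this.saslEnabled = saslEnabled;
    this.saslEncryptionEnabled = saslEncryptionEnabled;
  }

  static boolean accepted(boolean sasl, boolean encryption) {
    try {
      new SaslOptionsSketch(sasl, encryption);
      return true;
    } catch (IllegalArgumentException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    for (boolean sasl : new boolean[] {false, true}) {
      for (boolean enc : new boolean[] {false, true}) {
        System.out.println("sasl=" + sasl + " encryption=" + enc + " -> " + accepted(sasl, enc));
      }
    }
    // Only "sasl=false encryption=true" prints false.
  }
}
```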
[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/5377#discussion_r29121626
--- Diff: network/common/src/test/resources/log4j.properties ---
@@ -23,5 +23,5 @@ log4j.appender.file.file=target/unit-tests.log
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
 
-# Silence verbose logs from 3rd-party libraries.
+# Filter debug messages from noisy 3rd-party libs.
--- End diff --
kind of a funny change
[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/5377#discussion_r29121561
--- Diff: network/common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java ---
@@ -75,11 +81,20 @@
   private final SecretKeyHolder secretKeyHolder;
   private SaslServer saslServer;
 
-  public SparkSaslServer(String secretKeyId, SecretKeyHolder secretKeyHolder) {
+  public SparkSaslServer(
+      String secretKeyId,
+      SecretKeyHolder secretKeyHolder,
+      boolean alwaysEncrypt) {
     this.secretKeyId = secretKeyId;
     this.secretKeyHolder = secretKeyHolder;
+
+    String qop = alwaysEncrypt ? QOP_AUTH_CONF : String.format("%s,%s", QOP_AUTH_CONF, QOP_AUTH);
--- End diff --
I assume this is a comma-separated list of the supported formats, for negotiation? Maybe add a comment to this effect.
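Per the `javax.security.sasl.Sasl.QOP` documentation, the QOP property is a comma-separated list of `auth`, `auth-int`, and `auth-conf` values whose order expresses preference. The hypothetical sketch below reuses the expression from the diff to build the server's offer and models negotiation as "the first value in the client's preference list that the server also offers". That selection rule is a simplification for illustration, not the DIGEST-MD5 mechanism itself.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of QOP negotiation. "auth-conf" adds
// confidentiality (encryption); "auth" is authentication only.
public class QopNegotiationSketch {
  static final String QOP_AUTH_CONF = "auth-conf";
  static final String QOP_AUTH = "auth";

  // Same expression as in the diff: encryption-only, or encryption preferred.
  static String serverQop(boolean alwaysEncrypt) {
    return alwaysEncrypt ? QOP_AUTH_CONF : String.format("%s,%s", QOP_AUTH_CONF, QOP_AUTH);
  }

  // Returns the first client-preferred QOP the server offers, or null if none.
  static String negotiate(String serverOffer, String clientPreference) {
    List<String> offered = Arrays.asList(serverOffer.split(","));
    for (String candidate : clientPreference.split(",")) {
      if (offered.contains(candidate.trim())) {
        return candidate.trim();
      }
    }
    return null;
  }

  public static void main(String[] args) {
    System.out.println(serverQop(false));                             // auth-conf,auth
    System.out.println(negotiate(serverQop(false), "auth"));          // auth
    System.out.println(negotiate(serverQop(true), "auth"));           // null
    System.out.println(negotiate(serverQop(true), "auth-conf,auth")); // auth-conf
  }
}
```

With `alwaysEncrypt` set, a client that only accepts `auth` finds no common value, which is the intended refusal to fall back to an unencrypted channel.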
[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/5377#discussion_r29121544
--- Diff: network/common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java ---
@@ -60,13 +60,19 @@
   static final String DIGEST = "DIGEST-MD5";
 
   /**
-   * The quality of protection is just "auth". This means that we are doing
-   * authentication only, we are not supporting integrity or privacy protection of the
-   * communication channel after authentication. This could be changed to be configurable
-   * in the future.
+   * QOP value that includes encryption.
--- End diff --
I liked the spelled-out "quality of protection" better :)
[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/5377#discussion_r29121527
--- Diff: network/common/src/main/java/org/apache/spark/network/sasl/SparkSaslServer.java ---
@@ -60,13 +60,19 @@
   static final String DIGEST = "DIGEST-MD5";
 
   /**
-   * The quality of protection is just "auth". This means that we are doing
-   * authentication only, we are not supporting integrity or privacy protection of the
-   * communication channel after authentication. This could be changed to be configurable
-   * in the future.
+   * QOP value that includes encryption.
+   */
+  static final String QOP_AUTH_CONF = "auth-conf";
+
+  /**
+   * QOP value that does not include encryption.
+   */
+  static final String QOP_AUTH = "auth";
+
+  /**
+   * Common SASL config properties for both client and server.
    */
   static final Map SASL_PROPS = ImmutableMap.builder()
-    .put(Sasl.QOP, "auth")
     .put(Sasl.SERVER_AUTH, "true")
--- End diff --
Is this property relevant for the client? Potentially we could just do away with this static map if not.
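If the static map were dropped, each side could build its properties where the QOP is known. The hypothetical helper below uses the standard JDK constants `Sasl.QOP` (`"javax.security.sasl.qop"`) and `Sasl.SERVER_AUTH` (`"javax.security.sasl.server.authentication"`); the latter is documented as specifying whether the server must authenticate to the client. Whether the client actually needs it is the open question above, so this only sketches the shape of a per-side map.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.security.sasl.Sasl;

// Hypothetical sketch: build SASL properties per side instead of sharing one
// static map, so the QOP list can vary (e.g. with an alwaysEncrypt setting).
public class SaslPropsSketch {
  static Map<String, String> saslProps(String qop) {
    Map<String, String> props = new HashMap<>();
    props.put(Sasl.QOP, qop);            // comma-separated QOP values, in preference order
    props.put(Sasl.SERVER_AUTH, "true"); // server must authenticate to the client
    return Collections.unmodifiableMap(props);
  }

  public static void main(String[] args) {
    Map<String, String> serverProps = saslProps("auth-conf,auth");
    System.out.println(serverProps.get(Sasl.QOP)); // auth-conf,auth
  }
}
```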
[GitHub] spark pull request: [SPARK-7142][SQL]: Minor enhancement to Boolea...
Github user saucam commented on a diff in the pull request: https://github.com/apache/spark/pull/5700#discussion_r29121549
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -413,6 +418,10 @@ object BooleanSimplification extends Rule[LogicalPlan] with PredicateHelper {
         case LessThan(l, r) => GreaterThanOrEqual(l, r)
         // not(l <= r) => l > r
         case LessThanOrEqual(l, r) => GreaterThan(l, r)
+        // not(l || r) => not(l) && not(r)
+        case Or(l, r) => And(Not(l), Not(r))
+        // not(l && r) => not(l) or not(r)
+        case And(l, r) => Or(Not(l), Not(r))
--- End diff --
This is inside a case match:
```
case not @ Not(exp) => exp match {
  case Or(l, r) => And(Not(l), Not(r))
}
```
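The two added cases are De Morgan's laws: `not(l || r)` equals `not(l) && not(r)`, and `not(l && r)` equals `not(l) || not(r)`. The hypothetical sketch below applies the same two rewrites to a tiny expression tree and checks equivalence over every truth assignment. The `Not`/`And`/`Or` classes only mirror Catalyst's names; they are not Catalyst classes, and the rewrite is applied at the root only, whereas an optimizer rule would be applied across the whole tree.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the De Morgan rewrites on a minimal expression tree.
public class DeMorganSketch {
  interface Expr {
    boolean eval(Map<String, Boolean> env);
  }

  static final class Var implements Expr {
    final String name;
    Var(String name) { this.name = name; }
    public boolean eval(Map<String, Boolean> env) { return env.get(name); }
    @Override public String toString() { return name; }
  }

  static final class Not implements Expr {
    final Expr child;
    Not(Expr child) { this.child = child; }
    public boolean eval(Map<String, Boolean> env) { return !child.eval(env); }
    @Override public String toString() { return "not(" + child + ")"; }
  }

  static final class And implements Expr {
    final Expr left;
    final Expr right;
    And(Expr left, Expr right) { this.left = left; this.right = right; }
    public boolean eval(Map<String, Boolean> env) { return left.eval(env) && right.eval(env); }
    @Override public String toString() { return "(" + left + " && " + right + ")"; }
  }

  static final class Or implements Expr {
    final Expr left;
    final Expr right;
    Or(Expr left, Expr right) { this.left = left; this.right = right; }
    public boolean eval(Map<String, Boolean> env) { return left.eval(env) || right.eval(env); }
    @Override public String toString() { return "(" + left + " || " + right + ")"; }
  }

  // One rewrite step at the root: push a negation through And/Or.
  static Expr simplifyNot(Expr e) {
    if (e instanceof Not) {
      Expr inner = ((Not) e).child;
      if (inner instanceof Or) {
        Or or = (Or) inner;
        return new And(new Not(or.left), new Not(or.right)); // not(l || r) => not(l) && not(r)
      }
      if (inner instanceof And) {
        And and = (And) inner;
        return new Or(new Not(and.left), new Not(and.right)); // not(l && r) => not(l) || not(r)
      }
    }
    return e;
  }

  public static void main(String[] args) {
    Expr a = new Var("a");
    Expr b = new Var("b");
    Expr original = new Not(new Or(a, b));
    Expr rewritten = simplifyNot(original);
    System.out.println(rewritten); // (not(a) && not(b))

    // Verify equivalence for all four assignments of a and b.
    boolean[] values = {false, true};
    for (boolean va : values) {
      for (boolean vb : values) {
        Map<String, Boolean> env = new HashMap<>();
        env.put("a", va);
        env.put("b", vb);
        if (original.eval(env) != rewritten.eval(env)) {
          throw new AssertionError("not equivalent for a=" + va + ", b=" + vb);
        }
      }
    }
  }
}
```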
[GitHub] spark pull request: [SPARK-6517][mllib] Implement the Algorithm of...
Github user freeman-lab commented on the pull request: https://github.com/apache/spark/pull/5267#issuecomment-96512684 @yu-iskw I'm still going through the patch, but so far it's looking good! I've also been testing it locally. Is there a reason you removed the `toMergeList` method from the previous version of this submission? That seemed quite useful to me, as it's a common way to describe the output of hierarchical clustering, both in formal treatments and in other analysis libraries (though I do suggest naming it `toLinkageMatrix`). What do you think about bringing it back?
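A common layout for this output is the one used by SciPy's `scipy.cluster.hierarchy.linkage`: one row per merge, `[left id, right id, merge distance, number of original observations]`, where leaves are numbered `0..n-1` and the cluster created in row `k` gets id `n + k`. The hypothetical sketch below converts a binary cluster tree into that layout with a post-order traversal, so both children have ids before their parent's row is written. `Node` is an illustrative type, not MLlib's `ClusterTree`, and `toLinkageMatrix` is only the name suggested in the comment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: binary cluster tree -> SciPy-style linkage matrix.
public class LinkageMatrixSketch {

  static final class Node {
    final Node left;     // null for a leaf
    final Node right;    // null for a leaf
    final double height; // merge distance (0 for leaves)
    int id = -1;
    int size;

    Node() { this(null, null, 0.0); }
    Node(Node left, Node right, double height) {
      this.left = left;
      this.right = right;
      this.height = height;
    }
    boolean isLeaf() { return left == null; }
  }

  static double[][] toLinkageMatrix(Node root) {
    List<Node> leaves = new ArrayList<>();
    collectLeaves(root, leaves);
    for (int i = 0; i < leaves.size(); i++) { // leaves get ids 0..n-1
      leaves.get(i).id = i;
      leaves.get(i).size = 1;
    }
    List<double[]> rows = new ArrayList<>();
    emitMerges(root, leaves.size(), rows);
    return rows.toArray(new double[0][]);
  }

  private static void collectLeaves(Node node, List<Node> out) {
    if (node.isLeaf()) {
      out.add(node);
      return;
    }
    collectLeaves(node.left, out);
    collectLeaves(node.right, out);
  }

  // Post-order traversal: children are numbered before their parent's row.
  private static void emitMerges(Node node, int n, List<double[]> rows) {
    if (node.isLeaf()) {
      return;
    }
    emitMerges(node.left, n, rows);
    emitMerges(node.right, n, rows);
    node.size = node.left.size + node.right.size;
    node.id = n + rows.size();
    rows.add(new double[] {node.left.id, node.right.id, node.height, node.size});
  }

  public static void main(String[] args) {
    // ((x0, x1) merged at 1.0, x2) merged at 3.0
    Node x0 = new Node();
    Node x1 = new Node();
    Node x2 = new Node();
    Node root = new Node(new Node(x0, x1, 1.0), x2, 3.0);
    for (double[] row : toLinkageMatrix(root)) {
      System.out.println(Arrays.toString(row));
    }
    // [0.0, 1.0, 1.0, 2.0]
    // [3.0, 2.0, 3.0, 3.0]
  }
}
```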
[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/5377#discussion_r29121411
--- Diff: network/common/src/main/java/org/apache/spark/network/sasl/SaslRpcHandler.java ---
@@ -46,19 +47,30 @@
   /** Class which provides secret keys which are shared by server and client on a per-app basis. */
   private final SecretKeyHolder secretKeyHolder;
 
-  /** Maps each channel to its SASL authentication state. */
-  private final ConcurrentMap channelAuthenticationMap;
+  /** The client channel. */
+  private final Channel channel;
 
-  public SaslRpcHandler(RpcHandler delegate, SecretKeyHolder secretKeyHolder) {
+  private final TransportConf conf;
+
+  private SparkSaslServer saslServer;
+  private boolean isComplete;
+
+  SaslRpcHandler(
+      TransportConf conf,
+      Channel channel,
+      RpcHandler delegate,
+      SecretKeyHolder secretKeyHolder) {
+    this.conf = conf;
--- End diff --
nit: reorder the fields to follow the same order as the constructor parameters.
[GitHub] spark pull request: [SPARK-6517][mllib] Implement the Algorithm of...
Github user freeman-lab commented on a diff in the pull request: https://github.com/apache/spark/pull/5267#discussion_r29121407 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/HierarchicalClusteringModel.scala --- @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import breeze.linalg.{DenseVector => BDV, Vector => BV, norm => breezeNorm} +import org.apache.spark.api.java.JavaRDD +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.util.{Loader, Saveable} +import org.apache.spark.rdd.RDD +import org.apache.spark.{Logging, SparkContext} + +/** + * This class is used for the model of the hierarchical clustering + * + * @param tree a cluster as a tree node + */ +class HierarchicalClusteringModel(val tree: ClusterTree) +extends Serializable with Saveable with Logging { + + /** Current version of model save/load format. 
*/ + override protected def formatVersion: String = "1.0" + + override def save(sc: SparkContext, path: String) { +val oos = new java.io.ObjectOutputStream(new java.io.FileOutputStream(path)) +try { + oos.writeObject(this) +} finally { + oos.close() +} + } + + def getClusters(): Array[ClusterTree] = this.tree.getLeavesNodes() --- End diff -- Remove parentheses after `getClusters`
[GitHub] spark pull request: [SPARK-6517][mllib] Implement the Algorithm of...
Github user freeman-lab commented on a diff in the pull request: https://github.com/apache/spark/pull/5267#discussion_r29121409 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/HierarchicalClusteringModel.scala --- @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import breeze.linalg.{DenseVector => BDV, Vector => BV, norm => breezeNorm} +import org.apache.spark.api.java.JavaRDD +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.util.{Loader, Saveable} +import org.apache.spark.rdd.RDD +import org.apache.spark.{Logging, SparkContext} + +/** + * This class is used for the model of the hierarchical clustering + * + * @param tree a cluster as a tree node + */ +class HierarchicalClusteringModel(val tree: ClusterTree) +extends Serializable with Saveable with Logging { + + /** Current version of model save/load format. 
*/ + override protected def formatVersion: String = "1.0" + + override def save(sc: SparkContext, path: String) { +val oos = new java.io.ObjectOutputStream(new java.io.FileOutputStream(path)) +try { + oos.writeObject(this) +} finally { + oos.close() +} + } + + def getClusters(): Array[ClusterTree] = this.tree.getLeavesNodes() + + def getCenters(): Array[Vector] = this.getClusters().map(_.center) --- End diff -- Remove parentheses after `getCenters`
[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/5377#discussion_r29121332 --- Diff: network/common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java --- @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.network.sasl; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.util.List; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; +import io.netty.channel.FileRegion; +import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.util.AbstractReferenceCounted; +import io.netty.util.ReferenceCountUtil; + +import org.apache.spark.network.util.ByteArrayWritableChannel; +import org.apache.spark.network.util.NettyUtils; + +class SaslEncryption { + + @VisibleForTesting + static final String ENCRYPTION_HANDLER_NAME = "saslEncryption"; + + /** + * Adds channel handlers that perform encryption / decryption of data using SASL. + * + * @param channel The channel. + * @param backend The SASL backend. + * @param maxOutboundBlockSize Max size in bytes of outgoing encrypted blocks, to control + * memory usage. 
+ */ + static void addToChannel( + Channel channel, + SaslEncryptionBackend backend, + int maxOutboundBlockSize) { +channel.pipeline() + .addFirst(ENCRYPTION_HANDLER_NAME, new EncryptionHandler(backend, maxOutboundBlockSize)) + .addFirst("saslDecryption", new DecryptionHandler(backend)) + .addFirst("saslFrameDecoder", NettyUtils.createFrameDecoder()); + } + + private static class EncryptionHandler extends ChannelOutboundHandlerAdapter { + +private final int maxOutboundBlockSize; +private final SaslEncryptionBackend backend; + +EncryptionHandler(SaslEncryptionBackend backend, int maxOutboundBlockSize) { + this.backend = backend; + this.maxOutboundBlockSize = maxOutboundBlockSize; +} + +/** + * Wrap the incoming message in an implementation that will perform encryption lazily. This is + * needed to guarantee ordering of the outgoing encrypted packets - they need to be decrypted in + * the same order, and netty doesn't have an atomic ChannelHandlerContext.write() API, so it + * does not guarantee any ordering. + */ +@Override +public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) + throws Exception { + + ctx.write(new EncryptedMessage(backend, msg, maxOutboundBlockSize), promise); +} + +@Override +public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + try { +backend.dispose(); + } finally { +super.handlerRemoved(ctx); + } +} + + } + + private static class DecryptionHandler extends MessageToMessageDecoder { + +private final SaslEncryptionBackend backend; + +DecryptionHandler(SaslEncryptionBackend backend) { + this.backend = backend; +} + +@Override +protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) + throws Exception { + + byte[] data; + int offset; + int length = msg.readableBytes(); + if (msg.hasArray()) { +data = msg.array(); +offset = msg.arrayOffset(); + } else { +data = new byte[length]; +msg.readBytes(data); +offset = 0; +
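The javadoc in this diff bounds outgoing encrypted blocks by `maxOutboundBlockSize`, and `addToChannel` places a frame decoder ahead of the decryption handler. A minimal JDK-only sketch of that chunk-and-frame idea, assuming a 4-byte big-endian length prefix (an assumption, not necessarily Spark's wire format) and a toy XOR "cipher" standing in for a real SASL `wrap`/`unwrap` such as those on `javax.security.sasl.SaslServer`; `BlockFraming` is a hypothetical name:

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.function.UnaryOperator;

final class BlockFraming {
    /** Splits data into blocks of at most maxBlockSize bytes, wraps each, and frames it with a length prefix. */
    static byte[] encode(byte[] data, int maxBlockSize, UnaryOperator<byte[]> wrap) {
        if (maxBlockSize <= 0) {
            throw new IllegalArgumentException("maxBlockSize must be positive");
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int start = 0; start < data.length; start += maxBlockSize) {
            int end = Math.min(data.length, start + maxBlockSize);
            // Each block is wrapped independently, which bounds per-block memory.
            byte[] wrapped = wrap.apply(Arrays.copyOfRange(data, start, end));
            byte[] prefix = ByteBuffer.allocate(4).putInt(wrapped.length).array();
            out.write(prefix, 0, prefix.length);
            out.write(wrapped, 0, wrapped.length);
        }
        return out.toByteArray();
    }

    /** Reads frames in the order they were written and unwraps each one. */
    static byte[] decode(byte[] framed, UnaryOperator<byte[]> unwrap) {
        ByteBuffer in = ByteBuffer.wrap(framed);
        ByteArrayOutputStream plain = new ByteArrayOutputStream();
        while (in.hasRemaining()) {
            int len = in.getInt();          // the frame decoder's job: find the frame boundary
            byte[] frame = new byte[len];
            in.get(frame);
            byte[] block = unwrap.apply(frame); // the decryption handler's job
            plain.write(block, 0, block.length);
        }
        return plain.toByteArray();
    }
}
```

Frames must be unwrapped in the order they were written, which is the ordering concern raised in the `write()` javadoc of this diff.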
[GitHub] spark pull request: [SPARK-6517][mllib] Implement the Algorithm of...
Github user freeman-lab commented on the pull request: https://github.com/apache/spark/pull/5267#issuecomment-96511859 @yu-iskw I'm not familiar with any other self-contained metrics (there are a bunch of metrics for relating estimated clusters to some known ground-truth clustering, but I don't think that's what you mean). Do you want to provide other outputs to the user to assess clustering quality?
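For reference, the usual self-contained (internal) measure, which needs no ground truth, is the within-cluster sum of squared distances from each point to its cluster center; MLlib's `KMeansModel.computeCost` reports a closely related quantity for k-means, using each point's nearest center. A minimal JDK-only sketch on plain arrays, assuming explicit cluster labels (`ClusterCost` is a hypothetical name):

```java
final class ClusterCost {
    /** Sum of squared Euclidean distances from each point to the center of its assigned cluster. */
    static double withinClusterSumOfSquares(double[][] points, int[] labels, double[][] centers) {
        if (points.length != labels.length) {
            throw new IllegalArgumentException("one label per point is required");
        }
        double total = 0.0;
        for (int p = 0; p < points.length; p++) {
            double[] center = centers[labels[p]];
            for (int i = 0; i < center.length; i++) {
                double diff = points[p][i] - center[i];
                total += diff * diff;
            }
        }
        return total;
    }
}
```

Lower means tighter clusters. Splitting a cluster and recomputing centroids never increases this value, so it is most useful for comparing runs with the same number of clusters.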
[GitHub] spark pull request: [SPARK-7142][SQL]: Minor enhancement to Boolea...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/5700#discussion_r29121291 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -413,6 +418,10 @@ object BooleanSimplification extends Rule[LogicalPlan] with PredicateHelper { case LessThan(l, r) => GreaterThanOrEqual(l, r) // not(l <= r) => l > r case LessThanOrEqual(l, r) => GreaterThan(l, r) +// not(l || r) => not(l) && not(r) +case Or(l, r) => And(Not(l), Not(r)) +// not(l && r) => not(l) or not(r) +case And(l, r) => Or(Not(l), Not(r)) --- End diff -- `case Not(Or(l, r))`? It seems you're missing the `Not`...
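De Morgan's laws only hold under a negation: not(l || r) is equivalent to not(l) && not(r), but l || r on its own is not. A toy sketch with hypothetical expression classes (not Catalyst's `Expression` hierarchy) that applies the rewrite only when the enclosing `Not` is matched:

```java
abstract class BoolExpr {}

final class VarExpr extends BoolExpr {
    final String name;
    VarExpr(String name) { this.name = name; }
    @Override public String toString() { return name; }
}

final class NotExpr extends BoolExpr {
    final BoolExpr child;
    NotExpr(BoolExpr child) { this.child = child; }
    @Override public String toString() { return "not(" + child + ")"; }
}

final class AndExpr extends BoolExpr {
    final BoolExpr left;
    final BoolExpr right;
    AndExpr(BoolExpr left, BoolExpr right) { this.left = left; this.right = right; }
    @Override public String toString() { return "(" + left + " && " + right + ")"; }
}

final class OrExpr extends BoolExpr {
    final BoolExpr left;
    final BoolExpr right;
    OrExpr(BoolExpr left, BoolExpr right) { this.left = left; this.right = right; }
    @Override public String toString() { return "(" + left + " || " + right + ")"; }
}

final class DeMorgan {
    /** Rewrites a Not that directly wraps an Or/And; everything else is returned unchanged. */
    static BoolExpr pushNotDown(BoolExpr e) {
        if (!(e instanceof NotExpr)) {
            return e; // a bare Or/And must not be rewritten
        }
        BoolExpr inner = ((NotExpr) e).child;
        if (inner instanceof OrExpr) {
            OrExpr or = (OrExpr) inner;   // not(l || r) => not(l) && not(r)
            return new AndExpr(new NotExpr(or.left), new NotExpr(or.right));
        }
        if (inner instanceof AndExpr) {
            AndExpr and = (AndExpr) inner; // not(l && r) => not(l) || not(r)
            return new OrExpr(new NotExpr(and.left), new NotExpr(and.right));
        }
        return e;
    }
}
```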
[GitHub] spark pull request: [SPARK-6517][mllib] Implement the Algorithm of...
Github user freeman-lab commented on a diff in the pull request: https://github.com/apache/spark/pull/5267#discussion_r29121200 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/HierarchicalClustering.scala --- @@ -0,0 +1,574 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.clustering + +import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV, norm => breezeNorm} +import org.apache.spark.mllib.linalg.{Vector, Vectors} +import org.apache.spark.rdd.RDD +import org.apache.spark.util.random.XORShiftRandom +import org.apache.spark.{Logging, SparkException} + +import scala.collection.{Map, mutable} + + +object HierarchicalClustering extends Logging { + + private[clustering] val ROOT_INDEX_KEY: Long = 1 + + /** + * Finds the closes cluster's center + * + * @param metric a distance metric + * @param centers centers of the clusters + * @param point a target point + * @return an index of the array of clusters + */ + private[mllib] + def findClosestCenter(metric: Function2[BV[Double], BV[Double], Double]) +(centers: Seq[BV[Double]])(point: BV[Double]): Int = { +val (closestCenter, closestIndex) = + centers.zipWithIndex.map { case (center, idx) => (metric(center, point), idx)}.minBy(_._1) +closestIndex + } +} + +/** + * This is a divisive hierarchical clustering algorithm based on bi-sect k-means algorithm. + * + * The main idea of this algorithm is based on "A comparison of document clustering techniques", + * M. Steinbach, G. Karypis and V. Kumar. Workshop on Text Mining, KDD, 2000. 
+ * http://cs.fit.edu/~pkc/classes/ml-internet/papers/steinbach00tr.pdf + * + * @param numClusters tne number of clusters you want + * @param clusterMap the pairs of cluster and its index as Map + * @param maxIterations the number of maximal iterations + * @param maxRetries the number of maximum retries + * @param seed a random seed + */ +class HierarchicalClustering private ( + private var numClusters: Int, + private var clusterMap: Map[Long, ClusterTree], + private var maxIterations: Int, + private var maxRetries: Int, + private var seed: Long) extends Logging { + + /** + * Constructs with the default configuration + */ + def this() = this(20, mutable.ListMap.empty[Long, ClusterTree], 20, 10, 1) + + /** + * Sets the number of clusters you want + */ + def setNumClusters(numClusters: Int): this.type = { +this.numClusters = numClusters +this + } + + def getNumClusters: Int = this.numClusters + + /** + * Sets the number of maximal iterations in each clustering step + */ + def setMaxIterations(maxIterations: Int): this.type = { +this.maxIterations = maxIterations +this + } + + def getSubIterations: Int = this.maxIterations + + /** + * Sets the number of maximum retries of each clustering step + */ + def setMaxRetries(maxRetries: Int): this.type = { +this.maxRetries = maxRetries +this + } + + def getMaxRetries: Int = this.maxRetries + + /** + * Sets the random seed + */ + def setSeed(seed: Long): this.type = { +this.seed = seed +this + } + + def getSeed: Long = this.seed + + /** + * Runs the hierarchical clustering algorithm + * @param input RDD of vectors + * @return model for the hierarchical clustering + */ + def run(input: RDD[Vector]): HierarchicalClusteringModel = { +val sc = input.sparkContext +log.info(s"${sc.appName} starts a hierarchical clustering algorithm") + +var data = initData(input).cache() --- End diff -- This algorithm contains a lot of `caching` and `unpersisting`. Can we add a more detailed note in the docstrings as to how much of a data
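The `findClosestCenter` helper quoted in this diff returns the index of the center nearest to `point` under a caller-supplied metric. A JDK-only sketch of the same logic done in a single pass, with plain `double[]` standing in for Breeze vectors (`ClosestCenter` and `euclidean` are hypothetical names):

```java
import java.util.function.ToDoubleBiFunction;

final class ClosestCenter {
    /** Returns the index of the center with the smallest metric(center, point). */
    static int findClosestCenter(ToDoubleBiFunction<double[], double[]> metric,
                                 double[][] centers, double[] point) {
        if (centers.length == 0) {
            throw new IllegalArgumentException("at least one center is required");
        }
        int bestIndex = 0;
        double bestDistance = metric.applyAsDouble(centers[0], point);
        for (int i = 1; i < centers.length; i++) {
            double d = metric.applyAsDouble(centers[i], point);
            if (d < bestDistance) { // strict '<' keeps the first minimum on ties
                bestDistance = d;
                bestIndex = i;
            }
        }
        return bestIndex;
    }

    /** Euclidean distance between two equal-length vectors. */
    static double euclidean(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double diff = a[i] - b[i];
            sum += diff * diff;
        }
        return Math.sqrt(sum);
    }
}
```

Like Scala's `minBy`, this keeps the first minimum on ties; the single loop also avoids building an intermediate collection of (distance, index) pairs for every point.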
[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/5377#discussion_r29121182 --- Diff: network/common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java --- @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.network.sasl; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.util.List; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; +import io.netty.channel.FileRegion; +import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.util.AbstractReferenceCounted; +import io.netty.util.ReferenceCountUtil; + +import org.apache.spark.network.util.ByteArrayWritableChannel; +import org.apache.spark.network.util.NettyUtils; + +class SaslEncryption { + + @VisibleForTesting + static final String ENCRYPTION_HANDLER_NAME = "saslEncryption"; + + /** + * Adds channel handlers that perform encryption / decryption of data using SASL. + * + * @param channel The channel. + * @param backend The SASL backend. + * @param maxOutboundBlockSize Max size in bytes of outgoing encrypted blocks, to control + * memory usage. 
+ */ + static void addToChannel( + Channel channel, + SaslEncryptionBackend backend, + int maxOutboundBlockSize) { +channel.pipeline() + .addFirst(ENCRYPTION_HANDLER_NAME, new EncryptionHandler(backend, maxOutboundBlockSize)) + .addFirst("saslDecryption", new DecryptionHandler(backend)) + .addFirst("saslFrameDecoder", NettyUtils.createFrameDecoder()); + } + + private static class EncryptionHandler extends ChannelOutboundHandlerAdapter { + +private final int maxOutboundBlockSize; +private final SaslEncryptionBackend backend; + +EncryptionHandler(SaslEncryptionBackend backend, int maxOutboundBlockSize) { + this.backend = backend; + this.maxOutboundBlockSize = maxOutboundBlockSize; +} + +/** + * Wrap the incoming message in an implementation that will perform encryption lazily. This is + * needed to guarantee ordering of the outgoing encrypted packets - they need to be decrypted in + * the same order, and netty doesn't have an atomic ChannelHandlerContext.write() API, so it + * does not guarantee any ordering. + */ +@Override +public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) + throws Exception { + + ctx.write(new EncryptedMessage(backend, msg, maxOutboundBlockSize), promise); +} + +@Override +public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + try { +backend.dispose(); + } finally { +super.handlerRemoved(ctx); + } +} + + } + + private static class DecryptionHandler extends MessageToMessageDecoder { + +private final SaslEncryptionBackend backend; + +DecryptionHandler(SaslEncryptionBackend backend) { + this.backend = backend; +} + +@Override +protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) + throws Exception { + + byte[] data; + int offset; + int length = msg.readableBytes(); + if (msg.hasArray()) { +data = msg.array(); +offset = msg.arrayOffset(); + } else { +data = new byte[length]; +msg.readBytes(data); +offset = 0; +
[GitHub] spark pull request: [SPARK-6517][mllib] Implement the Algorithm of...
Github user freeman-lab commented on a diff in the pull request: https://github.com/apache/spark/pull/5267#discussion_r29121153 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/HierarchicalClustering.scala --- @@ -0,0 +1,574 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.clustering + +import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV, norm => breezeNorm} +import org.apache.spark.mllib.linalg.{Vector, Vectors} +import org.apache.spark.rdd.RDD +import org.apache.spark.util.random.XORShiftRandom +import org.apache.spark.{Logging, SparkException} + +import scala.collection.{Map, mutable} + + +object HierarchicalClustering extends Logging { + + private[clustering] val ROOT_INDEX_KEY: Long = 1 + + /** + * Finds the closes cluster's center + * + * @param metric a distance metric + * @param centers centers of the clusters + * @param point a target point + * @return an index of the array of clusters + */ + private[mllib] + def findClosestCenter(metric: Function2[BV[Double], BV[Double], Double]) +(centers: Seq[BV[Double]])(point: BV[Double]): Int = { +val (closestCenter, closestIndex) = + centers.zipWithIndex.map { case (center, idx) => (metric(center, point), idx)}.minBy(_._1) +closestIndex + } +} + +/** + * This is a divisive hierarchical clustering algorithm based on bi-sect k-means algorithm. + * + * The main idea of this algorithm is based on "A comparison of document clustering techniques", + * M. Steinbach, G. Karypis and V. Kumar. Workshop on Text Mining, KDD, 2000. 
+ * http://cs.fit.edu/~pkc/classes/ml-internet/papers/steinbach00tr.pdf + * + * @param numClusters tne number of clusters you want + * @param clusterMap the pairs of cluster and its index as Map + * @param maxIterations the number of maximal iterations + * @param maxRetries the number of maximum retries + * @param seed a random seed + */ +class HierarchicalClustering private ( + private var numClusters: Int, + private var clusterMap: Map[Long, ClusterTree], + private var maxIterations: Int, + private var maxRetries: Int, + private var seed: Long) extends Logging { + + /** + * Constructs with the default configuration + */ + def this() = this(20, mutable.ListMap.empty[Long, ClusterTree], 20, 10, 1) + + /** + * Sets the number of clusters you want + */ + def setNumClusters(numClusters: Int): this.type = { +this.numClusters = numClusters +this + } + + def getNumClusters: Int = this.numClusters + + /** + * Sets the number of maximal iterations in each clustering step + */ + def setMaxIterations(maxIterations: Int): this.type = { +this.maxIterations = maxIterations +this + } + + def getSubIterations: Int = this.maxIterations + + /** + * Sets the number of maximum retries of each clustering step + */ + def setMaxRetries(maxRetries: Int): this.type = { +this.maxRetries = maxRetries +this + } + + def getMaxRetries: Int = this.maxRetries + + /** + * Sets the random seed + */ + def setSeed(seed: Long): this.type = { +this.seed = seed +this + } + + def getSeed: Long = this.seed + + /** + * Runs the hierarchical clustering algorithm + * @param input RDD of vectors + * @return model for the hierarchical clustering + */ + def run(input: RDD[Vector]): HierarchicalClusteringModel = { +val sc = input.sparkContext +log.info(s"${sc.appName} starts a hierarchical clustering algorithm") + +var data = initData(input).cache() +val startTime = System.currentTimeMillis() + +// `clusters` is described as binary tree structure +// `clusters(1)` means the root of a bina
[GitHub] spark pull request: [SPARK-6517][mllib] Implement the Algorithm of...
Github user freeman-lab commented on a diff in the pull request: https://github.com/apache/spark/pull/5267#discussion_r29121114 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/HierarchicalClustering.scala --- @@ -0,0 +1,574 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.clustering + +import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV, norm => breezeNorm} +import org.apache.spark.mllib.linalg.{Vector, Vectors} +import org.apache.spark.rdd.RDD +import org.apache.spark.util.random.XORShiftRandom +import org.apache.spark.{Logging, SparkException} + +import scala.collection.{Map, mutable} + + +object HierarchicalClustering extends Logging { + + private[clustering] val ROOT_INDEX_KEY: Long = 1 + + /** + * Finds the closes cluster's center + * + * @param metric a distance metric + * @param centers centers of the clusters + * @param point a target point + * @return an index of the array of clusters + */ + private[mllib] + def findClosestCenter(metric: Function2[BV[Double], BV[Double], Double]) +(centers: Seq[BV[Double]])(point: BV[Double]): Int = { +val (closestCenter, closestIndex) = + centers.zipWithIndex.map { case (center, idx) => (metric(center, point), idx)}.minBy(_._1) +closestIndex + } +} + +/** + * This is a divisive hierarchical clustering algorithm based on bi-sect k-means algorithm. + * + * The main idea of this algorithm is based on "A comparison of document clustering techniques", + * M. Steinbach, G. Karypis and V. Kumar. Workshop on Text Mining, KDD, 2000. 
+ * http://cs.fit.edu/~pkc/classes/ml-internet/papers/steinbach00tr.pdf + * + * @param numClusters tne number of clusters you want + * @param clusterMap the pairs of cluster and its index as Map + * @param maxIterations the number of maximal iterations + * @param maxRetries the number of maximum retries + * @param seed a random seed + */ +class HierarchicalClustering private ( + private var numClusters: Int, + private var clusterMap: Map[Long, ClusterTree], + private var maxIterations: Int, + private var maxRetries: Int, + private var seed: Long) extends Logging { + + /** + * Constructs with the default configuration + */ + def this() = this(20, mutable.ListMap.empty[Long, ClusterTree], 20, 10, 1) + + /** + * Sets the number of clusters you want + */ + def setNumClusters(numClusters: Int): this.type = { +this.numClusters = numClusters +this + } + + def getNumClusters: Int = this.numClusters + + /** + * Sets the number of maximal iterations in each clustering step + */ + def setMaxIterations(maxIterations: Int): this.type = { +this.maxIterations = maxIterations +this + } + + def getSubIterations: Int = this.maxIterations + + /** + * Sets the number of maximum retries of each clustering step + */ + def setMaxRetries(maxRetries: Int): this.type = { +this.maxRetries = maxRetries +this + } + + def getMaxRetries: Int = this.maxRetries + + /** + * Sets the random seed + */ + def setSeed(seed: Long): this.type = { +this.seed = seed +this + } + + def getSeed: Long = this.seed + + /** + * Runs the hierarchical clustering algorithm + * @param input RDD of vectors + * @return model for the hierarchical clustering + */ + def run(input: RDD[Vector]): HierarchicalClusteringModel = { +val sc = input.sparkContext +log.info(s"${sc.appName} starts a hierarchical clustering algorithm") + +var data = initData(input).cache() +val startTime = System.currentTimeMillis() + +// `clusters` is described as binary tree structure +// `clusters(1)` means the root of a bina
[GitHub] spark pull request: [SQL][Minor] fix java doc for DataFrame.agg
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/5712 [SQL][Minor] fix java doc for DataFrame.agg You can merge this pull request into a Git repository by running: $ git pull https://github.com/cloud-fan/spark minor Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/5712.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5712 commit be23064ede05ec1ae6907b78f08f57bbf12f9da7 Author: Wenchen Fan Date: 2015-04-27T05:06:46Z fix java doc for DataFrame.agg
[GitHub] spark pull request: [SPARK-6229] Add SASL encryption to network li...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/5377#discussion_r29121094 --- Diff: network/common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java --- @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.network.sasl; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.util.List; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; +import io.netty.channel.FileRegion; +import io.netty.handler.codec.MessageToMessageDecoder; +import io.netty.util.AbstractReferenceCounted; +import io.netty.util.ReferenceCountUtil; + +import org.apache.spark.network.util.ByteArrayWritableChannel; +import org.apache.spark.network.util.NettyUtils; + +class SaslEncryption { + + @VisibleForTesting + static final String ENCRYPTION_HANDLER_NAME = "saslEncryption"; + + /** + * Adds channel handlers that perform encryption / decryption of data using SASL. + * + * @param channel The channel. + * @param backend The SASL backend. + * @param maxOutboundBlockSize Max size in bytes of outgoing encrypted blocks, to control + * memory usage. 
+ */ + static void addToChannel( + Channel channel, + SaslEncryptionBackend backend, + int maxOutboundBlockSize) { +channel.pipeline() + .addFirst(ENCRYPTION_HANDLER_NAME, new EncryptionHandler(backend, maxOutboundBlockSize)) + .addFirst("saslDecryption", new DecryptionHandler(backend)) + .addFirst("saslFrameDecoder", NettyUtils.createFrameDecoder()); + } + + private static class EncryptionHandler extends ChannelOutboundHandlerAdapter { + +private final int maxOutboundBlockSize; +private final SaslEncryptionBackend backend; + +EncryptionHandler(SaslEncryptionBackend backend, int maxOutboundBlockSize) { + this.backend = backend; + this.maxOutboundBlockSize = maxOutboundBlockSize; +} + +/** + * Wrap the incoming message in an implementation that will perform encryption lazily. This is + * needed to guarantee ordering of the outgoing encrypted packets - they need to be decrypted in + * the same order, and netty doesn't have an atomic ChannelHandlerContext.write() API, so it + * does not guarantee any ordering. + */ +@Override +public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) + throws Exception { + + ctx.write(new EncryptedMessage(backend, msg, maxOutboundBlockSize), promise); +} + +@Override +public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + try { +backend.dispose(); + } finally { +super.handlerRemoved(ctx); + } +} + + } + + private static class DecryptionHandler extends MessageToMessageDecoder { + +private final SaslEncryptionBackend backend; + +DecryptionHandler(SaslEncryptionBackend backend) { + this.backend = backend; +} + +@Override +protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) + throws Exception { + + byte[] data; + int offset; + int length = msg.readableBytes(); + if (msg.hasArray()) { +data = msg.array(); +offset = msg.arrayOffset(); --- End diff -- should we advance msg by the readableBytes?
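Context for the reviewer's question: reading a buffer through its backing array does not consume its bytes. Netty's `ByteBuf` can't be exercised in a self-contained snippet, so this sketch swaps in `java.nio.ByteBuffer` as an analogy (an assumption; it is not the `DecryptionHandler` code). `array()` and `arrayOffset()` leave `position()` unchanged, and the bytes only count as read once the position is advanced explicitly. For a `ByteBuf`, the analogous step would be something like `msg.skipBytes(length)`.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

class ReadableBytesDemo {
    // Copies the readable bytes out of buf, loosely mirroring the hasArray() branch in the diff.
    // When advance is true, the position is moved past the copied bytes so they count as consumed.
    static byte[] drain(ByteBuffer buf, boolean advance) {
        int length = buf.remaining();
        byte[] out;
        if (buf.hasArray()) {
            // The readable region starts at arrayOffset() + position() in the backing array.
            int offset = buf.arrayOffset() + buf.position();
            // Reading the backing array does NOT change buf.position().
            out = Arrays.copyOfRange(buf.array(), offset, offset + length);
        } else {
            out = new byte[length];
            // Use a duplicate so the bulk get() does not move buf's own position.
            buf.duplicate().get(out);
        }
        if (advance) {
            buf.position(buf.position() + length);
        }
        return out;
    }

    public static void main(String[] args) {
        ByteBuffer a = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
        drain(a, false);
        System.out.println(a.remaining()); // 4: the bytes still look unread
        ByteBuffer b = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
        drain(b, true);
        System.out.println(b.remaining()); // 0: the bytes were consumed
    }
}
```

Whether the decoder must advance the index depends on how its base class treats unread bytes, which this analogy does not model.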
[GitHub] spark pull request: [SPARK-2883][SQL] Spark Support for ORCFile fo...
Github user zhzhan commented on a diff in the pull request: https://github.com/apache/spark/pull/5275#discussion_r29121067 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/package.scala --- @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive + + +import com.esotericsoftware.kryo.io.Output +import com.esotericsoftware.kryo.Kryo +import org.apache.commons.codec.binary.Base64 +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument +import org.apache.spark.sql.catalyst.ScalaReflection +import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.sql.hive.orc._ +import org.apache.spark.sql.sources.BaseRelation +import org.apache.spark.sql.{SaveMode, SQLContext, DataFrame, SchemaRDD} +import scala.reflect.runtime.universe.{TypeTag, typeTag} + +package object orc { + implicit class OrcContext(sqlContext: HiveContext) { +def orcFile(path: String) = { + val parameters = Map("path"->path) + val orcRelation = OrcRelation(path, parameters, None)(sqlContext) + sqlContext.baseRelationToDataFrame(orcRelation) +} + } + + implicit class OrcSchemaRDD(dataFrame: DataFrame) { +def saveAsOrcFile(path: String): Unit = { + val parameters = Map("path"->path) + val relation = OrcRelation(path, parameters, Some(dataFrame.schema))(dataFrame.sqlContext) + relation.insert(dataFrame, false) +} + } + + // for orc compression type, only take effect in hive 0.13.1 + val orcDefaultCompressVar = "hive.exec.orc.default.compress" + // for prediction push down in hive-0.13.1, don't enable it + var ORC_FILTER_PUSHDOWN_ENABLED = true --- End diff -- @deanchen You can disable it by spark.sql.hive.orc.ORC_FILTER_PUSHDOWN_ENABLED = false
[GitHub] spark pull request: [SPARK-6517][mllib] Implement the Algorithm of...
Github user freeman-lab commented on a diff in the pull request: https://github.com/apache/spark/pull/5267#discussion_r29120850 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/HierarchicalClustering.scala --- @@ -0,0 +1,574 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.clustering + +import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV, norm => breezeNorm} +import org.apache.spark.mllib.linalg.{Vector, Vectors} +import org.apache.spark.rdd.RDD +import org.apache.spark.util.random.XORShiftRandom +import org.apache.spark.{Logging, SparkException} + +import scala.collection.{Map, mutable} + + +object HierarchicalClustering extends Logging { + + private[clustering] val ROOT_INDEX_KEY: Long = 1 + + /** + * Finds the closes cluster's center + * + * @param metric a distance metric + * @param centers centers of the clusters + * @param point a target point + * @return an index of the array of clusters + */ + private[mllib] + def findClosestCenter(metric: Function2[BV[Double], BV[Double], Double]) +(centers: Seq[BV[Double]])(point: BV[Double]): Int = { +val (closestCenter, closestIndex) = + centers.zipWithIndex.map { case (center, idx) => (metric(center, point), idx)}.minBy(_._1) +closestIndex + } +} + +/** + * This is a divisive hierarchical clustering algorithm based on bi-sect k-means algorithm. + * + * The main idea of this algorithm is based on "A comparison of document clustering techniques", + * M. Steinbach, G. Karypis and V. Kumar. Workshop on Text Mining, KDD, 2000. 
+ * http://cs.fit.edu/~pkc/classes/ml-internet/papers/steinbach00tr.pdf + * + * @param numClusters tne number of clusters you want + * @param clusterMap the pairs of cluster and its index as Map + * @param maxIterations the number of maximal iterations + * @param maxRetries the number of maximum retries + * @param seed a random seed + */ +class HierarchicalClustering private ( + private var numClusters: Int, + private var clusterMap: Map[Long, ClusterTree], + private var maxIterations: Int, + private var maxRetries: Int, + private var seed: Long) extends Logging { + + /** + * Constructs with the default configuration + */ + def this() = this(20, mutable.ListMap.empty[Long, ClusterTree], 20, 10, 1) + + /** + * Sets the number of clusters you want + */ + def setNumClusters(numClusters: Int): this.type = { +this.numClusters = numClusters +this + } + + def getNumClusters: Int = this.numClusters + + /** + * Sets the number of maximal iterations in each clustering step + */ + def setMaxIterations(maxIterations: Int): this.type = { +this.maxIterations = maxIterations +this + } + + def getSubIterations: Int = this.maxIterations --- End diff -- Why the name swap? Shouldn't this be `getMaxIterations`?
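The `findClosestCenter` helper quoted in this diff is a plain argmin over the distances from a point to each center. A minimal Java translation, assuming centers and points are `double[]` arrays and using Euclidean distance only as an example metric (the diff takes the metric as a parameter); the class name `ClosestCenter` is hypothetical:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

class ClosestCenter {
    // Euclidean distance, used here only as an example metric.
    static double euclidean(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return Math.sqrt(sum);
    }

    // Returns the index of the center with the smallest metric(center, point).
    // Ties keep the earliest index because only a strictly smaller distance replaces the best.
    static int findClosestCenter(BiFunction<double[], double[], Double> metric,
                                 List<double[]> centers, double[] point) {
        if (centers.isEmpty()) {
            throw new IllegalArgumentException("centers must not be empty");
        }
        int closestIndex = 0;
        double closestDistance = metric.apply(centers.get(0), point);
        for (int i = 1; i < centers.size(); i++) {
            double d = metric.apply(centers.get(i), point);
            if (d < closestDistance) {
                closestDistance = d;
                closestIndex = i;
            }
        }
        return closestIndex;
    }

    public static void main(String[] args) {
        List<double[]> centers = Arrays.asList(new double[] {0, 0}, new double[] {10, 10});
        System.out.println(findClosestCenter(ClosestCenter::euclidean, centers, new double[] {9, 8})); // 1
    }
}
```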
[GitHub] spark pull request: [SPARK-7056][Streaming] Make the Write Ahead L...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/5645#discussion_r29120523 --- Diff: streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLogSegment.java --- @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.util; + +/** + * This is an interface that represent the information required by any implementation of --- End diff -- represents
[GitHub] spark pull request: [SPARK-7135][SQL] DataFrame expression for mon...
Github user pwendell commented on the pull request: https://github.com/apache/spark/pull/5709#issuecomment-96503022 Oh I see - I guess it doesn't matter then.
[GitHub] spark pull request: [SPARK-7056][Streaming] Make the Write Ahead L...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/5645#discussion_r29120504 --- Diff: streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLog.java --- @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.util; + +import java.nio.ByteBuffer; +import java.util.Iterator; + +/** + * Interface representing a write ahead log (aka journal) that is used by Spark Streaming to + * save the received data (by receivers) and associated metadata to a reliable storage, so that + * they can be recovered after driver failures. See the Spark docs for more information on how + * to plug in your own custom implementation of a write ahead log. + */ +@org.apache.spark.annotation.DeveloperApi +public interface WriteAheadLog { + /** + * Write the record to the log and return the segment information that is necessary to read + * back the written record. The time is used to the index the record, such that it can be + * cleaned later. Note that the written data must be durable and readable (using the + * segment info) by the time this function returns. 
+ */ + WriteAheadLogSegment write(ByteBuffer record, long time); + + /** + * Read a written record based on the given segment information. + */ + ByteBuffer read(WriteAheadLogSegment segment); + + /** + * Read and return an iterator of all the records that have written and not yet cleanup. --- End diff -- not yet cleaned up.
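To illustrate the contract described in this interface's doc comments (write returns the information needed to read a record back, records are indexed by time so they can be cleaned later, and a read-all iterator covers everything not yet cleaned up), here is a toy in-memory Java sketch. It is NOT Spark's interface: `SimpleWal`, `Segment`, `InMemoryWal`, `readAll` and `clean` are hypothetical names chosen here, and because it is in memory it deliberately ignores the durability requirement.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.TreeMap;

// Hypothetical, simplified contract loosely modeled on the WriteAheadLog diff.
interface SimpleWal {
    Segment write(ByteBuffer record, long time);
    ByteBuffer read(Segment segment);
    Iterator<ByteBuffer> readAll();
    void clean(long threshTime);
}

// The "segment" is whatever information is needed to read one record back.
final class Segment {
    final long time;
    final int index;
    Segment(long time, int index) { this.time = time; this.index = index; }
}

// Toy in-memory implementation: NOT durable, so it only illustrates read/write/clean.
final class InMemoryWal implements SimpleWal {
    // Records grouped by write time so clean() can drop whole time ranges.
    private final TreeMap<Long, List<ByteBuffer>> byTime = new TreeMap<>();

    @Override
    public Segment write(ByteBuffer record, long time) {
        List<ByteBuffer> bucket = byTime.computeIfAbsent(time, t -> new ArrayList<>());
        // Store a private copy; duplicate() keeps the caller's buffer position unchanged.
        ByteBuffer copy = ByteBuffer.allocate(record.remaining());
        copy.put(record.duplicate());
        copy.flip();
        bucket.add(copy);
        return new Segment(time, bucket.size() - 1);
    }

    @Override
    public ByteBuffer read(Segment segment) {
        List<ByteBuffer> bucket = byTime.get(segment.time);
        if (bucket == null) {
            throw new IllegalStateException("record was cleaned or never written");
        }
        // Return a duplicate so readers do not move the stored buffer's position.
        return bucket.get(segment.index).duplicate();
    }

    @Override
    public Iterator<ByteBuffer> readAll() {
        List<ByteBuffer> all = new ArrayList<>();
        for (List<ByteBuffer> bucket : byTime.values()) {
            for (ByteBuffer b : bucket) {
                all.add(b.duplicate());
            }
        }
        return all.iterator();
    }

    @Override
    public void clean(long threshTime) {
        // Drop every record written strictly before threshTime.
        byTime.headMap(threshTime, false).clear();
    }
}
```

A real implementation would also have to make each write durable before returning, which is the hard part this sketch skips.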
[GitHub] spark pull request: [SPARK-7056][Streaming] Make the Write Ahead L...
Github user harishreedharan commented on a diff in the pull request: https://github.com/apache/spark/pull/5645#discussion_r29120494 --- Diff: streaming/src/main/java/org/apache/spark/streaming/util/WriteAheadLog.java --- @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.util; + +import java.nio.ByteBuffer; +import java.util.Iterator; + +/** + * Interface representing a write ahead log (aka journal) that is used by Spark Streaming to + * save the received data (by receivers) and associated metadata to a reliable storage, so that + * they can be recovered after driver failures. See the Spark docs for more information on how + * to plug in your own custom implementation of a write ahead log. + */ +@org.apache.spark.annotation.DeveloperApi +public interface WriteAheadLog { --- End diff -- Is the idea that this would be useful for Java implementations to keep this a Java interface?
[GitHub] spark pull request: [SPARK-5891][ML] Add Binarizer ML Transformer
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/5699#discussion_r29120473 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala --- @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.feature + +import org.apache.spark.annotation.AlphaComponent +import org.apache.spark.ml.Transformer +import org.apache.spark.ml.attribute.BinaryAttribute +import org.apache.spark.ml.param._ +import org.apache.spark.ml.param.shared._ +import org.apache.spark.ml.util.SchemaUtils +import org.apache.spark.sql._ +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types.{DoubleType, StructType} + +/** + * :: AlphaComponent :: + * Binarize a column of continuous features given a threshold. 
+ */ +@AlphaComponent +final class Binarizer extends Transformer +with HasInputCol with HasOutputCol with HasThreshold { + + setDefault(threshold -> 0.0) + + /** @group setParam */ + def setInputCol(value: String): this.type = set(inputCol, value) + + /** @group setParam */ + def setOutputCol(value: String): this.type = set(outputCol, value) + + /** @group setParam */ + def setThreshold(value: Double): this.type = set(threshold, value) + + override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = { +transformSchema(dataset.schema, paramMap, logging = true) +val map = extractParamMap(paramMap) +val threshold = getThreshold +val binarizer = udf { in: Double => if (in > threshold) 1.0 else 0.0 } --- End diff -- Do we want to handle vector input?
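The UDF in this diff maps an input strictly greater than the threshold to 1.0 and everything else, including an input exactly equal to the threshold, to 0.0. A minimal Java sketch of that rule, plus one hypothetical way to extend it element-wise to vector input (modeled here as a plain `double[]`, not Spark's `Vector` type), assuming the same strict comparison; `BinarizeSketch` is an illustrative name:

```java
import java.util.Arrays;

class BinarizeSketch {
    // Same rule as the UDF in the diff: strictly greater than the threshold -> 1.0, otherwise 0.0.
    static double binarize(double in, double threshold) {
        return in > threshold ? 1.0 : 0.0;
    }

    // Hypothetical vector variant: apply the scalar rule to every element.
    static double[] binarize(double[] in, double threshold) {
        double[] out = new double[in.length];
        for (int i = 0; i < in.length; i++) {
            out[i] = binarize(in[i], threshold);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(binarize(0.5, 0.5)); // 0.0: equal to the threshold is not "greater"
        System.out.println(Arrays.toString(binarize(new double[] {-1.0, 0.0, 0.2}, 0.0))); // [0.0, 0.0, 1.0]
    }
}
```

For sparse vectors, note that a threshold below zero would turn every implicit zero into 1.0, so a vector version may need to decide how to handle that case.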
[GitHub] spark pull request: [SPARK-5891][ML] Add Binarizer ML Transformer
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/5699#discussion_r29120472 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala --- @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.feature + +import org.apache.spark.annotation.AlphaComponent +import org.apache.spark.ml.Transformer +import org.apache.spark.ml.attribute.BinaryAttribute +import org.apache.spark.ml.param._ +import org.apache.spark.ml.param.shared._ +import org.apache.spark.ml.util.SchemaUtils +import org.apache.spark.sql._ +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types.{DoubleType, StructType} + +/** + * :: AlphaComponent :: + * Binarize a column of continuous features given a threshold. 
+ */ +@AlphaComponent +final class Binarizer extends Transformer +with HasInputCol with HasOutputCol with HasThreshold { + + setDefault(threshold -> 0.0) + + /** @group setParam */ + def setInputCol(value: String): this.type = set(inputCol, value) + + /** @group setParam */ + def setOutputCol(value: String): this.type = set(outputCol, value) + + /** @group setParam */ + def setThreshold(value: Double): this.type = set(threshold, value) + + override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = { +transformSchema(dataset.schema, paramMap, logging = true) +val map = extractParamMap(paramMap) +val threshold = getThreshold --- End diff -- `getThreshold` -> `map(threshold)`. The getter only looks at the embedded map, while in this function we should check the merged param map.
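The distinction in this comment is about lookup order: a getter sees only the params embedded in the instance, while `transform` also receives a call-time `paramMap` that should override them. A minimal Java sketch of that difference, using a plain `HashMap<String, Double>` as a stand-in for Spark's `ParamMap`; the class `ParamMergeSketch` and its methods are hypothetical and only mimic the names in the diff:

```java
import java.util.HashMap;
import java.util.Map;

class ParamMergeSketch {
    // Defaults and values set on the instance: all that a getter like getThreshold can see.
    private final Map<String, Double> embedded = new HashMap<>();

    ParamMergeSketch() {
        embedded.put("threshold", 0.0); // default, like setDefault(threshold -> 0.0)
    }

    void setThreshold(double value) {
        embedded.put("threshold", value);
    }

    // Getter: only looks at the embedded map.
    double getThreshold() {
        return embedded.get("threshold");
    }

    // Merge: start from the embedded params, then let call-time params override them.
    Map<String, Double> extractParamMap(Map<String, Double> extra) {
        Map<String, Double> merged = new HashMap<>(embedded);
        merged.putAll(extra);
        return merged;
    }

    public static void main(String[] args) {
        ParamMergeSketch b = new ParamMergeSketch();
        b.setThreshold(0.5);
        Map<String, Double> callTime = new HashMap<>();
        callTime.put("threshold", 0.9);
        System.out.println(b.getThreshold()); // 0.5: ignores the call-time value
        System.out.println(b.extractParamMap(callTime).get("threshold")); // 0.9: what transform should use
    }
}
```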
[GitHub] spark pull request: [SPARK-7135][SQL] DataFrame expression for mon...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/5709#issuecomment-96502922 (That's not always true -- somebody could've deleted an index, turning an index scan into a sequential scan, and then the record ordering changes.)
[GitHub] spark pull request: [SPARK-5891][ML] Add Binarizer ML Transformer
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/5699#discussion_r29120471 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala --- @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.feature + +import org.apache.spark.annotation.AlphaComponent +import org.apache.spark.ml.Transformer +import org.apache.spark.ml.attribute.BinaryAttribute +import org.apache.spark.ml.param._ +import org.apache.spark.ml.param.shared._ +import org.apache.spark.ml.util.SchemaUtils +import org.apache.spark.sql._ +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types.{DoubleType, StructType} + +/** + * :: AlphaComponent :: + * Binarize a column of continuous features given a threshold. + */ +@AlphaComponent +final class Binarizer extends Transformer +with HasInputCol with HasOutputCol with HasThreshold { --- End diff -- There is a problem with `HasThreshold`: its doc says "threshold used in binary classification". Maybe we should implement the `threshold` param in Binarizer and document it correctly. Also, we need to document what the output is if the input equals the threshold.
[GitHub] spark pull request: [SPARK-7135][SQL] DataFrame expression for mon...
Github user pwendell commented on the pull request: https://github.com/apache/spark/pull/5709#issuecomment-96502816 @rxin yeah I just mean if I'm in a database and I run the same query twice, I will get the same row ID for the same record. Because of non-determinism in the shuffle, that's not true here.
[GitHub] spark pull request: [SPARK-5891][ML] Add Binarizer ML Transformer
Github user mengxr commented on the pull request: https://github.com/apache/spark/pull/5699#issuecomment-96501304 test this please
[GitHub] spark pull request: [SPARK-7056][Streaming] Make the Write Ahead L...
Github user harishreedharan commented on the pull request: https://github.com/apache/spark/pull/5645#issuecomment-96500303 I am taking a look at this. So far this looks good; I will post comments, if any, tomorrow.
[GitHub] spark pull request: [Spark-7090][MLlib] Introduce LDAOptimizer to ...
Github user hhbyyh commented on the pull request: https://github.com/apache/spark/pull/5661#issuecomment-96497611 Thanks @jkbradley. I think Optimizer is simpler and provides sufficient flexibility for now. I made some changes according to other comments.
[GitHub] spark pull request: [SPARK-7135][SQL] DataFrame expression for mon...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/5709#issuecomment-96497106 @pwendell you raised a very good point about ordering of records within RDDs and DataFrames. I think we should document those more clearly in the javadoc for these.
[GitHub] spark pull request: [SPARK-7156][SQL] add randomSplit to DataFrame...
GitHub user kaka1992 opened a pull request: https://github.com/apache/spark/pull/5711 [SPARK-7156][SQL] add randomSplit to DataFrame. SPARK-7156 add randomSplit to DataFrame. You can merge this pull request into a Git repository by running: $ git pull https://github.com/kaka1992/spark add_randomsplit_to_dataframe Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/5711.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5711 commit e65939a8b47671d9a09c51b6ab18eb525e720a61 Author: 云峤 Date: 2015-04-27T04:18:08Z SPARK-7156 add randomSplit to DataFrame.
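The PR only names the feature, so as a rough illustration of what a weighted random split means, here is a Java sketch over an in-memory list. It assumes the weights are normalized into cumulative ranges over [0, 1) and that each element draws one seeded uniform number to choose its range. This is an assumption for illustration, not necessarily how this PR or `RDD.randomSplit` implements it; `RandomSplitSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

class RandomSplitSketch {
    // Splits items into weights.length disjoint lists; each item lands in exactly one list.
    static <T> List<List<T>> randomSplit(List<T> items, double[] weights, long seed) {
        double total = 0.0;
        for (double w : weights) {
            if (w < 0) throw new IllegalArgumentException("weights must be non-negative");
            total += w;
        }
        if (total <= 0) throw new IllegalArgumentException("weights must sum to a positive value");
        // Cumulative upper bound of each split's range within [0, 1).
        double[] bounds = new double[weights.length];
        double acc = 0.0;
        for (int i = 0; i < weights.length; i++) {
            acc += weights[i] / total;
            bounds[i] = acc;
        }
        List<List<T>> splits = new ArrayList<>();
        for (int i = 0; i < weights.length; i++) {
            splits.add(new ArrayList<>());
        }
        Random rng = new Random(seed); // same seed and same input order -> same split
        for (T item : items) {
            double x = rng.nextDouble();
            int k = 0;
            // Stop at the last split so rounding in the final bound cannot drop an item.
            while (k < bounds.length - 1 && x >= bounds[k]) {
                k++;
            }
            splits.get(k).add(item);
        }
        return splits;
    }
}
```

The seed only makes the result reproducible if the input order is also stable, which ties into the ordering discussion on SPARK-7135 in this thread.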
[GitHub] spark pull request: [SPARK-7135][SQL] DataFrame expression for mon...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/5709#issuecomment-96494526 Those could change in shuffle I guess, but I don't think this is creating more confusion. What we care about here is not the record ordering, but that the output of this expression is monotonically increasing. That will always be true. This is very similar to the row ID idea a lot of databases have. SQL tables also don't have ordering, unless they are sorted.
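To make the monotonicity argument concrete: one way to get IDs that increase without coordination between partitions is to pack the partition index into the high bits and a per-partition record counter into the low bits. This Java sketch assumes a 33-bit record field (an assumption for illustration; the layout used by the PR is not shown in this thread). It also illustrates pwendell's caveat: the ID depends on which partition and position a record lands in, so a non-deterministic shuffle can give the same record a different ID on another run.

```java
class MonotonicIdSketch {
    // Assumed layout: partition index in the upper bits, record number within the partition in the lower 33 bits.
    static final int RECORD_BITS = 33;

    static long id(int partitionIndex, long recordNumber) {
        // Keep the partition index below 2^30 so the sign bit stays clear and IDs stay non-negative.
        if (partitionIndex < 0 || partitionIndex >= (1 << 30)) {
            throw new IllegalArgumentException("partition index out of range");
        }
        if (recordNumber < 0 || recordNumber >= (1L << RECORD_BITS)) {
            throw new IllegalArgumentException("record number out of range");
        }
        return ((long) partitionIndex << RECORD_BITS) | recordNumber;
    }

    public static void main(String[] args) {
        System.out.println(id(0, 0)); // 0
        System.out.println(id(0, 1)); // 1
        System.out.println(id(1, 0)); // 8589934592 (2^33): increasing, but not consecutive
    }
}
```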
[GitHub] spark pull request: [SQL][Minor] rename DataTypeParser.apply to Da...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/5710
[GitHub] spark pull request: [SPARK-7153][SQL] support long type ordinal in...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/5706#issuecomment-96494403 (I think you need permission from us to do it)
[GitHub] spark pull request: [SPARK-7153][SQL] support long type ordinal in...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/5706#issuecomment-96494399 Jenkins, test this please.
[GitHub] spark pull request: [SQL][Minor] rename DataTypeParser.apply to Da...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/5710#issuecomment-96494348 Merging in master. Thanks!
[GitHub] spark pull request: [SPARK-6865][SQL] DataFrame column names shoul...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/5505#issuecomment-96494320 That one actually doesn't handle most self join cases, since very often in self joins you join on different keys.
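A typical self join on different keys is an employee/manager lookup: the left side's `managerId` is matched against the right side's `id`. The plain-Scala sketch below (a hypothetical `Employee` row type, not Spark's DataFrame API) shows the shape of such a join; because both sides share one schema, an unqualified column such as `name` could refer to either side.

```scala
object SelfJoinSketch {
  // Hypothetical row type; both sides of the self join share this schema.
  final case class Employee(id: Int, name: String, managerId: Option[Int])

  // Self join on different keys: left.managerId = right.id.
  def withManagers(emps: Seq[Employee]): Seq[(String, String)] = {
    val byId = emps.map(e => e.id -> e).toMap
    for {
      e   <- emps                     // "left" copy of the table
      mId <- e.managerId.toSeq        // join key on the left side
      m   <- byId.get(mId).toSeq      // join key on the right side is `id`
    } yield (e.name, m.name)          // both columns are called `name`
  }
}
```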
[GitHub] spark pull request: [SPARK-6865][SQL] DataFrame column names shoul...
Github user cloud-fan commented on the pull request: https://github.com/apache/spark/pull/5505#issuecomment-96492437 As https://github.com/apache/spark/pull/5638 handled self join correctly, should we reopen this PR?
[GitHub] spark pull request: [SQL][Minor] rename DataTypeParser.apply to Da...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5710#issuecomment-96492288 [Test build #30960 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/30960/consoleFull) for PR 5710 at commit [`c319977`](https://github.com/apache/spark/commit/c319977c6d6d0bb45ea84e069822de56b959d647). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds no public classes. * This patch does not change any dependencies.
[GitHub] spark pull request: SPARK-4550. In sort-based shuffle, store map o...
Github user pwendell commented on the pull request: https://github.com/apache/spark/pull/4450#issuecomment-96485873 Hey Sandy, I've now taken a pretty thorough look at this patch. There are a lot of low-level comments, and it would be nice if you could do a pass to bring this up to date and address them. At a high level, the two major new classes, ChainedBuffer and PartitionedSerializedPairBuffer, seem good. My only remaining concern is the overall complexity this adds to the already fairly convoluted ExternalSorter, and that the new WritablePartitionedPairCollection is a little clunky. However, I don't have concrete suggestions for improving that at present. Curious if you have any ideas.
[GitHub] spark pull request: [SPARK-6752][Streaming] Allow StreamingContext...
Github user zzcclp commented on the pull request: https://github.com/apache/spark/pull/5428#issuecomment-96484530 Hi @tdas, why was this PR reverted?
[GitHub] spark pull request: [SPARK-7056][Streaming] Make the Write Ahead L...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/5645#discussion_r29118877 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala --- @@ -96,9 +99,27 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag]( logDebug(s"Read partition data of $this from block manager, block $blockId") iterator case None => // Data not found in Block Manager, grab it from write ahead log file -val reader = new WriteAheadLogRandomReader(partition.segment.path, hadoopConf) -val dataRead = reader.read(partition.segment) -reader.close() +var dataRead: ByteBuffer = null +var writeAheadLog: WriteAheadLog = null +try { + val dummyDirectory = FileUtils.getTempDirectoryPath() + writeAheadLog = WriteAheadLogUtils.createLogForReceiver( +SparkEnv.get.conf, dummyDirectory, hadoopConf) --- End diff -- The log directory needs to be passed through `WriteAheadLogUtils.createLogForXXX()`. If you want to hide it from this method and pass it through the SparkConf instead, then every place where `WriteAheadLogUtils.createLogForXXX()` is called would need the following:
```
val walConf = SparkEnv.get.conf.clone()
walConf.set("logdir", )
```
IMO that duplicates code everywhere and is uglier than this dummy dir approach.
[GitHub] spark pull request: [SPARK-6352] [SQL] Custom parquet output commi...
Github user ypcat commented on the pull request: https://github.com/apache/spark/pull/5525#issuecomment-96484427 @liancheng 1.4.0 will freeze on May 1st; does this PR have a chance to get in?
[GitHub] spark pull request: [SPARK-7056][Streaming] Make the Write Ahead L...
Github user tdas commented on the pull request: https://github.com/apache/spark/pull/5645#issuecomment-96484330 @pwendell Please take a look.
[GitHub] spark pull request: [Spark-7090][MLlib] Introduce LDAOptimizer to ...
Github user jkbradley commented on the pull request: https://github.com/apache/spark/pull/5661#issuecomment-96482693 @hhbyyh Thanks for reminding me of the discussion in the other PR. I guess it's hard to say what's better given that I've contradicted myself now about whether to split the Optimizer and LearningState concepts. I think it's fine if you keep them both under the Optimizer concept. Thanks!
[GitHub] spark pull request: SPARK-4550. In sort-based shuffle, store map o...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/4450#discussion_r29118645 --- Diff: core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala --- @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.collection + +import java.io.InputStream +import java.nio.IntBuffer +import java.util.Comparator + +import org.apache.spark.SparkEnv +import org.apache.spark.serializer.{JavaSerializerInstance, SerializerInstance} +import org.apache.spark.storage.BlockObjectWriter +import org.apache.spark.util.collection.PartitionedSerializedPairBuffer._ + +/** + * Append-only buffer of key-value pairs, each with a corresponding partition ID, that serializes + * its records upon insert and stores them as raw bytes. + * + * We use two data-structures to store the contents. The serialized records are stored in a + * ChainedBuffer that can expand gracefully as records are added. This buffer is accompanied by a + * metadata buffer that stores pointers into the data buffer as well as the partition ID of each + * record. Each entry in the metadata buffer takes up a fixed amount of space. 
+ * + * Sorting the collection means swapping entries in the metadata buffer - the record buffer need not + * be modified at all. Storing the partition IDs in the metadata buffer means that comparisons can + * happen without following any pointers, which should minimize cache misses. + * + * Currently, only sorting by partition is supported. + * + * @param metaInitialRecords The initial number of entries in the metadata buffer. + * @param kvBlockSize The size of each byte buffer in the ChainedBuffer used to store the records. + * @param serializerInstance the serializer used for serializing inserted records. + */ +private[spark] class PartitionedSerializedPairBuffer[K, V]( +metaInitialRecords: Int, +kvBlockSize: Int, +serializerInstance: SerializerInstance = SparkEnv.get.serializer.newInstance) + extends WritablePartitionedPairCollection[K, V] { + + if (serializerInstance.isInstanceOf[JavaSerializerInstance]) { +throw new IllegalArgumentException("PartitionedSerializedPairBuffer does not support" + + " Java-serialized objects.") + } + + private var metaBuffer = IntBuffer.allocate(metaInitialRecords * NMETA) + + private val kvBuffer: ChainedBuffer = new ChainedBuffer(kvBlockSize) + private val kvOutputStream = new ChainedBufferOutputStream(kvBuffer) + private val kvSerializationStream = serializerInstance.serializeStream(kvOutputStream) + + def insert(partition: Int, key: K, value: V): Unit = { +if (metaBuffer.position == metaBuffer.capacity) { + growMetaBuffer() +} + +val keyStart = kvBuffer.size +if (keyStart < 0) { + throw new Exception(s"Can't grow buffer beyond ${1 << 31} bytes") +} +kvSerializationStream.writeObject[Any](key) +kvSerializationStream.flush() +val valueStart = kvBuffer.size +kvSerializationStream.writeObject[Any](value) +kvSerializationStream.flush() +val valueEnd = kvBuffer.size + +metaBuffer.put(keyStart) +metaBuffer.put(valueStart) +metaBuffer.put(valueEnd) +metaBuffer.put(partition) + } + + /** Double the size of the array because we've 
reached capacity */ + private def growMetaBuffer(): Unit = { +if (metaBuffer.capacity * 4 >= (1 << 30)) { + // Doubling the capacity would create an array bigger than Int.MaxValue, so don't + throw new Exception( +s"Can't grow buffer beyond ${(1 << 30) / (NMETA * 4)} elements") +} +val newMetaBuffer = IntBuffer.allocate(metaBuffer.capacity * 2) +newMetaBuffer.put(metaBuffer.array) +metaBuffer = newMetaBuffer + } + + /** Iterate through the data in a giv
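The doc comment in the quoted class describes two buffers: raw record bytes in one, and a fixed-size metadata entry per record (key start, value start, value end, partition ID) in the other, so sorting only reorders metadata. The sketch below illustrates that idea with hypothetical names (`MetaSortSketch`, `PairBuffer`), a `ByteArrayOutputStream` standing in for `ChainedBuffer`, string records instead of a pluggable serializer, and an index sort instead of swapping entries in place; it is not the PR's code.

```scala
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets.UTF_8
import scala.collection.mutable.ArrayBuffer

object MetaSortSketch {
  // Metadata layout assumed from the doc comment: NMETA ints per record.
  val NMETA = 4
  private val KeyStart = 0
  private val ValStart = 1
  private val ValEnd = 2
  private val Partition = 3

  final class PairBuffer {
    private val bytes = new ByteArrayOutputStream() // stands in for ChainedBuffer
    private val meta = ArrayBuffer.empty[Int]       // stands in for the IntBuffer

    def insert(partition: Int, key: String, value: String): Unit = {
      val keyStart = bytes.size
      bytes.write(key.getBytes(UTF_8))
      val valueStart = bytes.size
      bytes.write(value.getBytes(UTF_8))
      meta ++= Seq(keyStart, valueStart, bytes.size, partition)
    }

    // Orders records by partition by sorting metadata indices only; the record bytes
    // never move. sortBy is stable, so insertion order is kept within a partition.
    def sortedByPartition: Seq[(Int, String, String)] = {
      val raw = bytes.toByteArray
      val order = (0 until meta.length / NMETA).sortBy(i => meta(i * NMETA + Partition))
      order.map { i =>
        val b = i * NMETA
        val key = new String(raw, meta(b + KeyStart), meta(b + ValStart) - meta(b + KeyStart), UTF_8)
        val value = new String(raw, meta(b + ValStart), meta(b + ValEnd) - meta(b + ValStart), UTF_8)
        (meta(b + Partition), key, value)
      }
    }
  }
}
```

The comparison key (the partition ID) lives in the metadata entry itself, which matches the doc comment's point that comparisons happen without following pointers into the record bytes.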
[GitHub] spark pull request: SPARK-4550. In sort-based shuffle, store map o...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/4450#discussion_r29118576 --- Diff: core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala --- @@ -0,0 +1,254 @@
[GitHub] spark pull request: [SPARK-7120][SPARK-7121] Closure cleaner nesti...
Github user andrewor14 commented on the pull request: https://github.com/apache/spark/pull/5685#issuecomment-96479602 Weird, tests pass locally. Investigating.
[GitHub] spark pull request: SPARK-4550. In sort-based shuffle, store map o...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/4450#discussion_r29118405 --- Diff: core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala --- @@ -0,0 +1,254 @@ ... + /** Double the size of the array because we've reached capacity */ + private def growMetaBuffer(): Unit = { +if (metaBuffer.capacity * 4 >= (1 << 30)) { --- End diff -- Any reason not to just check for `metaBuffer.capacity.toLong > Integer.MAX_VALUE` - seems easier to reason about.
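A minimal sketch of an overflow-safe growth check in the spirit of this suggestion. Compared literally, an `Int` capacity converted to `Long` can never exceed `Integer.MAX_VALUE`, so the sketch assumes the doubled capacity is what gets compared. `GrowthCheckSketch` and `grow` are hypothetical names, not the PR's code.

```scala
import java.nio.IntBuffer

object GrowthCheckSketch {
  // Hypothetical stand-in for growMetaBuffer(): doubles a full heap IntBuffer,
  // doing the capacity arithmetic in Long so the product cannot wrap around.
  def grow(buf: IntBuffer, maxCapacity: Long = Int.MaxValue.toLong): IntBuffer = {
    val newCapacity = buf.capacity.toLong * 2
    if (newCapacity > maxCapacity) {
      throw new IllegalStateException(
        s"Can't grow buffer to $newCapacity ints (limit is $maxCapacity)")
    }
    val newBuf = IntBuffer.allocate(newCapacity.toInt)
    // Like the quoted code, copy the whole backing array; this assumes `buf` is full.
    newBuf.put(buf.array)
    newBuf
  }
}
```

Doing the multiplication in `Long` keeps the limit readable as a plain comparison against `Int.MaxValue`, instead of a pre-scaled constant such as `1 << 30`.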
[GitHub] spark pull request: SPARK-4550. In sort-based shuffle, store map o...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/4450#discussion_r29118412 --- Diff: core/src/main/scala/org/apache/spark/util/collection/PartitionedSerializedPairBuffer.scala --- @@ -0,0 +1,254 @@ ... + /** Double the size of the array because we've reached capacity */ + private def growMetaBuffer(): Unit = { +if (metaBuffer.capacity * 4 >= (1 << 30)) { + // Doubling the capacity would create an array bigger than Int.MaxValue, so don't + throw new Exception( +s"Can't grow buffer beyond ${(1 << 30) / (NMETA * 4)} elements") --- End diff -- Maybe `Integer.MAX_VALUE` would be better here?
[GitHub] spark pull request: SPARK-4550. In sort-based shuffle, store map o...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/4450#discussion_r29118289 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala --- @@ -740,15 +723,29 @@ private[spark] class ExternalSorter[K, V, C]( in.close() } } +} else if (spills.isEmpty && partitionWriters == null) { --- End diff -- The branching here is starting to get very complicated (#1799 added a second level of branching, and now this adds a third). Also, it's a bit redundant with the branching in `partitionedIterator`, which also has its own special case for this. However, I don't see an obvious way to improve it given the design of these other data structures. I'll keep thinking.
[GitHub] spark pull request: [SPARK-7153][SQL] support long type ordinal in...
Github user cloud-fan commented on the pull request: https://github.com/apache/spark/pull/5706#issuecomment-96478322 Hi @rxin, how can I trigger Jenkins to run the tests?
[GitHub] spark pull request: SPARK-6954. [YARN] ExecutorAllocationManager c...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5704#issuecomment-96478176 [Test build #713 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/713/consoleFull) for PR 5704 at commit [`9eea5fe`](https://github.com/apache/spark/commit/9eea5fe47c26b0488478e45b9d3d4d183a6f72e6). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds no public classes. * This patch does not change any dependencies.
[GitHub] spark pull request: [SPARK-5213] [SQL] Pluggable SQL Parser Suppor...
Github user chenghao-intel commented on the pull request: https://github.com/apache/spark/pull/4015#issuecomment-96478138 retest this please
[GitHub] spark pull request: SPARK-4550. In sort-based shuffle, store map o...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/4450#discussion_r29118071 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala --- @@ -740,15 +723,29 @@ private[spark] class ExternalSorter[K, V, C]( in.close() } } +} else if (spills.isEmpty && partitionWriters == null) { + // Case where we only have in-memory data + val collection = if (aggregator.isDefined) map else buffer + val it = collection.destructiveSortedWritablePartitionedIterator(comparator) + while (it.hasNext) { +val writer = blockManager.getDiskWriter( + blockId, outputFile, ser, fileBufferSize, context.taskMetrics.shuffleWriteMetrics.get) +val partitionId = it.nextPartition() +while (it.hasNext && it.nextPartition() == partitionId) { + it.writeNext(writer) +} +writer.commitAndClose() +val segment = writer.fileSegment() +lengths(partitionId) = segment.length + } } else { - // Either we're not bypassing merge-sort or we have only in-memory data; get an iterator by - // partition and just write everything directly. + // Not bypassing merge-sort; get an iterator by partition and just write everything directly. --- End diff -- Is the following true: If at least one spill has occurred from the partitioned buffer, the data that is in the present buffer will be de-serialized and then re-serialized as it's written to disk?
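The in-memory branch in this diff writes the sorted records partition by partition, committing each partition as its own file segment and recording its length in `lengths`. A minimal plain-Scala sketch of that loop, assuming a single `ByteArrayOutputStream` in place of the per-partition disk writers; `PartitionedWriteSketch` and `writePartitioned` are hypothetical names, and peeking at a buffered iterator's `head` stands in for `nextPartition()`:

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}

// Hypothetical sketch (not Spark code): records arrive sorted by partition id,
// each partition's bytes form one contiguous segment, and the segment length
// is stored per partition, mirroring `lengths(partitionId) = segment.length`.
object PartitionedWriteSketch {
  def writePartitioned(
      sorted: Iterator[(Int, String)], // (partitionId, record), sorted by partitionId
      numPartitions: Int): (Array[Byte], Array[Long]) = {
    val out = new ByteArrayOutputStream()
    val data = new DataOutputStream(out)
    val lengths = new Array[Long](numPartitions) // partitions with no records stay 0
    val buffered = sorted.buffered
    while (buffered.hasNext) {
      val partitionId = buffered.head._1 // peek, like nextPartition()
      val start = out.size()
      // Drain every record that belongs to the current partition.
      while (buffered.hasNext && buffered.head._1 == partitionId) {
        data.writeUTF(buffered.next()._2)
      }
      data.flush()
      lengths(partitionId) = (out.size() - start).toLong
    }
    (out.toByteArray, lengths)
  }
}
```

In this sketch, a partition with no records keeps a length of 0, so the offset of any partition is simply the sum of the lengths before it.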
[GitHub] spark pull request: [SPARK-2750][WIP]Add Https support for Web UI
Github user WangTaoTheTonic commented on the pull request: https://github.com/apache/spark/pull/5664#issuecomment-96476093 Jenkins, test this please.
[GitHub] spark pull request: [SPARK-7086][Deploy]Do not retry when public s...
Github user WangTaoTheTonic closed the pull request at: https://github.com/apache/spark/pull/5657
[GitHub] spark pull request: [SPARK-7086][Deploy]Do not retry when public s...
Github user WangTaoTheTonic commented on the pull request: https://github.com/apache/spark/pull/5657#issuecomment-96471737 After taking a look at https://github.com/apache/spark/pull/3314 and discussing it with @scwf offline, we both think the "specify a port range for each" idea is better for issues [SPARK-7086](https://issues.apache.org/jira/browse/SPARK-7086) and [SPARK-4449](https://issues.apache.org/jira/browse/SPARK-4449). So I will close this and track it at https://github.com/apache/spark/pull/3314. @srowen Thanks for your comments and nice idea. :smiley:
[GitHub] spark pull request: [SQL][Minor] rename DataTypeParser.apply to Da...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5710#issuecomment-96469552 [Test build #30960 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/30960/consoleFull) for PR 5710 at commit [`c319977`](https://github.com/apache/spark/commit/c319977c6d6d0bb45ea84e069822de56b959d647).
[GitHub] spark pull request: [SQL][Minor] rename DataTypeParser.apply to Da...
Github user scwf commented on a diff in the pull request: https://github.com/apache/spark/pull/5710#discussion_r29117834 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala --- @@ -92,7 +92,7 @@ object PhysicalOperation extends PredicateHelper { } def collectAliases(fields: Seq[Expression]): Map[Attribute, Expression] = fields.collect { -case a @ Alias(child, _) => a.toAttribute.asInstanceOf[Attribute] -> child +case a @ Alias(child, _) => a.toAttribute -> child --- End diff -- This `asInstanceOf` is not necessary; delete it here as well.
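The point of this comment can be shown with a toy model: when `toAttribute` is already declared to return `Attribute`, the key of `a.toAttribute -> child` is inferred as `Attribute`, so the cast adds nothing. The classes below are hypothetical stand-ins, not Catalyst's real `Expression` hierarchy, and the trailing `.toMap` is assumed because the diff context cuts off before the end of `collectAliases`:

```scala
// Hypothetical toy model (not Catalyst): Alias.toAttribute is statically typed
// as Attribute, so no asInstanceOf is needed to build a Map[Attribute, Expression].
object AliasSketch {
  sealed trait Expression
  final case class Literal(value: Int) extends Expression
  final case class Attribute(name: String) extends Expression
  final case class Alias(child: Expression, name: String) extends Expression {
    def toAttribute: Attribute = Attribute(name)
  }

  def collectAliases(fields: Seq[Expression]): Map[Attribute, Expression] =
    fields.collect { case a @ Alias(child, _) => a.toAttribute -> child }.toMap
}
```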
[GitHub] spark pull request: [SQL][Minor] rename DataTypeParser.apply to Da...
GitHub user scwf opened a pull request: https://github.com/apache/spark/pull/5710 [SQL][Minor] rename DataTypeParser.apply to DataTypeParser.parse rename DataTypeParser.apply to DataTypeParser.parse to make it more clear and readable. /cc @rxin You can merge this pull request into a Git repository by running: $ git pull https://github.com/scwf/spark apply Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/5710.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5710 commit c319977c6d6d0bb45ea84e069822de56b959d647 Author: wangfei Date: 2015-04-27T01:16:02Z rename apply to parse
[GitHub] spark pull request: SPARK-4550. In sort-based shuffle, store map o...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/4450#discussion_r29117710 --- Diff: core/src/main/scala/org/apache/spark/util/collection/ChainedBuffer.scala --- @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.collection + +import java.io.OutputStream + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.storage.BlockObjectWriter + +/** + * A logical byte buffer that wraps a list of byte arrays. All the byte arrays have equal size. The + * advantage of this over a standard ArrayBuffer is that it can grow without claiming large amounts + * of memory and needing to copy the full contents. + */ +private[spark] class ChainedBuffer(chunkSize: Int) { --- End diff -- After looking at this more closely, this is somewhat similar to that, except that you support random lookups. They're both simple enough that I'm not sure they need to be merged, so it might be fine to just add a TODO saying we should look into merging them later.
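The doc comment in this diff describes the data structure precisely enough to sketch. The class below is a hypothetical, simplified stand-in (`SimpleChainedBuffer`, not Spark's `ChainedBuffer`): it keeps equal-size chunks in an `ArrayBuffer`, grows by appending a chunk rather than copying existing bytes, and serves random reads by splitting a logical position into a chunk index and an offset:

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical simplified sketch of a chunked byte buffer (not Spark's class).
class SimpleChainedBuffer(chunkSize: Int) {
  require(chunkSize > 0, "chunkSize must be positive")
  private val chunks = ArrayBuffer.empty[Array[Byte]]
  private var _size = 0L

  def size: Long = _size
  def numChunks: Int = chunks.length

  /** Appends bytes, allocating a new chunk only when the current one is full. */
  def append(bytes: Array[Byte]): Unit = {
    var written = 0
    while (written < bytes.length) {
      val chunkIndex = (_size / chunkSize).toInt
      val offset = (_size % chunkSize).toInt
      if (chunkIndex == chunks.length) chunks += new Array[Byte](chunkSize)
      val n = math.min(chunkSize - offset, bytes.length - written)
      System.arraycopy(bytes, written, chunks(chunkIndex), offset, n)
      written += n
      _size += n
    }
  }

  /** Random read of one byte at a logical position. */
  def apply(pos: Long): Byte = {
    require(pos >= 0 && pos < _size, s"position $pos out of bounds")
    chunks((pos / chunkSize).toInt)((pos % chunkSize).toInt)
  }
}
```

Because every chunk has the same size, locating a byte is a single division and remainder, which is what keeps the random lookups mentioned in the comment cheap.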
[GitHub] spark pull request: [SPARK-2883][SQL] Spark Support for ORCFile fo...
Github user deanchen commented on a diff in the pull request: https://github.com/apache/spark/pull/5275#discussion_r29117639 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/package.scala --- @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive + + +import com.esotericsoftware.kryo.io.Output +import com.esotericsoftware.kryo.Kryo +import org.apache.commons.codec.binary.Base64 +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument +import org.apache.spark.sql.catalyst.ScalaReflection +import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.sql.hive.orc._ +import org.apache.spark.sql.sources.BaseRelation +import org.apache.spark.sql.{SaveMode, SQLContext, DataFrame, SchemaRDD} +import scala.reflect.runtime.universe.{TypeTag, typeTag} + +package object orc { + implicit class OrcContext(sqlContext: HiveContext) { +def orcFile(path: String) = { + val parameters = Map("path"->path) + val orcRelation = OrcRelation(path, parameters, None)(sqlContext) + sqlContext.baseRelationToDataFrame(orcRelation) +} + } + + implicit class OrcSchemaRDD(dataFrame: DataFrame) { +def saveAsOrcFile(path: String): Unit = { + val parameters = Map("path"->path) + val relation = OrcRelation(path, parameters, Some(dataFrame.schema))(dataFrame.sqlContext) + relation.insert(dataFrame, false) +} + } + + // for orc compression type, only take effect in hive 0.13.1 + val orcDefaultCompressVar = "hive.exec.orc.default.compress" + // for prediction push down in hive-0.13.1, don't enable it + var ORC_FILTER_PUSHDOWN_ENABLED = true --- End diff -- How can this variable be disabled?
[GitHub] spark pull request: [SPARK-6924][YARN] Fix driver hangs in yarn-cl...
Github user SaintBacchus commented on the pull request: https://github.com/apache/spark/pull/5663#issuecomment-96465455 @andrewor14 Can you also have a look at this problem?
[GitHub] spark pull request: [SPARK-7140][MLLIB] only scan the first 16 ent...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5697#issuecomment-96464039 [Test build #712 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/712/consoleFull) for PR 5697 at commit [`2abc86d`](https://github.com/apache/spark/commit/2abc86dddb1c6e0bdbb8cb2129d9e26b7424a03b). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds no public classes. * This patch does not change any dependencies.
[GitHub] spark pull request: [SPARK-7135][SQL] DataFrame expression for mon...
Github user pwendell commented on the pull request: https://github.com/apache/spark/pull/5709#issuecomment-96463353 No, but the ordering of records in a partition can change, so you might have different identifiers for the same record across retries (unless this is only used for already sorted data... is it?).
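To make the retry concern concrete, suppose each ID is derived from a record's position within its partition. The layout below (partition index shifted into the upper bits, position in the lower 33 bits) is an assumption for illustration, not necessarily what PR 5709 does, and `PositionalIdSketch` is a hypothetical name. If a retried task sees the same records in a different order, the same record gets a different ID:

```scala
// Hypothetical sketch: IDs computed from (partition index, position in partition).
// The bit layout is an illustrative assumption, not taken from the PR.
object PositionalIdSketch {
  def assignIds[T](partitionIndex: Int, records: Seq[T]): Map[T, Long] =
    records.zipWithIndex.map { case (record, position) =>
      record -> ((partitionIndex.toLong << 33) + position)
    }.toMap
}
```

Only if the input order within each partition is deterministic (for example, already sorted data, as the comment asks) would a retry reproduce the same IDs.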
[GitHub] spark pull request: [SPARK-6747][SQL] Support List<> as a return t...
Github user maropu commented on the pull request: https://github.com/apache/spark/pull/5395#issuecomment-96462276 cc @marmbrus Could you merge this into master? I'll make a PR for SPARK-6912, but it depends on this.
[GitHub] spark pull request: [SPARK-6304][Streaming] Fix checkpointing does...
Github user jerryshao commented on the pull request: https://github.com/apache/spark/pull/5060#issuecomment-96460443 Hi @tdas, I agree with you that only the port is needed; there is no need to track the host. But we should only save the port when the user explicitly sets it. Port number 0 does not persist after `SparkEnv` is initialized, as you can see [here](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkEnv.scala#L250), so we shouldn't save that port number (which is set by `SparkEnv`).
[GitHub] spark pull request: [SPARK-6304][Streaming] Fix checkpointing does...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/5060#discussion_r29116979 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -94,6 +94,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli // contains a map from hostname to a list of input format splits on the host. private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map() + // This is used for Spark Streaming to check whether driver host and port are set by user, + // if these two configurations are set by user, so the recovery mechanism should not remove this. + private[spark] val isDriverHostSetByUser = config.contains("spark.driver.host") --- End diff -- I'm not sure how to track this in `Checkpoint`, since `SparkEnv` will reset this configuration if the user has not set it. So how can `Checkpoint` tell whether it was set by the user or by `SparkEnv`?
[GitHub] spark pull request: [SPARK-7086][Deploy]Do not retry when public s...
Github user srowen commented on the pull request: https://github.com/apache/spark/pull/5657#issuecomment-96457581 Ah, we don't have this change committed yet: https://github.com/apache/spark/pull/3314 (Or, a variant on this.) The better way to fix this is to be able to express a range of ports, which might include only 1 port, in which case there would be no more retries anyway. I suggest focusing on resolving SPARK-4449 as a way to fix this.
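A rough sketch of the "range of ports" idea, in which a single port implies zero retries. The `start-end` string format and the `PortRangeSketch` object are hypothetical; they are not the syntax proposed in https://github.com/apache/spark/pull/3314:

```scala
// Hypothetical sketch: "7077" or "7077-7080" -> inclusive range of ports to try.
// The spec format is an assumption made for illustration only.
object PortRangeSketch {
  def parse(spec: String): Range.Inclusive = {
    val range = spec.split("-", -1).map(_.trim) match {
      case Array(single)     => single.toInt to single.toInt
      case Array(start, end) => start.toInt to end.toInt
      case _                 => throw new IllegalArgumentException(s"Bad port spec: $spec")
    }
    require(range.nonEmpty && range.start >= 1 && range.end <= 65535, s"Bad port range: $spec")
    range
  }

  /** Retries implied by the range: a single port means no retries at all. */
  def maxRetries(spec: String): Int = parse(spec).size - 1
}
```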
[GitHub] spark pull request: [SPARK-7086][Deploy]Do not retry when public s...
Github user WangTaoTheTonic commented on the pull request: https://github.com/apache/spark/pull/5657#issuecomment-96455354 If it retries, the master will use another port. We can see this in `Utils.scala`: >for (offset <- 0 to maxRetries) { ... ((startPort + offset - 1024) % (65536 - 1024)) + 1024 >logWarning(s"Service$serviceString could not bind on port $tryPort. " + s"Attempting port ${tryPort + 1}.")
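The quoted expression explains why a retry lands on a different port. A sketch that applies only that expression (the code elided by `...` in the quote is not reproduced, and `PortRetrySketch` is a hypothetical name): each offset moves to the next port, and a port past 65535 wraps around to 1024 instead of entering the privileged range:

```scala
// Sketch of the candidate-port sequence implied by the quoted formula only.
object PortRetrySketch {
  def candidatePort(startPort: Int, offset: Int): Int =
    ((startPort + offset - 1024) % (65536 - 1024)) + 1024

  /** Ports tried for offsets 0 to maxRetries, matching the quoted loop bounds. */
  def candidates(startPort: Int, maxRetries: Int): Seq[Int] =
    (0 to maxRetries).map(offset => candidatePort(startPort, offset))
}
```

For example, starting at 7077 with two retries tries 7077, 7078 and 7079, so the master ends up on a port other than the one configured whenever the first bind fails.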
[GitHub] spark pull request: [SPARK-6304][Streaming] Fix checkpointing does...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/5060#discussion_r29116759 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -94,6 +94,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli // contains a map from hostname to a list of input format splits on the host. private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map() + // This is used for Spark Streaming to check whether driver host and port are set by user, + // if these two configurations are set by user, so the recovery mechanism should not remove this. + private[spark] val isDriverHostSetByUser = config.contains("spark.driver.host") --- End diff -- But I think there has to be a place in Spark Core to determine whether this configuration was set by the user or by Spark itself before SparkContext is initialized, either in SparkConf or somewhere else. It cannot be determined from Spark Streaming, where all the SparkContext state has already been initialized.
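A hedged sketch of the approach described here: snapshot which keys the user set before startup code overwrites anything, and treat a requested port of `0` (any free port) as not user-chosen. `UserSetConfigSketch` and its methods are hypothetical, a plain `Map` stands in for `SparkConf`, and the simulated startup step only mimics what the comments above say `SparkEnv` does with `spark.driver.host` and `spark.driver.port`:

```scala
// Hypothetical sketch (not SparkConf/SparkEnv code): record user intent first,
// then let simulated startup fill in the host and the actually bound port.
object UserSetConfigSketch {
  final case class Conf(settings: Map[String, String], userSetKeys: Set[String])

  def initialize(user: Map[String, String], boundPort: Int, localHost: String): Conf = {
    // A requested port of "0" means "any free port", so it is not a user-chosen port.
    val userSetKeys =
      user.keySet.filterNot(k => k == "spark.driver.port" && user(k) == "0")
    // Simulated startup: keep a user host if present, always record the bound port.
    val settings = user ++ Map(
      "spark.driver.host" -> user.getOrElse("spark.driver.host", localHost),
      "spark.driver.port" -> boundPort.toString)
    Conf(settings, userSetKeys)
  }

  /** Only settings the user chose are worth restoring from a checkpoint. */
  def toCheckpoint(conf: Conf): Map[String, String] =
    conf.settings.filter { case (key, _) => conf.userSetKeys.contains(key) }
}
```

The design choice being illustrated is ordering: the "set by user" decision is taken before any startup code runs, because afterwards the stored values alone cannot reveal who wrote them.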
[GitHub] spark pull request: [SPARK-5213] [SQL] Pluggable SQL Parser Suppor...
Github user chenghao-intel commented on the pull request: https://github.com/apache/spark/pull/4015#issuecomment-96455163 test this please
[GitHub] spark pull request: [SPARK-6738] [CORE] Improve estimate the size ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5608#issuecomment-96455128 [Test build #715 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/715/consoleFull) for PR 5608 at commit [`5506bae`](https://github.com/apache/spark/commit/5506baed18705f26fbd59aff95db567d2008aba3).
[GitHub] spark pull request: SPARK-7103: Fix crash with SparkContext.union ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/5679#issuecomment-96454919 [Test build #714 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/714/consoleFull) for PR 5679 at commit [`5a3d846`](https://github.com/apache/spark/commit/5a3d84649b46df9fd670e951941e809e1e6d98a7).