Re: Similar Items

2016-09-19 Thread Nick Pentreath
How many products do you have? How large are your vectors?

It could be that SVD / LSA could be helpful. But if you have many products
then trying to compute all-pair similarity with brute force is not going to
be scalable. In this case you may want to investigate hashing (LSH)
techniques.
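
To make the LSH suggestion a bit more concrete, here is a minimal sketch of one hashing scheme (random hyperplanes, sometimes called SimHash) in plain Scala. It is not a Spark API: `SimHashSketch`, `Sparse` and the helper names are made up for illustration, and a `Map[Int, Double]` stands in for a SparseVector. It assumes every index in a vector is smaller than `dim`.

```scala
import scala.util.Random

object SimHashSketch {
  // A sparse vector as index -> value (hypothetical stand-in for SparseVector).
  type Sparse = Map[Int, Double]

  // Draw `bits` random hyperplanes of dimension `dim` from a fixed seed.
  def hyperplanes(bits: Int, dim: Int, seed: Long): Array[Array[Double]] = {
    val rng = new Random(seed)
    Array.fill(bits, dim)(rng.nextGaussian())
  }

  // One bit per hyperplane: which side of the plane the vector falls on.
  def signature(v: Sparse, planes: Array[Array[Double]]): Seq[Boolean] =
    planes.toSeq.map { p => v.iterator.map { case (i, x) => x * p(i) }.sum >= 0.0 }

  // Group item ids by signature; only items sharing a bucket get compared in full.
  def buckets(items: Map[String, Sparse],
              planes: Array[Array[Double]]): Map[Seq[Boolean], Set[String]] =
    items.groupBy { case (_, v) => signature(v, planes) }
      .map { case (sig, group) => sig -> group.keySet }
}
```

Vectors separated by a small angle tend to share a signature, so the expensive pairwise comparison is only run inside each bucket instead of across all pairs. More bits mean smaller buckets but more missed neighbours, so in practice several hash tables are usually combined.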


On Mon, 19 Sep 2016 at 22:49, Kevin Mellott wrote:

> Hi all,
>
> I'm trying to write a Spark application that will detect similar items (in
> this case products) based on their descriptions. I've got an ML pipeline
> that transforms the product data to TF-IDF representation, using the
> following components.
>
> - *RegexTokenizer* - strips out non-word characters, results in a list
>   of tokens
> - *StopWordsRemover* - removes common "stop words", such as "the",
>   "and", etc.
> - *HashingTF* - assigns a numeric "hash" to each token and calculates
>   the term frequency
> - *IDF* - computes the inverse document frequency
>
> After this pipeline evaluates, I'm left with a SparseVector that
> represents the inverse document frequency of tokens for each product. As a
> next step, I'd like to be able to compare each vector to one another, to
> detect similarities.
>
> Does anybody know of a straightforward way to do this in Spark? I tried
> creating a UDF (that used the Breeze linear algebra methods internally);
> however, that did not scale well.
>
> Thanks,
> Kevin
>


Task Deserialization Error

2016-09-19 Thread Chawla,Sumit
Hi All

I am trying to test a simple Spark APP using scala.


import org.apache.spark.SparkContext

object SparkDemo {
  def main(args: Array[String]): Unit = {
    val logFile = "README.md" // Should be some file on your system

    // to run in local mode
    val sc = new SparkContext("local", "Simple App",
      "PATH_OF_DIRECTORY_WHERE_COMPILED_SPARK_PROJECT_FROM_GIT")

    val logData = sc.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()

    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}


When running this demo in IntelliJ, I am getting the following error:


java.lang.IllegalStateException: unread block data
    at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2449)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1385)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:253)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)


I guess it's associated with the task not being deserializable. Any help
will be appreciated.



Regards
Sumit Chawla


cassandra and spark can be built and worked on the same computer?

2016-09-19 Thread muhammet pakyürek


Can we connect to Cassandra from Spark using spark-cassandra-connector when
all three are built on the same computer? What kind of problems can this
configuration lead to?


Configuring Kinesis max records limit in KinesisReceiver

2016-09-19 Thread Aravindh
I use `KinesisUtils.createStream` to create a DStream from a Kinesis stream.
By default my Spark receiver receives 1 events from the stream. I see
that the default value for KCL comes from the KCL Configuration. How do I
override this? I don't see a way to pass configuration to
`KinesisUtils.createStream` for overriding the KCL configuration.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Configuring-Kinesis-max-records-limit-in-KinesisReceiver-tp27763.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



is there any bug for the configuration of spark 2.0 cassandra spark connector 2.0 and cassandra 3.0.8

2016-09-19 Thread muhammet pakyürek


Please tell me the configuration, including the most recent versions of
Cassandra, Spark and the Cassandra Spark connector.


How to know WHO are the slaves for an application

2016-09-19 Thread Xiaoye Sun
Hi all,

I am currently making some changes in Spark in my research project.

In my development, after an application has been submitted to the spark
master, the master needs to get the IP addresses of all the slaves used by
that application, so that the spark master is able to talk to the
slave machines through a proposed mechanism. I am wondering which
class/object in spark master has such information and will it be a
different case when the cluster is managed by a standalone scheduler, Yarn
and Mesos.

I saw something related to this question in the master's log in standalone
mode, as follows. However, the function executorAdded in class
SparkDeploySchedulerBackend just prints a log message without adding the slave
to anything.
I am using Spark 1.6.1.

16/09/12 11:34:41.262 INFO AppClient$ClientEndpoint: Connecting to master
spark://192.168.50.105:7077...
16/09/12 11:34:41.283 DEBUG TransportClientFactory: Creating new connection
to /192.168.50.105:7077
16/09/12 11:34:41.302 DEBUG ResourceLeakDetector:
-Dio.netty.leakDetectionLevel: simple
16/09/12 11:34:41.307 DEBUG TransportClientFactory: Connection to /
192.168.50.105:7077 successful, running bootstraps...
16/09/12 11:34:41.307 DEBUG TransportClientFactory: Successfully created
connection to /192.168.50.105:7077 after 23 ms (0 ms spent in bootstraps)
16/09/12 11:34:41.334 DEBUG Recycler: -Dio.netty.recycler.maxCapacity.default:
262144
16/09/12 11:34:41.458 INFO SparkDeploySchedulerBackend: Connected to Spark
cluster with app ID app-20160912113441-
16/09/12 11:34:41.459 DEBUG BlockManager: BlockManager initialize is called
16/09/12 11:34:41.463 DEBUG TransportServer: Shuffle server started on port
:35874
16/09/12 11:34:41.463 INFO Utils: Successfully started service
'org.apache.spark.network.netty.NettyBlockTransferService' on port 35874.
16/09/12 11:34:41.464 INFO NettyBlockTransferService: Server created on
35874
16/09/12 11:34:41.465 INFO BlockManagerMaster: Trying to register
BlockManager
16/09/12 11:34:41.468 INFO BlockManagerMasterEndpoint: Registering block
manager 192.168.50.105:35874 with 3.8 GB RAM, BlockManagerId(driver,
192.168.50.105, 35874)
16/09/12 11:34:41.470 INFO BlockManagerMaster: Registered BlockManager
*16/09/12 11:34:41.486 INFO AppClient$ClientEndpoint: Executor added:
app-20160912113441-/0 on worker-20160912113428-192.168.50.106-59927
(192.168.50.106:59927 ) with 1 cores*
*16/09/12 11:34:41.486 INFO SparkDeploySchedulerBackend: Granted executor
ID app-20160912113441-/0 on hostPort 192.168.50.106:59927
 with 1 cores, 6.0 GB RAM*
*16/09/12 11:34:41.487 INFO AppClient$ClientEndpoint: Executor added:
app-20160912113441-/1 on worker-20160912113428-192.168.50.106-59927
(192.168.50.106:59927 ) with 1 cores*
*16/09/12 11:34:41.487 INFO SparkDeploySchedulerBackend: Granted executor
ID app-20160912113441-/1 on hostPort 192.168.50.106:59927
 with 1 cores, 6.0 GB RAM*
*16/09/12 11:34:41.488 INFO AppClient$ClientEndpoint: Executor added:
app-20160912113441-/2 on worker-20160912113405-192.168.50.108-35454
(192.168.50.108:35454 ) with 1 cores*
*16/09/12 11:34:41.489 INFO SparkDeploySchedulerBackend: Granted executor
ID app-20160912113441-/2 on hostPort 192.168.50.108:35454
 with 1 cores, 6.0 GB RAM*

Thanks!

Best,
Xiaoye


Re: Can I assign affinity for spark executor processes?

2016-09-19 Thread Xiaoye Sun
Hi Jakob,

Yes, you are right. I should use taskset when I start the *.sh scripts.

In more detail, I changed the last line in ./sbin/start-slaves.sh on the master
to this:
"${SPARK_HOME}/sbin/slaves.sh" cd "${SPARK_HOME}" \; *"taskset" "0xffe"*
"${SPARK_HOME}/sbin/start-slave.sh"
"spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT"
where 0xffe is the affinity mask.
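
As a quick check of what a mask like 0xffe selects, here is a small sketch that decodes a taskset-style bitmask into CPU numbers (bit i set means CPU i is allowed). `AffinityMask` and `cpus` are names made up for this illustration.

```scala
object AffinityMask {
  // CPUs enabled by a taskset-style bitmask: the lowest-order bit is CPU 0.
  def cpus(mask: Long): Seq[Int] =
    (0 until 64).filter(i => ((mask >> i) & 1L) == 1L)
}
```

For 0xffe this yields CPUs 1 through 11, so CPU 0 stays free for the pinned process.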

Thanks!

Best,
Xiaoye

On Tue, Sep 13, 2016 at 11:01 PM, Jakob Odersky  wrote:

> Hi Xiaoye,
> could it be that the executors were spawned before the affinity was
> set on the worker? Would it help to start spark worker with taskset
> from the beginning, i.e. "taskset [mask] start-slave.sh"?
> Workers in spark (standalone mode) simply create processes with the
> standard java process API. Unless there is something funky going on in
> the JRE, I don't see how spark could affect cpu affinity.
>
> regards,
> --Jakob
>
> On Tue, Sep 13, 2016 at 7:56 PM, Xiaoye Sun  wrote:
> > Hi,
> >
> > In my experiment, I pin one very important process on a fixed CPU. So the
> > performance of Spark task execution will be affected if the executors or
> the
> > worker uses that CPU. I am wondering if it is possible to let the Spark
> > executors not using a particular CPU.
> >
> > I tried to 'taskset -p [cpumask] [pid]' command to set the affinity of
> the
> > Worker process. However, the executor processes created by the worker
> > process don't inherit the same CPU affinity.
> >
> > Thanks!
> >
> > Best,
> > Xiaoye
>


write.df is failing on Spark Cluster

2016-09-19 Thread sankarmittapally
We have set up a Spark cluster which is on NFS shared storage. There are no
permission issues with the NFS storage; all the users are able to write to it.
When I ran the write.df command in SparkR, I got the error below. Can
someone please help me fix this issue?


16/09/17 08:03:28 ERROR InsertIntoHadoopFsRelationCommand: Aborting job.
java.io.IOException: Failed to rename DeprecatedRawLocalFileStatus
{path=file:/nfspartition/sankar/banking_l1_v2.csv/_temporary/0/task_201609170802_0013_m_00/part-r-0-46a7f178-2490-444e-9110-510978eaaecb.csv;
isDirectory=false; length=436486316; replication=1; blocksize=33554432;
modification_time=147409940; access_time=0; owner=; group=;
permission=rw-rw-rw-; isSymlink=false}
to
file:/nfspartition/sankar/banking_l1_v2.csv/part-r-0-46a7f178-2490-444e-9110-510978eaaecb.csv
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:371)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:384)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:326)
    at org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:222)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelationCommand.scala:144)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:115)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:60)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:487)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:141)
    at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:86)
    at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:38)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEvent

SPARK-10835 in 2.0

2016-09-19 Thread janardhan shetty
Hi,

I am hitting this issue: https://issues.apache.org/jira/browse/SPARK-10835

The issue seems to be resolved, but it is resurfacing in 2.0 ML. Any workaround
would be appreciated.

Note:
The pipeline has NGram before Word2Vec.

Error:
val word2Vec = new Word2Vec().setInputCol("wordsGrams").setOutputCol("features").setVectorSize(128).setMinCount(10)

scala> word2Vec.fit(grams)
java.lang.IllegalArgumentException: requirement failed: Column wordsGrams
must be of type ArrayType(StringType,true) but was actually
ArrayType(StringType,false).
  at scala.Predef$.require(Predef.scala:224)
  at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:42)
  at org.apache.spark.ml.feature.Word2VecBase$class.validateAndTransformSchema(Word2Vec.scala:111)
  at org.apache.spark.ml.feature.Word2Vec.validateAndTransformSchema(Word2Vec.scala:121)
  at org.apache.spark.ml.feature.Word2Vec.transformSchema(Word2Vec.scala:187)
  at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:70)
  at org.apache.spark.ml.feature.Word2Vec.fit(Word2Vec.scala:170)


Github code for Ngram:


  override protected def validateInputType(inputType: DataType): Unit = {
    require(inputType.sameType(ArrayType(StringType)),
      s"Input type must be ArrayType(StringType) but got $inputType.")
  }

  override protected def outputDataType: DataType = new ArrayType(StringType, false)
}
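
The mismatch in the error above comes down to whether the nullability flag takes part in the type comparison. The following is a toy model in plain Scala, not Spark's code: the `DataType`, `StringType` and `ArrayType` definitions here are simplified stand-ins, and `strictlyEqual` / `sameShape` are made-up names for a strict check and a nullability-insensitive check.

```scala
object NullabilityCheck {
  // Simplified stand-ins for the Spark SQL types named in the error message.
  sealed trait DataType
  case object StringType extends DataType
  final case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType

  // Strict comparison: the containsNull flags must match as well.
  def strictlyEqual(a: DataType, b: DataType): Boolean = a == b

  // Lenient comparison: same structure, nullability ignored.
  def sameShape(a: DataType, b: DataType): Boolean = (a, b) match {
    case (ArrayType(x, _), ArrayType(y, _)) => sameShape(x, y)
    case _                                  => a == b
  }
}
```

With these definitions, ArrayType(StringType, true) and ArrayType(StringType, false) fail the strict check but pass the lenient one, which matches the pattern in the trace: the NGram output declares containsNull = false while the Word2Vec check expects true.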


LDA and Maximum Iterations

2016-09-19 Thread Frank Zhang
Hi all,
I have a question about parameter settings for the LDA model. When I try to set
a large number like 500 for setMaxIterations, the program always fails. There
is a very straightforward LDA tutorial using an example data set in the mllib
package: http://stackoverflow.com/questions/36631991/latent-dirichlet-allocation-lda-algorithm-not-printing-results-in-spark-scala.
The code is here:

import org.apache.spark.mllib.clustering.LDA
import org.apache.spark.mllib.linalg.Vectors

// Load and parse the data
val data = sc.textFile("/data/mllib/sample_lda_data.txt") // you might need to change the path for the data set
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
// Index documents with unique IDs
val corpus = parsedData.zipWithIndex.map(_.swap).cache()
// Cluster the documents into three topics using LDA
val ldaModel = new LDA().setK(3).run(corpus)

But if I change the last line to
val ldaModel = new LDA().setK(3).setMaxIterations(500).run(corpus)
the program fails.

I greatly appreciate your help!

Best,
Frank




   

Re: Is RankingMetrics' NDCG implementation correct?

2016-09-19 Thread Jong Wook Kim
Thanks for the clarification and the relevant links. I overlooked the
comments explicitly saying that the relevance is binary.

I understand that the label is not a relevance, but I have been, and I
think many people are using the label as relevance in the implicit feedback
context where the user-provided exact label is not defined anyway. I think
that's why RiVal 's using the term
"preference" for both the label for MAE and the relevance for NDCG.

At the same time, I see why Spark decided to assume the relevance is
binary, in part to conform to the class RankingMetrics's constructor. I
think it would be nice if the upcoming DataFrame-based RankingEvaluator can
be optionally set a "relevance column" that has non-binary relevance
values, otherwise defaulting to either 1.0 or the label column.

My extended version of RankingMetrics is here:
https://github.com/jongwook/spark-ranking-metrics . It has a test case
checking that the numbers are same as RiVal's.

Jong Wook



On 19 September 2016 at 03:13, Sean Owen  wrote:

> Yes, relevance is always 1. The label is not a relevance score so
> don't think it's valid to use it as such.
>
> On Mon, Sep 19, 2016 at 4:42 AM, Jong Wook Kim  wrote:
> > Hi,
> >
> > I'm trying to evaluate a recommendation model, and found that Spark and
> > Rival give different results, and it seems that Rival's one is what
> Kaggle
> > defines: https://gist.github.com/jongwook/5d4e78290eaef22cb69abbf68b52e5
> 97
> >
> > Am I using RankingMetrics in a wrong way, or is Spark's implementation
> > incorrect?
> >
> > To my knowledge, NDCG should be dependent on the relevance (or
> preference)
> > values, but Spark's implementation seems not; it uses 1.0 where it
> should be
> > 2^(relevance) - 1, probably assuming that relevance is all 1.0? I also
> tried
> > tweaking, but its method to obtain the ideal DCG also seems wrong.
> >
> > Any feedback from MLlib developers would be appreciated. I made a
> > modified/extended version of RankingMetrics that produces the identical
> > numbers to Kaggle and Rival's results, and I'm wondering if it is
> something
> > appropriate to be added back to MLlib.
> >
> > Jong Wook
>
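
To illustrate the difference discussed in this thread, here is a minimal sketch of NDCG with a pluggable gain function, in plain Scala. It is not Spark's RankingMetrics code and not RiVal's; `NdcgSketch`, `gradedGain` and `binaryGain` are names chosen for this example. The graded gain is the 2^(relevance) - 1 form mentioned above, and the binary gain treats every relevant item as 1.0.

```scala
object NdcgSketch {
  private def log2(x: Double): Double = math.log(x) / math.log(2.0)

  // DCG over a ranked list of relevance values; 0-based position i is discounted by log2(i + 2).
  def dcg(relevances: Seq[Double], gain: Double => Double): Double =
    relevances.zipWithIndex.map { case (rel, i) => gain(rel) / log2(i + 2.0) }.sum

  // NDCG = DCG of the given order divided by the DCG of the ideal (descending) order.
  def ndcg(relevances: Seq[Double], gain: Double => Double): Double = {
    val ideal = dcg(relevances.sortWith(_ > _), gain)
    if (ideal == 0.0) 0.0 else dcg(relevances, gain) / ideal
  }

  val gradedGain: Double => Double = rel => math.pow(2.0, rel) - 1.0
  val binaryGain: Double => Double = rel => if (rel > 0.0) 1.0 else 0.0
}
```

For a ranking with relevances (1.0, 3.0), the binary gain cannot tell the two items apart and reports a perfect score, while the graded gain penalizes putting the less relevant item first. That is one way the two definitions can disagree on the same predictions.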


it does not stop at the breakpoint line within an anonymous function concerning RDD

2016-09-19 Thread chen yong
Hello all,

I am new to Spark. I have recently been using IDEA version 14.0.3 to debug Spark.
It is strange to me that any breakpoint set within an anonymous function passed to
an RDD operation, such as breakpoint-1 in the code snippet below, is invalid (a red
X appears to the left of the line, and hovering the mouse over it shows the message
"no executable code found at line xx in class apache.spark.example.sparkpi$").
Breakpoint-1 is skipped, and execution stops at breakpoint-2 directly. However, the
final value of pi is correct.

I am running a "local" run/debug configuration.

I have been stuck on this problem for a long time. Any help would be very much
appreciated!

package org.apache.spark.examples
import scala.math.random
import org.apache.spark._
import scala.util.logging.Logged

/** Computes an approximation to pi */
object SparkPi {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Pi").setMaster("local")
    val spark = new SparkContext(conf)
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = math.min(10L * slices, Int.MaxValue).toInt // avoid overflow
    val count = spark.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1 // breakpoint-1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / (n - 1)) // breakpoint-2
    spark.stop()
  }
}


RE: as.Date can't be applied to Spark data frame in SparkR

2016-09-19 Thread xingye
Update:
The job does finish, but it takes a long time on 10M rows of data. Is there a better
solution?
From: xing_ma...@hotmail.com
To: user@spark.apache.org
Subject: as.Date can't be applied to Spark data frame in SparkR
Date: Tue, 20 Sep 2016 10:22:17 +0800




Hi, all
I've noticed that as.Date can't be applied to a Spark data frame. I've created
the following UDF and used dapply to change an integer column "aa" to a date
with origin 1960-01-01.

change_date <- function(df) {
  df <- as.POSIXlt(as.Date(df$aa, origin = "1960-01-01", tz = "UTC"))
}
customSchema <- structType(structField("rc", "integer"),
                           structField("change_date(x)", "timestamp"))
rollup_1_t <- dapply(rollup_1, function(x) {
  x <- cbind(x, change_date(x))
}, schema = customSchema)

It works with a small dataset but it takes forever to finish on a big dataset.
It does not give a result when I use head(rollup_1_t).
I guess this is because the "change_date" function converts the Spark data
frame back to an R data frame, which is slow and could potentially fail. Is there
a better solution?

Thanks,
Ye
  

as.Date can't be applied to Spark data frame in SparkR

2016-09-19 Thread xingye
Hi, all
I've noticed that as.Date can't be applied to a Spark data frame. I've created
the following UDF and used dapply to change an integer column "aa" to a date
with origin 1960-01-01.

change_date <- function(df) {
  df <- as.POSIXlt(as.Date(df$aa, origin = "1960-01-01", tz = "UTC"))
}
customSchema <- structType(structField("rc", "integer"),
                           structField("change_date(x)", "timestamp"))
rollup_1_t <- dapply(rollup_1, function(x) {
  x <- cbind(x, change_date(x))
}, schema = customSchema)

It works with a small dataset but it takes forever to finish on a big dataset.
It does not give a result when I use head(rollup_1_t).
I guess this is because the "change_date" function converts the Spark data
frame back to an R data frame, which is slow and could potentially fail. Is there
a better solution?

Thanks,
Ye
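
The conversion in question is plain date arithmetic: a day count added to a fixed origin. The sketch below shows only that arithmetic, in Scala with java.time; it is not a SparkR solution, and `Origin1960` and `toDate` are names made up for the illustration.

```scala
import java.time.LocalDate

object Origin1960 {
  private val Origin = LocalDate.of(1960, 1, 1)

  // Interpret an integer as a number of days since 1960-01-01.
  def toDate(days: Int): LocalDate = Origin.plusDays(days.toLong)
}
```

Since 1960-01-01 lies 3653 days before 1970-01-01, the same result can be obtained by subtracting 3653 and treating the value as days since the Unix epoch, which may be easier to express as a column operation than as a row-by-row R function.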

Re: Is there such thing as cache fusion with the underlying tables/files on HDFS

2016-09-19 Thread Gene Pang
Hi Mich,

While Alluxio is not a database (it exposes a file system interface), you
can use Alluxio to keep certain data in memory. With Alluxio, you can
selectively pin data in memory
(http://www.alluxio.org/docs/master/en/Command-Line-Interface.html#pin).
There are also ways to control how to read and write the data in Alluxio memory
(http://www.alluxio.org/docs/master/en/File-System-API.html). These options
and features can help you control how you access your data.

Thanks,
Gene

On Sat, Sep 17, 2016 at 9:53 AM, Mich Talebzadeh wrote:

> Hi,
>
> I was seeing similar issues when I was working on Oracle with Tableau as
> the dashboard.
>
> Currently I have a batch layer that gets streaming data from
>
> source -> Kafka -> Flume -> HDFS
>
> It is stored on HDFS as text files and a cron process sinks a Hive table with
> the external table built on the directory. I tried both ORC and Parquet
> but I don't think the query itself is the issue.
>
> Meaning it does not matter how clever your execution engine is, the fact
> that you still have to do a considerable amount of Physical IO (PIO) as opposed
> to Logical IO (LIO) to get the data to Zeppelin is on the critical path.
>
> One option is to limit the amount of data in Zeppelin to a certain number of
> rows or something similar. However, you cannot tell a user he/she cannot
> see the full data.
>
> We resolved this with Oracle by using Oracle TimesTen IMDB
> to cache certain tables in memory and get them refreshed (depending on
> refresh frequency) from the underlying table in Oracle when data is
> updated. That is done through cache fusion.
>
> I was looking around and came across Alluxio.
> Ideally I would like to utilise a concept like TimesTen. Can one distribute
> Hive table data (or any table data) across the nodes, cached? In that case
> we would be doing Logical IO, which is about 20 times or more lightweight
> compared to Physical IO.
>
> Anyway this is the concept.
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: Kinesis Receiver not respecting spark.streaming.receiver.maxRate

2016-09-19 Thread Aravindh
Hi Sai, I am running in local mode and there is only one receiver. Verified
that.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kinesis-Receiver-not-respecting-spark-streaming-receiver-maxRate-tp27754p27760.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Sending extraJavaOptions for Spark 1.6.1 on mesos 0.28.2 in cluster mode

2016-09-19 Thread sagarcasual .
Hello,
I have my Spark application running in cluster mode in CDH with
extraJavaOptions.
However, when I attempt to run the same application with Apache Mesos,
it does not recognize the properties below at all, and the code that reads
them returns null.

--conf spark.driver.extraJavaOptions=-Dsome.url=http://some-url \
--conf spark.executor.extraJavaOptions=-Dsome.url=http://some-url

I tried the option specified in
http://stackoverflow.com/questions/35872093/missing-java-system-properties-when-running-spark-streaming-on-mesos-cluster?noredirect=1&lq=1

and still got no change in the result.

Any idea how to achieve this in Mesos?

-Regards
Sagar


Re: feasibility of ignite and alluxio for interfacing MPI and Spark

2016-09-19 Thread Calvin Jia
Hi,

Alluxio allows for data sharing between applications through a File System
API (Native Java Alluxio client, Hadoop FileSystem, or POSIX through fuse).
If your MPI applications can use any of these interfaces, you should be
able to use Alluxio for data sharing out of the box.

In terms of duplicating in-memory data, you should only need one copy in
Alluxio if you are able to stream your dataset. As for the performance of
using Alluxio to back your data compared to using Spark's native in-memory
representation, here is a blog which details the pros and cons of the two
approaches. At a high level, Alluxio performs better with larger datasets or
if you plan to use your dataset in more than one Spark job.

Hope this helps,
Calvin


Re: NumberFormatException: For input string: "0.00000"

2016-09-19 Thread Hyukjin Kwon
It does not seem to be an issue in Spark. Does "CSVParser" work fine without Spark
with the data?

BTW, it seems there is something wrong with your email address. I am
sending this again.

On 20 Sep 2016 8:32 a.m., "Hyukjin Kwon"  wrote:

> It does not seem to be an issue in Spark. Does "CSVParser" work fine without Spark
> with the data?
>
> On 20 Sep 2016 2:15 a.m., "Mohamed ismail" wrote:
>
>> Hi all
>>
>> I am trying to read:
>>
>> sc.textFile(DataFile).mapPartitions(lines => {
>> val parser = new CSVParser(",")
>> lines.map(line=>parseLineToTuple(line, parser))
>> })
>> Data looks like:
>> android phone,0,0,0,,0,0,0,0,0,0,0,5,0,0,0,5,0,0.0,0.0,0.000
>> 00,0.0,0.0,0,0,0,0,0,0,0,0.0,0,0,0
>> ios phone,0,-1,0,,0,0,0,0,0,0,1,0,0,0,0,1,0,0.0,0.0,0.00
>> 000,0.0,0.0,0,0,0,0,0,0,0,0.0,0,0,0
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> 1 in stage 23055.0 failed 4 times, most recent failure: Lost task 1.3 in
>> stage 23055.0 (TID 191607, ):
>> java.lang.NumberFormatException: For input string: "0.0"
>>
>> Has anyone faced such issues? Is there a solution?
>>
>> Thanks,
>> Mohamed
>>
>>


Re: NumberFormatException: For input string: "0.00000"

2016-09-19 Thread Hyukjin Kwon
It does not seem to be an issue in Spark. Does "CSVParser" work fine without Spark
with the data?

On 20 Sep 2016 2:15 a.m., "Mohamed ismail" wrote:

> Hi all
>
> I am trying to read:
>
> sc.textFile(DataFile).mapPartitions(lines => {
> val parser = new CSVParser(",")
> lines.map(line=>parseLineToTuple(line, parser))
> })
> Data looks like:
> android phone,0,0,0,,0,0,0,0,0,0,0,5,0,0,0,5,0,0.0,0.0,0.
> 0,0.0,0.0,0,0,0,0,0,0,0,0.0,0,0,0
> ios phone,0,-1,0,,0,0,0,0,0,0,1,0,0,0,0,1,0,0.0,0.0,0.
> 0,0.0,0.0,0,0,0,0,0,0,0,0.0,0,0,0
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 1
> in stage 23055.0 failed 4 times, most recent failure: Lost task 1.3 in
> stage 23055.0 (TID 191607, ):
> java.lang.NumberFormatException: For input string: "0.0"
>
> Has anyone faced such issues? Is there a solution?
>
> Thanks,
> Mohamed
>
>
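
One common way to hit this kind of NumberFormatException is converting a decimal or empty field with an integer parser. Whether that is what parseLineToTuple does is an assumption, since its body is not shown in the thread. The sketch below uses made-up helper names (`SafeParse`, `toIntOpt`, `toDoubleOpt`) to show a parse that returns None instead of throwing, which makes it easy to test the raw fields outside Spark.

```scala
import scala.util.Try

object SafeParse {
  // Parse a field as Int; decimals such as "0.0" and empty fields become None.
  def toIntOpt(field: String): Option[Int] = Try(field.trim.toInt).toOption

  // Parse a field as Double; empty or malformed fields become None.
  def toDoubleOpt(field: String): Option[Double] = Try(field.trim.toDouble).toOption
}
```

Running the sample rows through helpers like these, field by field, shows which column fails and whether the failure is in the parsing code rather than in Spark.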


Re: driver OOM - need recommended memory for driver

2016-09-19 Thread Anand Viswanathan
Thank you so much Mich,

I am using yarn as my master.

I found a statement in the Spark documentation mentioning that the amount of
memory depends on the individual application.
http://spark.apache.org/docs/1.5.2/hardware-provisioning.html#memory


I guess my assumption that "default resources (memory and cores) can handle any 
application" is wrong.

Thanks and regards,
Anand Viswanathan

> On Sep 19, 2016, at 6:56 PM, Mich Talebzadeh  
> wrote:
> 
> If you make your driver memory too low it is likely you are going to hit an OOM
> error.
>
> You have not mentioned which Spark mode you are using (Local, Standalone, Yarn
> etc.)
> 
> HTH
> 
> 
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> 
>  
> http://talebzadehmich.wordpress.com 
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> On 19 September 2016 at 23:48, Anand Viswanathan 
> <anand_v...@ymail.com.invalid> wrote:
> Thank you so much, Kevin.
> 
> My data size is around 4GB.
> I am not using collect(), take() or takeSample()
> At the final job, number of tasks grows up to 200,000
> 
> The driver still crashes with OOM with the default --driver-memory 1g, but the
> job succeeds if I specify 2g.
> 
> Thanks and regards,
> Anand Viswanathan
> 
>> On Sep 19, 2016, at 4:00 PM, Kevin Mellott wrote:
>> 
>> Hi Anand,
>> 
>> Unfortunately, there is not really a "one size fits all" answer to this 
>> question; however, here are some things that you may want to consider when 
>> trying different sizes.
>>    - What is the size of the data you are processing?
>>    - Whenever you invoke an action that requires ALL of the data to be sent to
>>    the driver (such as collect), you'll need to ensure that your memory setting
>>    can handle it.
>>    - What level of parallelization does your code support? The more processing
>>    you can do on the worker nodes, the less your driver will need to do.
>>
>> Related to these comments, keep in mind that the --executor-memory, 
>> --num-executors, and --executor-cores configurations can be useful when 
>> tuning the worker nodes. There is some great information in the Spark Tuning 
>> Guide (linked below) that you may find useful as well.
>> 
>> http://spark.apache.org/docs/latest/tuning.html 
>> 
>> 
>> Hope that helps!
>> Kevin
>> 
>> On Mon, Sep 19, 2016 at 9:32 AM, Anand Viswanathan 
>> <anand_v...@ymail.com.invalid> wrote:
>> Hi,
>> 
>> Spark version :spark-1.5.2-bin-hadoop2.6 ,using pyspark. 
>> 
>> I am running a machine learning program, which runs perfectly by specifying 
>> 2G for --driver-memory.
>> However the program cannot be run with default 1G, driver crashes with OOM 
>> error.
>> 
>> What is the recommended configuration for --driver-memory? Please suggest.
>> 
>> Thanks and regards,
>> Anand.
>> 
>> 
> 
> 



Re: driver OOM - need recommended memory for driver

2016-09-19 Thread Mich Talebzadeh
If you make your driver memory too low it is likely you are going to hit
OOM error.

You have not mentioned which Spark mode you are using (Local, Standalone,
Yarn etc)

HTH



Dr Mich Talebzadeh






On 19 September 2016 at 23:48, Anand Viswanathan <
anand_v...@ymail.com.invalid> wrote:

> Thank you so much, Kevin.
>
> My data size is around 4GB.
> I am not using collect(), take() or takeSample()
> At the final job, number of tasks grows up to 200,000
>
> The driver still crashes with OOM with the default --driver-memory 1g, but the
> job succeeds if I specify 2g.
>
> Thanks and regards,
> Anand Viswanathan
>
> On Sep 19, 2016, at 4:00 PM, Kevin Mellott 
> wrote:
>
> Hi Anand,
>
> Unfortunately, there is not really a "one size fits all" answer to this
> question; however, here are some things that you may want to consider when
> trying different sizes.
>
>    - What is the size of the data you are processing?
>    - Whenever you invoke an action that requires ALL of the data to be
>    sent to the driver (such as collect), you'll need to ensure that your
>    memory setting can handle it.
>    - What level of parallelization does your code support? The more
>    processing you can do on the worker nodes, the less your driver will need
>    to do.
>
> Related to these comments, keep in mind that the --executor-memory,
> --num-executors, and --executor-cores configurations can be useful when
> tuning the worker nodes. There is some great information in the Spark
> Tuning Guide (linked below) that you may find useful as well.
>
> http://spark.apache.org/docs/latest/tuning.html
>
> Hope that helps!
> Kevin
>
> On Mon, Sep 19, 2016 at 9:32 AM, Anand Viswanathan <
> anand_v...@ymail.com.invalid> wrote:
>
>> Hi,
>>
>> Spark version :spark-1.5.2-bin-hadoop2.6 ,using pyspark.
>>
>> I am running a machine learning program, which runs perfectly by
>> specifying 2G for --driver-memory.
>> However the program cannot be run with default 1G, driver crashes with
>> OOM error.
>>
>> What is the recommended configuration for --driver-memory? Please suggest.
>>
>> Thanks and regards,
>> Anand.
>>
>>
>
>


Re: driver OOM - need recommended memory for driver

2016-09-19 Thread Anand Viswanathan
Thank you so much, Kevin.

My data size is around 4GB.
I am not using collect(), take() or takeSample()
At the final job, number of tasks grows up to 200,000

The driver still crashes with OOM with the default --driver-memory 1g, but the
job succeeds if I specify 2g.

Thanks and regards,
Anand Viswanathan

> On Sep 19, 2016, at 4:00 PM, Kevin Mellott  wrote:
> 
> Hi Anand,
> 
> Unfortunately, there is not really a "one size fits all" answer to this 
> question; however, here are some things that you may want to consider when 
> trying different sizes.
>    - What is the size of the data you are processing?
>    - Whenever you invoke an action that requires ALL of the data to be sent to
>    the driver (such as collect), you'll need to ensure that your memory setting
>    can handle it.
>    - What level of parallelization does your code support? The more processing
>    you can do on the worker nodes, the less your driver will need to do.
>
> Related to these comments, keep in mind that the --executor-memory, 
> --num-executors, and --executor-cores configurations can be useful when 
> tuning the worker nodes. There is some great information in the Spark Tuning 
> Guide (linked below) that you may find useful as well.
> 
> http://spark.apache.org/docs/latest/tuning.html 
> 
> 
> Hope that helps!
> Kevin
> 
> On Mon, Sep 19, 2016 at 9:32 AM, Anand Viswanathan 
> <anand_v...@ymail.com.invalid> wrote:
> Hi,
> 
> Spark version :spark-1.5.2-bin-hadoop2.6 ,using pyspark. 
> 
> I am running a machine learning program, which runs perfectly by specifying 
> 2G for --driver-memory.
> However the program cannot be run with default 1G, driver crashes with OOM 
> error.
> 
> What is the recommended configuration for --driver-memory? Please suggest.
> 
> Thanks and regards,
> Anand.
> 
> 



Re: off heap to alluxio/tachyon in Spark 2

2016-09-19 Thread Bin Fan
Hi,

If you are looking for how to run Spark on Alluxio (formerly Tachyon),
here is the documentation from Alluxio doc site:
http://www.alluxio.org/docs/master/en/Running-Spark-on-Alluxio.html
It still works for Spark 2.x.

Alluxio team also published articles on when and why running Spark (2.x)
with Alluxio may benefit performance:
http://www.alluxio.com/2016/08/effective-spark-rdds-with-alluxio/

- Bin


On Mon, Sep 19, 2016 at 7:56 AM, aka.fe2s  wrote:

> Hi folks,
>
> What has happened to Tachyon / Alluxio in Spark 2? The docs no longer
> mention it.
>
> --
> Oleksiy Dyagilev
>


Anyone used Zoomdata visual dashboard with Spark

2016-09-19 Thread Mich Talebzadeh
Hi,

Zoomdata is known to be a good tool for
real-time dashboards. I am taking a look at it. Has anyone used it
with Spark, by any chance?

https://demo.zoomdata.com/zoomdata/login

Thanks

Dr Mich Talebzadeh





Re: Lemmatization using StanfordNLP in ML 2.0

2016-09-19 Thread janardhan shetty
Yes Sujit, I have tried that option as well.
I also tried sbt assembly, but I am hitting the issue below:

http://stackoverflow.com/questions/35197120/java-outofmemoryerror-on-sbt-assembly

Just wondering if there is any clean approach to including StanfordCoreNLP
classes in Spark ML?


On Mon, Sep 19, 2016 at 1:41 PM, Sujit Pal  wrote:

> Hi Janardhan,
>
> You need the classifier "models" attribute on the second entry for
> stanford-corenlp to indicate that you want the models JAR, as shown below.
> Right now you are importing two instances of stanford-corenlp JARs.
>
> libraryDependencies ++= {
>   val sparkVersion = "2.0.0"
>   Seq(
> "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
> "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
> "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
> "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided",
> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
> "com.google.protobuf" % "protobuf-java" % "2.6.1",
> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0" classifier "models",
> "org.scalatest" %% "scalatest" % "2.2.6" % "test"
>   )
> }
>
> -sujit
>
>
> On Sun, Sep 18, 2016 at 5:12 PM, janardhan shetty 
> wrote:
>
>> Hi Sujit,
>>
>> Tried that option but same error:
>>
>> java version "1.8.0_51"
>>
>>
>> libraryDependencies ++= {
>>   val sparkVersion = "2.0.0"
>>   Seq(
>> "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
>> "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
>> "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
>> "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided",
>> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
>> "com.google.protobuf" % "protobuf-java" % "2.6.1",
>> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
>> "org.scalatest" %% "scalatest" % "2.2.6" % "test"
>>   )
>> }
>>
>> Error:
>>
>> Exception in thread "main" java.lang.NoClassDefFoundError:
>> edu/stanford/nlp/pipeline/StanfordCoreNLP
>> at transformers.ml.Lemmatizer$$anonfun$createTransformFunc$1.ap
>> ply(Lemmatizer.scala:37)
>> at transformers.ml.Lemmatizer$$anonfun$createTransformFunc$1.ap
>> ply(Lemmatizer.scala:33)
>> at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$
>> 2.apply(ScalaUDF.scala:88)
>> at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$
>> 2.apply(ScalaUDF.scala:87)
>> at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(Scal
>> aUDF.scala:1060)
>> at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedEx
>> pressions.scala:142)
>> at org.apache.spark.sql.catalyst.expressions.InterpretedProject
>> ion.apply(Projection.scala:45)
>> at org.apache.spark.sql.catalyst.expressions.InterpretedProject
>> ion.apply(Projection.scala:29)
>> at scala.collection.TraversableLike$$anonfun$map$1.apply(
>> TraversableLike.scala:234)
>> at scala.collection.TraversableLike$$anonfun$map$1.apply(
>> TraversableLike.scala:234)
>> at scala.collection.immutable.List.foreach(List.scala:381)
>> at scala.collection.TraversableLike$class.map(TraversableLike.
>> scala:234)
>>
>>
>>
>> On Sun, Sep 18, 2016 at 2:21 PM, Sujit Pal 
>> wrote:
>>
>>> Hi Janardhan,
>>>
>>> Maybe try removing the string "test" from this line in your build.sbt?
>>> IIRC, this restricts the models JAR to be called from a test.
>>>
>>> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0" % "test"
>>> classifier "models",
>>>
>>> -sujit
>>>
>>>
>>> On Sun, Sep 18, 2016 at 11:01 AM, janardhan shetty <
>>> janardhan...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am trying to use lemmatization as a transformer and added the below to
>>>> the build.sbt
>>>>
>>>>  "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
>>>> "com.google.protobuf" % "protobuf-java" % "2.6.1",
>>>> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0" % "test"
>>>> classifier "models",
>>>> "org.scalatest" %% "scalatest" % "2.2.6" % "test"
>>>>
>>>>
>>>> Error:
>>>> *Exception in thread "main" java.lang.NoClassDefFoundError:
>>>> edu/stanford/nlp/pipeline/StanfordCoreNLP*
>>>>
>>>> I have tried other versions of this spark package.
>>>>
>>>> Any help is appreciated..
>>>>
>>>
>>>
>>
>


Re: very high maxresults setting (no collect())

2016-09-19 Thread Michael Gummelt
When you say "started seeing", do you mean after a Spark version upgrade?
After running a new job?

On Mon, Sep 19, 2016 at 2:05 PM, Adrian Bridgett 
wrote:

> Hi,
>
> We've recently started seeing a huge increase in
> spark.driver.maxResultSize - we are starting to set it at 3GB (and increase
> our driver memory a lot to 12GB or so).  This is on v1.6.1 with Mesos
> scheduler.
>
> All the docs I can see is that this is to do with .collect() being called
> on a large RDD (which isn't the case AFAIK - certainly nothing in the code)
> and it's rather puzzling me as to what's going on.  I thought that the
> number of tasks was coming into it (about 14000 tasks in each of about a
> dozen stages).  Adding a coalesce seemed to help but now we are hitting the
> problem again after a few minor code tweaks.
>
> What else could be contributing to this?   Thoughts I've had:
> - number of tasks
> - metrics?
> - um, a bit stuck!
>
> The code looks like this:
> df=
> df.persist()
> val rows = df.count()
>
> // actually we loop over this a few times
> val output = df.groupBy("id").agg(
>   avg($"score").as("avg_score"),
>   count($"id").as("rows")
> ).
> select(
>   $"id",
>   $"avg_score",
>   $"rows"
> ).sort($"id")
> output.coalesce(1000).write.format("com.databricks.spark.csv").save("/tmp/...")
>
> Cheers for any help/pointers!  There are a couple of memory leak tickets
> fixed in v1.6.2 that may affect the driver so I may try an upgrade (the
> executors are fine).
>
> Adrian
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Michael Gummelt
Software Engineer
Mesosphere


very high maxresults setting (no collect())

2016-09-19 Thread Adrian Bridgett

Hi,

We've recently started seeing a huge increase in 
spark.driver.maxResultSize - we are starting to set it at 3GB (and 
increase our driver memory a lot to 12GB or so).  This is on v1.6.1 with 
Mesos scheduler.


All the docs I can find say that this is to do with .collect() being 
called on a large RDD (which isn't the case AFAIK - certainly nothing in 
the code) and it's rather puzzling me as to what's going on.  I thought 
that the number of tasks was coming into it (about 14000 tasks in each 
of about a dozen stages).  Adding a coalesce seemed to help but now we 
are hitting the problem again after a few minor code tweaks.


What else could be contributing to this?   Thoughts I've had:
- number of tasks
- metrics?
- um, a bit stuck!

The code looks like this:
df=
df.persist()
val rows = df.count()

// actually we loop over this a few times
val output = df.groupBy("id").agg(
  avg($"score").as("avg_score"),
  count($"id").as("rows")
).
select(
  $"id",
  $"avg_score",
  $"rows"
).sort($"id")
output.coalesce(1000).write.format("com.databricks.spark.csv").save("/tmp/...")

Cheers for any help/pointers!  There are a couple of memory leak tickets 
fixed in v1.6.2 that may affect the driver so I may try an upgrade (the 
executors are fine).


Adrian




Similar Items

2016-09-19 Thread Kevin Mellott
Hi all,

I'm trying to write a Spark application that will detect similar items (in
this case products) based on their descriptions. I've got an ML pipeline
that transforms the product data to TF-IDF representation, using the
following components.

   - *RegexTokenizer* - strips out non-word characters, results in a list
   of tokens
   - *StopWordsRemover* - removes common "stop words", such as "the",
   "and", etc.
   - *HashingTF* - assigns a numeric "hash" to each token and calculates
   the term frequency
   - *IDF* - computes the inverse document frequency

After this pipeline evaluates, I'm left with a SparseVector that represents
the TF-IDF weights of the tokens for each product. As a next step,
I'd like to be able to compare each vector to one another, to detect
similarities.

Does anybody know of a straightforward way to do this in Spark? I tried
creating a UDF (that used the Breeze linear algebra methods internally);
however, that did not scale well.

Thanks,
Kevin
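
For reference, the pairwise comparison described above usually comes down to cosine similarity between two sparse vectors. The following is a minimal Python sketch under stated assumptions: each vector is a plain dict mapping feature index to TF-IDF weight (a hypothetical representation, not Spark's SparseVector API), and `pair_count` is an illustrative helper showing how fast the number of comparisons grows.

```python
import math


def cosine_similarity(a, b):
    """Cosine similarity of two sparse vectors stored as {index: weight} dicts."""
    # Iterate over the smaller vector so the dot product touches fewer entries.
    if len(a) > len(b):
        a, b = b, a
    dot = sum(w * b[i] for i, w in a.items() if i in b)
    norm_a = math.sqrt(sum(w * w for w in a.values()))
    norm_b = math.sqrt(sum(w * w for w in b.values()))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0  # an all-zero vector is treated as similar to nothing
    return dot / (norm_a * norm_b)


def pair_count(n):
    """Number of distinct unordered pairs among n items."""
    return n * (n - 1) // 2
```

Comparing every product with every other product needs `pair_count(n)` similarity calls, so the work grows quadratically with the number of products, which is consistent with a UDF-based brute-force approach not scaling well.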


Spark.1.6.1 on Apache Mesos : Log4j2 could not find a logging implementation

2016-09-19 Thread sagarcasual .
Hello,
I am trying to run Spark 1.6.1 on Apache Mesos.
I have log4j-core and log4j-api 2.6.2 as part of my uber jar, but I am still
getting the following error while starting my Spark app:
ERROR StatusLogger Log4j2 could not find a logging implementation. Please
add log4j-core to the classpath. Using SimpleLogger to log to the console...

Any idea what could be the issue?
-Regards
Sagar


Re: Lemmatization using StanfordNLP in ML 2.0

2016-09-19 Thread Sujit Pal
Hi Janardhan,

You need the classifier "models" attribute on the second entry for
stanford-corenlp to indicate that you want the models JAR, as shown below.
Right now you are importing two instances of stanford-corenlp JARs.

libraryDependencies ++= {
  val sparkVersion = "2.0.0"
  Seq(
"org.apache.spark" %% "spark-core" % sparkVersion % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
"org.apache.spark" %% "spark-mllib" % sparkVersion % "provided",
"edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
"com.google.protobuf" % "protobuf-java" % "2.6.1",
"edu.stanford.nlp" % "stanford-corenlp" % "3.6.0" classifier "models",
"org.scalatest" %% "scalatest" % "2.2.6" % "test"
  )
}

-sujit


On Sun, Sep 18, 2016 at 5:12 PM, janardhan shetty 
wrote:

> Hi Sujit,
>
> Tried that option but same error:
>
> java version "1.8.0_51"
>
>
> libraryDependencies ++= {
>   val sparkVersion = "2.0.0"
>   Seq(
> "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
> "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
> "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
> "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided",
> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
> "com.google.protobuf" % "protobuf-java" % "2.6.1",
> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
> "org.scalatest" %% "scalatest" % "2.2.6" % "test"
>   )
> }
>
> Error:
>
> Exception in thread "main" java.lang.NoClassDefFoundError:
> edu/stanford/nlp/pipeline/StanfordCoreNLP
> at transformers.ml.Lemmatizer$$anonfun$createTransformFunc$1.
> apply(Lemmatizer.scala:37)
> at transformers.ml.Lemmatizer$$anonfun$createTransformFunc$1.
> apply(Lemmatizer.scala:33)
> at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$
> 2.apply(ScalaUDF.scala:88)
> at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$
> 2.apply(ScalaUDF.scala:87)
> at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(
> ScalaUDF.scala:1060)
> at org.apache.spark.sql.catalyst.expressions.Alias.eval(
> namedExpressions.scala:142)
> at org.apache.spark.sql.catalyst.expressions.
> InterpretedProjection.apply(Projection.scala:45)
> at org.apache.spark.sql.catalyst.expressions.
> InterpretedProjection.apply(Projection.scala:29)
> at scala.collection.TraversableLike$$anonfun$map$
> 1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$
> 1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at scala.collection.TraversableLike$class.map(
> TraversableLike.scala:234)
>
>
>
> On Sun, Sep 18, 2016 at 2:21 PM, Sujit Pal  wrote:
>
>> Hi Janardhan,
>>
>> Maybe try removing the string "test" from this line in your build.sbt?
>> IIRC, this restricts the models JAR to be called from a test.
>>
>> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0" % "test" classifier
>> "models",
>>
>> -sujit
>>
>>
>> On Sun, Sep 18, 2016 at 11:01 AM, janardhan shetty <
>> janardhan...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am trying to use lemmatization as a transformer and added the below to the
>>> build.sbt
>>>
>>>  "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
>>> "com.google.protobuf" % "protobuf-java" % "2.6.1",
>>> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0" % "test"
>>> classifier "models",
>>> "org.scalatest" %% "scalatest" % "2.2.6" % "test"
>>>
>>>
>>> Error:
>>> *Exception in thread "main" java.lang.NoClassDefFoundError:
>>> edu/stanford/nlp/pipeline/StanfordCoreNLP*
>>>
>>> I have tried other versions of this spark package.
>>>
>>> Any help is appreciated..
>>>
>>
>>
>


Re: Spark Job not failing

2016-09-19 Thread Mich Talebzadeh
I am not sure that a commit or rollback by the RDBMS is acknowledged by
Spark, so Spark may not know what is going on. From my recollection this is
an issue.

Another alternative is to save the data as a CSV file and load it into the
RDBMS using a form of bulk copy.

HTH



Dr Mich Talebzadeh






On 19 September 2016 at 21:00, sai ganesh  wrote:

> yes.
>
>
> Regards,
> Sai
>
> On Mon, Sep 19, 2016 at 12:29 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> As I understand it, you are inserting into an RDBMS from Spark and the
>> insert is failing on the RDBMS due to a duplicate primary key, but this is
>> not acknowledged by Spark? Is this correct?
>>
>> HTH
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>>
>>
>>
>> On 19 September 2016 at 20:19, tosaigan...@gmail.com <
>> tosaigan...@gmail.com> wrote:
>>
>>>
>>> Hi ,
>>>
>>> I have a primary key on a SQL table, and I am trying to insert a DataFrame
>>> into the table using insertIntoJDBC.
>>>
>>> I can see failures in the logs, but the Spark job still finishes as
>>> successful. Do you know how we can handle this in code to make it fail?
>>>
>>>
>>>
>>> 16/09/19 18:52:51 INFO TaskSetManager: Starting task 0.99 in stage 82.0
>>> (TID
>>> 5032, 10.0.0.24, partition 0,PROCESS_LOCAL, 11300 bytes)
>>> 16/09/19 18:52:52 INFO TaskSetManager: Lost task 0.99 in stage 82.0 (TID
>>> 5032) on executor 10.0.0.24: java.sql.BatchUpdateException (Violation of
>>> PRIMARY KEY constraint 'pk_unique'. Cannot insert duplicate key in object
>>> 'table_name'. The duplicate key value is (2016-09-13 04:00, 2016-09-13
>>> 04:15, 5816324).) [duplicate 99]
>>> 16/09/19 18:52:52 ERROR TaskSetManager: Task 0 in stage 82.0 failed 100
>>> times; aborting job
>>> 16/09/19 18:52:52 INFO YarnClusterScheduler: Removed TaskSet 82.0, whose
>>> tasks have all completed, from pool
>>> 16/09/19 18:52:52 INFO YarnClusterScheduler: Cancelling stage 82
>>> 16/09/19 18:52:52 INFO DAGScheduler: ResultStage 82 (insertIntoJDBC at
>>> sparkjob.scala:143) failed in 9.440 s
>>> 16/09/19 18:52:52 INFO DAGScheduler: Job 19 failed: insertIntoJDBC at
>>> sparkjob.scala:143, took 9.449118 s
>>> 16/09/19 18:52:52 INFO ApplicationMaster: Final app status: SUCCEEDED,
>>> exitCode: 0
>>> 16/09/19 18:52:52 INFO SparkContext: Invoking stop() from shutdown hook
>>>
>>>
>>> Regards,
>>> Sai
>>>
>>>
>>>
>>> -
>>> Sai Ganesh
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Job-not-failing-tp27756.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>


Re: study materials for operators on Dataframe

2016-09-19 Thread Kevin Mellott
I would recommend signing up for a Databricks Community Edition account. It
will give you access to a 6GB cluster, with many different example programs
that you can use to get started.

https://databricks.com/try-databricks

If you are looking for a more formal training method, I just completed the
edX course linked below. The lecture videos were provided by UC Berkeley
professors, and the labs are all run on Databricks. The classes are no
longer active (so no professor interactions); however, you can still access
all of the lectures and labs for free.

https://courses.edx.org/dashboard/programs/21/data-science-and-engineering-with-spark


PS: I am not in any way associated with Databricks, I just happen to find
their product extremely useful (especially for training purposes).

On Sun, Sep 18, 2016 at 9:41 PM, 颜发才(Yan Facai)  wrote:

> Hi,
> I am a newbie,
> and the official Spark documentation is too concise for me, especially the
> introduction to DataFrame operators.
>
> For Python, pandas provides very detailed documentation:
> [Pandas](http://pandas.pydata.org/pandas-docs/stable/index.html)
> so,
> does anyone know of sites or cookbooks that are more helpful for newbies?
>
> Thanks.
>


Re: Kinesis Receiver not respecting spark.streaming.receiver.maxRate

2016-09-19 Thread tosaigan...@gmail.com
Hi Aravindh,

spark.streaming.receiver.maxRate is applied per receiver. Multiply the max
rate by the number of receivers to get the overall limit.


Regards,
Sai
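
To make the arithmetic concrete, here is a small Python sketch. The rate of 100 and the 1 second batch interval come from the question; the receiver count of 10 is a hypothetical value, and `max_records_per_batch` is an illustrative helper, not a Spark API.

```python
def max_records_per_batch(max_rate_per_receiver, num_receivers, batch_interval_seconds):
    """Upper bound on records in one batch when each receiver is capped separately."""
    return max_rate_per_receiver * num_receivers * batch_interval_seconds


# maxRate = 100 records/second, 1 second batches, 10 receivers (hypothetical count)
limit = max_records_per_batch(100, 10, 1)
```

With these assumed numbers the ceiling is 1000 records per batch rather than 100, so a cluster with many receivers could plausibly read thousands of events per second while still respecting the per-receiver setting.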

On Mon, Sep 19, 2016 at 9:31 AM, Aravindh [via Apache Spark User List] <
ml-node+s1001560n27754...@n3.nabble.com> wrote:

> I am trying to throttle my spark cluster reading events from kinesis by
> setting spark.streaming.receiver.maxRate to 100. The batch interval is 1
> second. But spark seems to read thousands of events for every second. Am I
> missing anything here?
>




-
Sai Ganesh

RE: Java Compatibity Problems when we install rJava

2016-09-19 Thread Arif,Mubaraka
We are running Jupyter in yarn-client mode for PySpark (Python Spark),
and we wanted to know if anybody has faced such issues while installing rJava
for Jupyter notebook.

We are also reaching out to Cloudera for support.

thanks,
Muby


From: Sean Owen [so...@cloudera.com]
Sent: Monday, September 19, 2016 2:31 PM
To: Arif,Mubaraka
Cc: User; Toivola,Sami
Subject: Re: Java Compatibity Problems when we install rJava

This isn't a Spark question, so I don't think this is the right place.

It shows that compilation of rJava failed for lack of some other
shared libraries (not Java-related). I think you'd have to get those
packages installed locally too.

If it ends up being Anaconda specific, you should try Continuum, or if
it looks CDH-related head to Cloudera support.

On Mon, Sep 19, 2016 at 8:29 PM, Arif,Mubaraka  wrote:
> We are trying to install rJava on suse Linux running Cloudera Hadoop CDH
> 5.7.2  with Spark 1.6.
>
> Anaconda 4.0 was installed using the CDH parcel.
>
>
>
> We have set up Jupyter notebook, but there are Java compatibility problems.
>
>
>
> For Java we are running :
>
>
>
> java version "1.8.0_51"
> Java(TM) SE Runtime Environment (build 1.8.0_51-tdc1-b16)
> Java HotSpot(TM) 64-Bit Server VM (build 25.51-b03, mixed mode)
>
>
>
> We followed the instructions from the blog :
> http://www.ibm.com/support/knowledgecenter/SSPT3X_3.0.0/com.ibm.swg.im.infosphere.biginsights.install.doc/doc/install_install_r.html
>
>
>
> After running we get the output : [as attached in file - rJava_err.txt]
>
>
>
> Any help is greatly appreciated.
>
>
>
> thanks,
>
> Muby
>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org




Re: Spark Job not failing

2016-09-19 Thread sai ganesh
yes.


Regards,
Sai

On Mon, Sep 19, 2016 at 12:29 PM, Mich Talebzadeh  wrote:

> As I understand it, you are inserting into an RDBMS from Spark and the
> insert is failing on the RDBMS due to a duplicate primary key, but this is
> not acknowledged by Spark? Is this correct?
>
> HTH
>
>
>
> Dr Mich Talebzadeh
>
>
>
>
>
>
> On 19 September 2016 at 20:19, tosaigan...@gmail.com <
> tosaigan...@gmail.com> wrote:
>
>>
>> Hi ,
>>
>> I have a primary key on a SQL table, and I am trying to insert a DataFrame
>> into the table using insertIntoJDBC.
>>
>> I can see failures in the logs, but the Spark job still finishes as
>> successful. Do you know how we can handle this in code to make it fail?
>>
>>
>>
>> 16/09/19 18:52:51 INFO TaskSetManager: Starting task 0.99 in stage 82.0
>> (TID
>> 5032, 10.0.0.24, partition 0,PROCESS_LOCAL, 11300 bytes)
>> 16/09/19 18:52:52 INFO TaskSetManager: Lost task 0.99 in stage 82.0 (TID
>> 5032) on executor 10.0.0.24: java.sql.BatchUpdateException (Violation of
>> PRIMARY KEY constraint 'pk_unique'. Cannot insert duplicate key in object
>> 'table_name'. The duplicate key value is (2016-09-13 04:00, 2016-09-13
>> 04:15, 5816324).) [duplicate 99]
>> 16/09/19 18:52:52 ERROR TaskSetManager: Task 0 in stage 82.0 failed 100
>> times; aborting job
>> 16/09/19 18:52:52 INFO YarnClusterScheduler: Removed TaskSet 82.0, whose
>> tasks have all completed, from pool
>> 16/09/19 18:52:52 INFO YarnClusterScheduler: Cancelling stage 82
>> 16/09/19 18:52:52 INFO DAGScheduler: ResultStage 82 (insertIntoJDBC at
>> sparkjob.scala:143) failed in 9.440 s
>> 16/09/19 18:52:52 INFO DAGScheduler: Job 19 failed: insertIntoJDBC at
>> sparkjob.scala:143, took 9.449118 s
>> 16/09/19 18:52:52 INFO ApplicationMaster: Final app status: SUCCEEDED,
>> exitCode: 0
>> 16/09/19 18:52:52 INFO SparkContext: Invoking stop() from shutdown hook
>>
>>
>> Regards,
>> Sai
>>
>>
>>
>> -
>> Sai Ganesh
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Job-not-failing-tp27756.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Re: driver OOM - need recommended memory for driver

2016-09-19 Thread Kevin Mellott
Hi Anand,

Unfortunately, there is not really a "one size fits all" answer to this
question; however, here are some things that you may want to consider when
trying different sizes.

   - What is the size of the data you are processing?
   - Whenever you invoke an action that requires ALL of the data to be sent
   to the driver (such as collect), you'll need to ensure that your memory
   setting can handle it.
   - What level of parallelization does your code support? The more
   processing you can do on the worker nodes, the less your driver will need
   to do.

Related to these comments, keep in mind that the --executor-memory,
--num-executors, and --executor-cores configurations can be useful when
tuning the worker nodes. There is some great information in the Spark
Tuning Guide (linked below) that you may find useful as well.

http://spark.apache.org/docs/latest/tuning.html

Hope that helps!
Kevin

On Mon, Sep 19, 2016 at 9:32 AM, Anand Viswanathan <
anand_v...@ymail.com.invalid> wrote:

> Hi,
>
> Spark version :spark-1.5.2-bin-hadoop2.6 ,using pyspark.
>
> I am running a machine learning program, which runs perfectly when I
> specify 2G for --driver-memory.
> However, the program cannot be run with the default 1G; the driver crashes
> with an OOM error.
>
> What is the recommended configuration for --driver-memory? Please suggest.
>
> Thanks and regards,
> Anand.
>
>


Re: Java Compatibity Problems when we install rJava

2016-09-19 Thread Sean Owen
This isn't a Spark question, so I don't think this is the right place.

It shows that compilation of rJava failed for lack of some other
shared libraries (not Java-related). I think you'd have to get those
packages installed locally too.

If it ends up being Anaconda specific, you should try Continuum, or if
it looks CDH-related head to Cloudera support.

On Mon, Sep 19, 2016 at 8:29 PM, Arif,Mubaraka  wrote:
> We are trying to install rJava on suse Linux running Cloudera Hadoop CDH
> 5.7.2  with Spark 1.6.
>
> Anaconda 4.0 was installed using the CDH parcel.
>
>
>
> We have set up Jupyter notebook, but there are Java compatibility problems.
>
>
>
> For Java we are running :
>
>
>
> java version "1.8.0_51"
> Java(TM) SE Runtime Environment (build 1.8.0_51-tdc1-b16)
> Java HotSpot(TM) 64-Bit Server VM (build 25.51-b03, mixed mode)
>
>
>
> We followed the instructions from the blog :
> http://www.ibm.com/support/knowledgecenter/SSPT3X_3.0.0/com.ibm.swg.im.infosphere.biginsights.install.doc/doc/install_install_r.html
>
>
>
> After running we get the output : [as attached in file - rJava_err.txt]
>
>
>
> Any help is greatly appreciated.
>
>
>
> thanks,
>
> Muby
>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org




Re: Spark Job not failing

2016-09-19 Thread Mich Talebzadeh
As I understand it, you are inserting into an RDBMS from Spark, and the
insert is failing on the RDBMS due to a duplicate primary key, but the
failure is not acknowledged by Spark. Is this correct?

HTH



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 19 September 2016 at 20:19, tosaigan...@gmail.com 
wrote:

>
> Hi ,
>
> I have a primary key on a SQL table and I am trying to insert a DataFrame
> into the table using insertIntoJDBC.
>
> I can see failures in the logs, but the Spark job still reports success.
> Do you know how we can handle this in code to make the job fail?
>
>
>
> 16/09/19 18:52:51 INFO TaskSetManager: Starting task 0.99 in stage 82.0
> (TID
> 5032, 10.0.0.24, partition 0,PROCESS_LOCAL, 11300 bytes)
> 16/09/19 18:52:52 INFO TaskSetManager: Lost task 0.99 in stage 82.0 (TID
> 5032) on executor 10.0.0.24: java.sql.BatchUpdateException (Violation of
> PRIMARY KEY constraint 'pk_unique'. Cannot insert duplicate key in object
> 'table_name'. The duplicate key value is (2016-09-13 04:00, 2016-09-13
> 04:15, 5816324).) [duplicate 99]
> 16/09/19 18:52:52 ERROR TaskSetManager: Task 0 in stage 82.0 failed 100
> times; aborting job
> 16/09/19 18:52:52 INFO YarnClusterScheduler: Removed TaskSet 82.0, whose
> tasks have all completed, from pool
> 16/09/19 18:52:52 INFO YarnClusterScheduler: Cancelling stage 82
> 16/09/19 18:52:52 INFO DAGScheduler: ResultStage 82 (insertIntoJDBC at
> sparkjob.scala:143) failed in 9.440 s
> 16/09/19 18:52:52 INFO DAGScheduler: Job 19 failed: insertIntoJDBC at
> sparkjob.scala:143, took 9.449118 s
> 16/09/19 18:52:52 INFO ApplicationMaster: Final app status: SUCCEEDED,
> exitCode: 0
> 16/09/19 18:52:52 INFO SparkContext: Invoking stop() from shutdown hook
>
>
> Regards,
> Sai
>
>
>
> -
> Sai Ganesh
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Spark-Job-not-failing-tp27756.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Java Compatibity Problems when we install rJava

2016-09-19 Thread Arif,Mubaraka




We are trying to install rJava on suse Linux running Cloudera Hadoop CDH 5.7.2  with Spark 1.6.

Anaconda 4.0 was installed using the CDH parcel. 
 
We have set up Jupyter notebook, but there are Java compatibility problems.
 
For Java we are running :
 
java version "1.8.0_51"
Java(TM) SE Runtime Environment (build 1.8.0_51-tdc1-b16)
Java HotSpot(TM) 64-Bit Server VM (build 25.51-b03, mixed mode)
 
We followed the instructions from the blog : 
http://www.ibm.com/support/knowledgecenter/SSPT3X_3.0.0/com.ibm.swg.im.infosphere.biginsights.install.doc/doc/install_install_r.html
 
After running we get the output : [as attached in file - rJava_err.txt]
 
Any help is greatly appreciated.
 
thanks,
Muby





The downloaded source packages are in
‘/tmp/RtmpfdQwg8/downloaded_packages’
[I 13:41:00.060 NotebookApp] Saving file at /R_test.ipynb
trying URL 'http://cran.us.r-project.org/src/contrib/rJava_0.9-8.tar.gz'
Content type 'application/x-gzip' length 656615 bytes (641 KB)
==
downloaded 641 KB

* installing *source* package ‘rJava’ ...
** package ‘rJava’ successfully unpacked and MD5 sums checked
checking for gcc... gcc -std=gnu99
checking whether the C compiler works... yes
checking for C compiler default output file name... a.out
checking for suffix of executables...
checking whether we are cross compiling... no
checking for suffix of object files... o
checking whether we are using the GNU C compiler... yes
checking whether gcc -std=gnu99 accepts -g... yes
checking for gcc -std=gnu99 option to accept ISO C89... none needed
checking how to run the C preprocessor... gcc -std=gnu99 -E
checking for grep that handles long lines and -e... /usr/bin/grep
checking for egrep... /usr/bin/grep -E
checking for ANSI C header files... yes
checking for sys/wait.h that is POSIX.1 compatible... yes
checking for sys/types.h... yes
checking for sys/stat.h... yes
checking for stdlib.h... yes
checking for string.h... yes
checking for memory.h... yes
checking for strings.h... yes
checking for inttypes.h... yes
checking for stdint.h... yes
checking for unistd.h... yes
checking for string.h... (cached) yes
checking sys/time.h usability... yes
checking sys/time.h presence... yes
checking for sys/time.h... yes
checking for unistd.h... (cached) yes
checking for an ANSI C-conforming const... yes
checking whether time.h and sys/time.h may both be included... yes
configure: checking whether gcc -std=gnu99 supports static inline...
yes
checking whether setjmp.h is POSIX.1 compatible... yes
checking whether sigsetjmp is declared... yes
checking whether siglongjmp is declared... yes
checking Java support in R... present:
interpreter : '/opt/teradata/jvm64/jdk8/jre/bin/java'
archiver: '/opt/teradata/jvm64/jdk8/bin/jar'
compiler: '/opt/teradata/jvm64/jdk8/bin/javac'
header prep.: '/opt/teradata/jvm64/jdk8/bin/javah'
cpp flags   : '-I/opt/teradata/jvm64/jdk8/include 
-I/opt/teradata/jvm64/jdk8/include/linux'
java libs   : '-L/opt/teradata/jvm64/jdk8/jre/lib/amd64/server -ljvm -ldl'
checking whether Java run-time works... yes
checking whether -Xrs is supported... yes
checking whether JNI programs can be compiled... yes
checking JNI data types... ok
checking whether JRI should be compiled (autodetect)... yes
checking whether debugging output should be enabled... no
checking whether memory profiling is desired... no
checking whether threads support is requested... no
checking whether callbacks support is requested... no
checking whether JNI cache support is requested... no
checking whether headless init is enabled... no
checking whether JRI is requested... yes
configure: creating ./config.status
config.status: creating src/Makevars
config.status: creating R/zzz.R
config.status: creating src/config.h
=== configuring in jri (/tmp/RtmpU52Epw/R.INSTALL1fd0c64aa2bcd/rJava/jri)
configure: running /bin/sh ./configure --disable-option-checking 
'--prefix=/usr/local'  --cache-file=/dev/null --srcdir=.
checking build system type... x86_64-unknown-linux-gnu
checking host system type... x86_64-unknown-linux-gnu
checking for gcc... gcc -std=gnu99
checking whether the C compiler works... yes
checking for C compiler default output file name... a.out
checking for suffix of executables...
checking whether we are cross compiling... no
checking for suffix of object files... o
checking whether we are using the GNU C compiler... yes
checking whether gcc -std=gnu99 accepts -g... yes
checking for gcc -std=gnu99 option to accept ISO C89... none needed
checking how to run the C preprocessor... gcc -std=gnu99 -E
checking for grep that handles long lines and -e... /usr/bin/grep
checking for egrep... /usr/bin/grep -E
checking for ANSI C header files... yes
checking whether Java interpreter works... checking whether JNI programs can be 
compiled... yes
checking whether JNI programs can be run... yes
checking JNI data types... ok
checking whether Rinterface.h exports R_CStackXXX variables... yes
checking whether Rin

Re: off heap to alluxio/tachyon in Spark 2

2016-09-19 Thread Sean Owen
It backed the "OFF_HEAP" storage level for RDDs. That's not quite the
same thing that off-heap Tungsten allocation refers to.

It's also worth pointing out that things like HDFS also can put data
into memory already.

On Mon, Sep 19, 2016 at 7:48 PM, Richard Catlin
 wrote:
> Here is my understanding.
>
> Spark used Tachyon as an off-heap solution for RDDs.  In certain situations,
> it would alleviate garbage collection of the RDDs.
>
> Tungsten, Spark 2’s off-heap (columnar format) is much more efficient and
> used as the default.  Alluxio no longer makes sense for this use.
>
>
> You can still use Tachyon/Alluxio to bring your files into Memory, which is
> quicker for Spark to access than your DFS(HDFS or S3).
>
> Alluxio actually supports a “Tiered Filesystem”, and automatically brings
> the “hotter” files into the fastest storage (Memory, SSD).  You can
> configure it with Memory, SSD, and/or HDDs with the DFS as the persistent
> store, called under-filesystem.
>
> Hope this helps.
>
> Richard Catlin
>
> On Sep 19, 2016, at 7:56 AM, aka.fe2s  wrote:
>
> Hi folks,
>
> What has happened with Tachyon / Alluxio in Spark 2? The docs no longer
> mention it.
>
> --
> Oleksiy Dyagilev
>
>




Spark DataFrame Join _ performance issues

2016-09-19 Thread Subhajit Purkayastha
I am running my Spark (1.5.2) instance in a VirtualBox VM. I have 10 GB of
memory allocated to it.

I have a fact table extract, with 1 rows

var glbalance_df_select = glbalance_df
  .select("LEDGER_ID", "CODE_COMBINATION_ID", "CURRENCY_CODE",
    "PERIOD_TYPE", "TEMPLATE_ID",
    "PERIOD_NAME", "ACTUAL_FLAG", "BUDGET_VERSION_ID",
    "TRANSLATED_FLAG", "PERIOD_NET_DR", "PERIOD_NET_CR",
    "BEGIN_BALANCE_DR", "BEGIN_BALANCE_CR")
  .filter(
    not(glbalance_df("CURRENCY_CODE") === "STAT")
    and
    (glbalance_df("TEMPLATE_ID").isNull || glbalance_df("TEMPLATE_ID") === "None")
    and
    (glbalance_df("TRANSLATED_FLAG") === "Y" ||
      glbalance_df("TRANSLATED_FLAG").isNull ||
      glbalance_df("TRANSLATED_FLAG") === "None")
    and
    (glbalance_df("ACTUAL_FLAG") === "A" or glbalance_df("ACTUAL_FLAG") === "B")
  )

I am joining the fact table to the first dimension (with 100 rows).

var glbalance_ledger_df = glbalance_df_select
  .join(ledger_df_select,
    glbalance_df_select("LEDGER_ID") <=> ledger_df_select("LEDGER_ID"),
    "inner")
  .drop(ledger_df_select("LEDGER_ID"))

When I save the DataFrame "glbalance_ledger_df" to a text file, it saves the
data in 1 mins.

2nd dimension DataFrame:

tableName = "w_gl_period_d"
var period_df_select = msc.table(s"$dbName.$tableName")
period_df_select = period_df_select
  .select("PERIOD_NAME", "PERIOD_TYPE", "PERIOD_SET_NAME",
    "START_DATE", "END_DATE")
  .cache()

When I join the 2nd dimension DF to the result of the first join and save the
data, it takes 2 hrs.

var glbalance_ledger_period_df = glbalance_ledger_df.join(period_df_select,
  glbalance_ledger_df("PERIOD_SET_NAME") <=> period_df_select("PERIOD_SET_NAME")
    && glbalance_ledger_df("PERIOD_NAME") <=> period_df_select("PERIOD_NAME")
    && glbalance_ledger_df("PERIOD_TYPE") <=> period_df_select("PERIOD_TYPE"),
  "inner")

How do I improve the performance of the join?

Thx,

Subhajit



Spark Job not failing

2016-09-19 Thread tosaigan...@gmail.com

Hi ,

I have a primary key on a SQL table and I am trying to insert a DataFrame
into the table using insertIntoJDBC.

I can see failures in the logs, but the Spark job still reports success.
Do you know how we can handle this in code to make the job fail?



16/09/19 18:52:51 INFO TaskSetManager: Starting task 0.99 in stage 82.0 (TID
5032, 10.0.0.24, partition 0,PROCESS_LOCAL, 11300 bytes)
16/09/19 18:52:52 INFO TaskSetManager: Lost task 0.99 in stage 82.0 (TID
5032) on executor 10.0.0.24: java.sql.BatchUpdateException (Violation of
PRIMARY KEY constraint 'pk_unique'. Cannot insert duplicate key in object
'table_name'. The duplicate key value is (2016-09-13 04:00, 2016-09-13
04:15, 5816324).) [duplicate 99]
16/09/19 18:52:52 ERROR TaskSetManager: Task 0 in stage 82.0 failed 100
times; aborting job
16/09/19 18:52:52 INFO YarnClusterScheduler: Removed TaskSet 82.0, whose
tasks have all completed, from pool 
16/09/19 18:52:52 INFO YarnClusterScheduler: Cancelling stage 82
16/09/19 18:52:52 INFO DAGScheduler: ResultStage 82 (insertIntoJDBC at
sparkjob.scala:143) failed in 9.440 s
16/09/19 18:52:52 INFO DAGScheduler: Job 19 failed: insertIntoJDBC at
sparkjob.scala:143, took 9.449118 s
16/09/19 18:52:52 INFO ApplicationMaster: Final app status: SUCCEEDED,
exitCode: 0
16/09/19 18:52:52 INFO SparkContext: Invoking stop() from shutdown hook


Regards,
Sai



-
Sai Ganesh
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Job-not-failing-tp27756.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
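
A note on the log above: the job itself is reported as failed ("Job 19
failed"), yet the ApplicationMaster ends with "SUCCEEDED, exitCode: 0". One
possible explanation, which is only an assumption because the driver code is
not shown, is that the exception from insertIntoJDBC is caught in the driver
and the main method then returns normally. The sketch below is plain Python
with hypothetical names (run_job, failing_write) and no Spark calls; it only
illustrates turning a caught failure into a non-zero exit status instead of
swallowing it.

```python
import sys


def run_job(write_fn):
    """Run the write step and return a process exit status (0 = ok, 1 = failed)."""
    try:
        write_fn()
    except Exception as exc:
        # Log the failure, but do not swallow it: report it through the status.
        print(f"write failed: {exc}", file=sys.stderr)
        return 1
    return 0


def failing_write():
    # Hypothetical stand-in for the JDBC insert that hits the duplicate key.
    raise RuntimeError("Violation of PRIMARY KEY constraint 'pk_unique'")


exit_code = run_job(failing_write)
print(exit_code)  # 1
```

In a real driver the last step would be something like sys.exit(exit_code);
simply not catching the exception should have a similar effect.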



Fwd: Missing output partition file in S3

2016-09-19 Thread Richard Catlin


> Begin forwarded message:
> 
> From: "Chen, Kevin" 
> Subject: Re: Missing output partition file in S3
> Date: September 19, 2016 at 10:54:44 AM PDT
> To: Steve Loughran 
> Cc: "user@spark.apache.org" 
> 
> Hi Steve,
> 
> Our S3 is in US East, but this issue also occurred when we were using an S3
> bucket in US West. We are using S3n. We use the Spark standalone deployment.
> We run the job in EC2. The datasets are about 25GB. We did not have
> speculative execution turned on. We did not use DirectCommitter.
> 
> Thanks,
> Kevin
> 
> From: Steve Loughran <ste...@hortonworks.com>
> Date: Friday, September 16, 2016 at 3:46 AM
> To: Chen Kevin <kevin.c...@neustar.biz>
> Cc: "user@spark.apache.org" <user@spark.apache.org>
> Subject: Re: Missing output partition file in S3
>
>
>> On 15 Sep 2016, at 19:37, Chen, Kevin wrote:
>> 
>> Hi,
>> 
>> Has anyone encountered an issue of a missing output partition file in S3? My
>> Spark job writes output to an S3 location. Occasionally, I noticed one
>> partition file is missing. As a result, one chunk of data was lost. If I
>> rerun the same job, the problem usually goes away. This has been happening
>> fairly randomly; I have observed it once or twice a week on a daily job. I
>> am using Spark 1.2.1.
>>
>> Any input or suggestions for a fix or workaround would be much appreciated.
>> 
>> 
>> 
> 
> This doesn't sound good.
>
> Without making any promises about being able to fix this, I would like to
> understand the setup to see if there is something that could be done to
> address this:
>
>   1.  Which S3 installation? US East or elsewhere?
>   2.  Which S3 client: s3n or s3a? If on Hadoop 2.7+, can you switch to s3a
> if you haven't already? (Exception: if you are using AWS EMR you have to
> stick with their s3:// client.)
>   3.  Are you running in EC2 or remotely?
>   4.  How big are the datasets being generated?
>   5.  Do you have speculative execution turned on?
>   6.  Which committer? Is it the external "DirectCommitter", or the classic
> Hadoop FileOutputCommitter? If so, and you are using Hadoop 2.7.x, can you
> try the v2 algorithm (hadoop.mapreduce.fileoutputcommitter.algorithm.version 2)?
>
> I should warn that the stance of myself and colleagues is "don't commit
> directly to S3": write to HDFS and do a distcp when you finally copy out the
> data. S3 itself doesn't have enough consistency for committing output to
> work in the presence of all race conditions and failure modes. At least here
> you've noticed the problem; the thing people fear is not noticing that a
> problem has arisen.
> 
> -Steve



Re: off heap to alluxio/tachyon in Spark 2

2016-09-19 Thread Richard Catlin
Here is my understanding.

Spark used Tachyon as an off-heap solution for RDDs.  In certain situations, it
would alleviate garbage collection of the RDDs.

Tungsten, Spark 2’s off-heap (columnar format) is much more efficient and used
as the default.  Alluxio no longer makes sense for this use.


You can still use Tachyon/Alluxio to bring your files into Memory, which is 
quicker for Spark to access than your DFS(HDFS or S3).

Alluxio actually supports a “Tiered Filesystem”, and automatically brings the 
“hotter” files into the fastest storage (Memory, SSD).  You can configure it 
with Memory, SSD, and/or HDDs with the DFS as the persistent store, called 
under-filesystem.

Hope this helps.

Richard Catlin

> On Sep 19, 2016, at 7:56 AM, aka.fe2s  wrote:
> 
> Hi folks,
> 
> What has happened with Tachyon / Alluxio in Spark 2? The docs no longer
> mention it.
> 
> --
> Oleksiy Dyagilev



Re: Spark_JDBC_Partitions

2016-09-19 Thread Ajay Chander
Thank you all for your valuable inputs. Sorry for getting back late because
of personal issues.

Mich, answer to your earlier question, Yes it is a fact table. Thank you.

Ayan, I have tried ROWNUM as split column with 100 partitions. But it was
taking forever to complete the job. Thank you.

Igor, I will try your solution and let you know. Thank you.

Rabin, I agree it can be done through Sqoop. But I wanted to use Spark SQL
and see how fast would it be when compared to Sqoop. Thank you.

Suresh, If I have 100 number of partitions, do I have to provide 100
conditions(1 for 1 partition) to load the data into each partition
respectively? Thank you.
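
On that last point, as far as I understand the predicates variant of jdbc,
each string in the array defines one partition, so 100 partitions would mean
100 conditions, but they do not have to be written by hand. A minimal sketch
in plain Python, assuming a numeric column with known bounds (the column name
"id", the bounds, and the helper range_predicates are made up for
illustration):

```python
def range_predicates(column, lower, upper, num_partitions):
    """Build one WHERE-clause string per partition, covering [lower, upper)."""
    if num_partitions < 1 or upper <= lower:
        raise ValueError("need num_partitions >= 1 and upper > lower")
    span = upper - lower
    predicates = []
    for i in range(num_partitions):
        start = lower + (span * i) // num_partitions
        end = lower + (span * (i + 1)) // num_partitions
        if start == end:
            continue  # more partitions requested than distinct values
        predicates.append(f"{column} >= {start} AND {column} < {end}")
    return predicates


predicates = range_predicates("id", 0, 1000, 100)
print(len(predicates))  # 100
print(predicates[0])    # id >= 0 AND id < 10
print(predicates[-1])   # id >= 990 AND id < 1000
```

In PySpark the resulting list could be passed as the predicates argument of
spark.read.jdbc; in Scala the same idea works with an Array[String]. Rows
whose id is NULL or outside the bounds would not be read by any partition, so
the bounds need to cover the whole table.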






On Tue, Sep 13, 2016 at 2:05 PM, Suresh Thalamati <
suresh.thalam...@gmail.com> wrote:

> There is also another jdbc method in the DataFrame reader API to specify
> your own predicates for each partition. Using this you can control what is
> included in each partition.
>
> import java.util.Properties
>
> val jdbcPartitionWhereClause = Array[String]("id < 100", "id >= 100 and id < 200")
> val df = spark.read.jdbc(
>   urlWithUserAndPass,
>   "TEST.PEOPLE",
>   predicates = jdbcPartitionWhereClause,
>   new Properties())
>
>
>
> Hope that helps.
> -suresh
>
>
> On Sep 13, 2016, at 9:44 AM, Rabin Banerjee 
> wrote:
>
> Trust me, Only thing that can help you in your situation is SQOOP oracle
> direct connector which is known as  ORAOOP. Spark cannot do everything ,
> you need a OOZIE workflow which will trigger sqoop job with oracle direct
> connector to pull the data then spark batch to process .
>
> Hope it helps !!
>
> On Tue, Sep 13, 2016 at 6:10 PM, Igor Racic  wrote:
>
>> Hi,
>>
>> One way can be to use NTILE function to partition data.
>> Example:
>>
>> REM Creating test table
>> create table Test_part as select * from ( select rownum rn from
>> all_tables t1 ) where rn <= 1000;
>>
>> REM Partition lines by Oracle block number, 11 partitions in this
>> example.
>> select ntile(11) over( order by dbms_rowid.ROWID_BLOCK_NUMBER( rowid ) )
>> nt from Test_part
>>
>>
>> Let's see distribution:
>>
>> select nt, count(*) from ( select ntile(11) over( order by
>> dbms_rowid.ROWID_BLOCK_NUMBER( rowid ) ) nt from Test_part) group by nt;
>>
>> NT   COUNT(*)
>> -- --
>>  1 10
>>  6 10
>> 11  9
>>  2 10
>>  4 10
>>  5 10
>>  8 10
>>  3 10
>>  7 10
>>  9  9
>> 10  9
>>
>> 11 rows selected.
>> ^^ It looks good. Feel free to choose any other condition to order
>> your lines as best suits your case.
>>
>> So you can
>> 1) have one session reading and then decide where each line goes (1 reader)
>> 2) Or do multiple reads by specifying the partition number. Note that in this
>> case you read the whole table n times (in parallel), which is more intensive
>> on the read part. (multiple readers)
>>
>> Regards,
>> Igor
>>
>>
>>
>> 2016-09-11 0:46 GMT+02:00 Mich Talebzadeh :
>>
>>> Good points
>>>
>>> Unfortunately Data Pump, exp and imp use a binary format for import and
>>> export that cannot be used to import data into HDFS in a suitable way.
>>>
>>> One can use what is known as flat,sh script to get data out tab or ,
>>> separated etc.
>>>
>>> ROWNUM is a pseudocolumn (not a real column) that is available in a
>>> query. The issue is that in a table of 280 million rows, to get the
>>> position of the row it will have to do a table scan, since no index can be
>>> built on it (assuming there is no other suitable index). Not ideal but can
>>> be done.
>>>
>>> I think a better alternative is to use datapump to take that table to
>>> DEV/TEST, add a sequence (like an IDENTITY column in Sybase), build a
>>> unique index on the sequence column and do the partitioning there.
>>>
>>> HTH
>>>
>>>
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>> On 10 September 2016 at 22:37, ayan guha  wrote:
>>>
>>>> In oracle something called row num is present in every row.  You can
>>>> create an evenly distribution using that column. If it is one time work,
>>>> try using sqoop. Are you using Oracle's own appliance? Then you can use
>>>> data pump format
>>>> On 11 Sep 2016 01:59, "Mich Talebzadeh"
>>>> wrote:

> creating an Oracle sequence for a table of 200million is not going to
> be that easy without changing the schema. It is possible t

Re: Issues while running MLlib matrix factorization ALS algorithm

2016-09-19 Thread Roshani Nagmote
Thanks Nick. It's working.

On Mon, Sep 19, 2016 at 11:11 AM, Nick Pentreath 
wrote:

> Try als.setCheckpointInterval (http://spark.apache.org/docs/
> latest/api/scala/index.html#org.apache.spark.mllib.recommendation.ALS@
> setCheckpointInterval(checkpointInterval:Int):ALS.this.type)
>
>
> On Mon, 19 Sep 2016 at 20:01 Roshani Nagmote 
> wrote:
>
>> Hello Sean,
>>
>> Can you please tell me how to set the checkpoint interval? I did set
>> checkpointDir("hdfs:/"), but I want to reduce the checkpoint interval from
>> its default value of 10. How should it be done?
>>
>> Sorry if it's a very basic question. I am a novice in Spark.
>>
>> Thanks,
>> Roshani
>>
>> On Fri, Sep 16, 2016 at 11:14 AM, Roshani Nagmote <
>> roshaninagmo...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> Thanks for your reply.
>>>
>>> Yes, Its netflix dataset. And when I get no space on device, my ‘/mnt’
>>> directory gets filled up. I checked.
>>>
>>> /usr/lib/spark/bin/spark-submit --deploy-mode cluster --master yarn
>>> --class org.apache.spark.examples.mllib.MovieLensALS --jars
>>> /usr/lib/spark/examples/jars/scopt_2.11-3.3.0.jar
>>> /usr/lib/spark/examples/jars/spark-examples_2.11-2.0.0.jar *--rank 32
>>> --numIterations 100* --kryo s3://dataset_netflix
>>>
>>> When I run above command, I get following error
>>>
>>> Job aborted due to stage failure: Task 221 in stage 53.0 failed 4 times,
>>> most recent failure: Lost task 221.3 in stage 53.0 (TID 9817, ):
>>> java.io.FileNotFoundException: /mnt/yarn/usercache/hadoop/
>>> appcache/application_1473786456609_0042/blockmgr-
>>> 045c2dec-7765-4954-9c9a-c7452f7bd3b7/08/shuffle_168_
>>> 221_0.data.b17d39a6-4d3c-4198-9e25-e19ca2b4d368 (No space left on
>>> device)
>>>
>>> I think I should not need to increase the space on device, as data is
>>> not that big. So, is there any way, I can setup parameters so that it does
>>> not use much disk space. I don’t know much about tuning parameters.
>>>
>>> It will be great if anyone can help me with this.
>>>
>>> Thanks,
>>> Roshani
>>>
>>> On Sep 16, 2016, at 9:18 AM, Sean Owen  wrote:
>>>
>>>
>>>
>>>
>>>
>>


Re: take() works on RDD but .write.json() does not work in 2.0.0

2016-09-19 Thread Kevin Burton
I tried with write.json and write.csv.  The write.text method won't work
because I have more than one column, so it refuses to execute.

Doesn't seem to work on any data.

On Sat, Sep 17, 2016 at 10:52 PM, Hyukjin Kwon  wrote:

> Hi Kevin,
>
> I have few questions on this.
>
> Does that only not work with write.json() ? I just wonder if write.text,
> csv or another API does not work as well and it is a JSON specific issue.
>
> Also, does that work with small data? I want to make sure if this happen
> only on large data.
>
> Thanks!
>
>
>
> 2016-09-18 6:42 GMT+09:00 Kevin Burton :
>
>> I'm seeing some weird behavior and wanted some feedback.
>>
>> I have a fairly large, multi-hour job that operates over about 5TB of
>> data.
>>
>> It builds it out into a ranked category index of about 25000 categories
>> sorted by rank, descending.
>>
>> I want to write this to a file but it's not actually writing any data.
>>
>> if I run myrdd.take(100) ... that works fine and prints data to a file.
>>
>> If I run
>>
>> myrdd.write.json(), it takes the same amount of time, and then writes a
>> local file with a SUCCESS file but no actual partition data in the file.
>> There's only one small file with SUCCESS.
>>
>> Any advice on how to debug this?
>>
>> --
>>
>> We’re hiring if you know of any awesome Java Devops or Linux Operations
>> Engineers!
>>
>> Founder/CEO Spinn3r.com
>> Location: *San Francisco, CA*
>> blog: http://burtonator.wordpress.com
>> … or check out my Google+ profile
>> 
>>
>>
>


-- 

We’re hiring if you know of any awesome Java Devops or Linux Operations
Engineers!

Founder/CEO Spinn3r.com
Location: *San Francisco, CA*
blog: http://burtonator.wordpress.com
… or check out my Google+ profile



Re: Issues while running MLlib matrix factorization ALS algorithm

2016-09-19 Thread Nick Pentreath
Try als.setCheckpointInterval (
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.recommendation.ALS@setCheckpointInterval(checkpointInterval:Int):ALS.this.type
)

On Mon, 19 Sep 2016 at 20:01 Roshani Nagmote 
wrote:

> Hello Sean,
>
> Can you please tell me how to set the checkpoint interval? I did set
> checkpointDir("hdfs:/"), but I want to reduce the checkpoint interval from
> its default value of 10. How should it be done?
>
> Sorry if it's a very basic question. I am a novice in Spark.
>
> Thanks,
> Roshani
>
> On Fri, Sep 16, 2016 at 11:14 AM, Roshani Nagmote <
> roshaninagmo...@gmail.com> wrote:
>
>> Hello,
>>
>> Thanks for your reply.
>>
>> Yes, Its netflix dataset. And when I get no space on device, my ‘/mnt’
>> directory gets filled up. I checked.
>>
>> /usr/lib/spark/bin/spark-submit --deploy-mode cluster --master yarn
>> --class org.apache.spark.examples.mllib.MovieLensALS --jars
>> /usr/lib/spark/examples/jars/scopt_2.11-3.3.0.jar
>> /usr/lib/spark/examples/jars/spark-examples_2.11-2.0.0.jar *--rank 32
>> --numIterations 100* --kryo s3://dataset_netflix
>>
>> When I run above command, I get following error
>>
>> Job aborted due to stage failure: Task 221 in stage 53.0 failed 4 times,
>> most recent failure: Lost task 221.3 in stage 53.0 (TID 9817, ):
>> java.io.FileNotFoundException:
>> /mnt/yarn/usercache/hadoop/appcache/application_1473786456609_0042/blockmgr-045c2dec-7765-4954-9c9a-c7452f7bd3b7/08/shuffle_168_221_0.data.b17d39a6-4d3c-4198-9e25-e19ca2b4d368
>> (No space left on device)
>>
>> I think I should not need to increase the space on device, as data is not
>> that big. So, is there any way, I can setup parameters so that it does not
>> use much disk space. I don’t know much about tuning parameters.
>>
>> It will be great if anyone can help me with this.
>>
>> Thanks,
>> Roshani
>>
>> On Sep 16, 2016, at 9:18 AM, Sean Owen  wrote:
>>
>>
>>
>>
>>
>


Re: Issues while running MLlib matrix factorization ALS algorithm

2016-09-19 Thread Roshani Nagmote
Hello Sean,

Can you please tell me how to set the checkpoint interval? I did set
checkpointDir("hdfs:/"), but I want to reduce the checkpoint interval from
its default value of 10. How should it be done?

Sorry if it's a very basic question. I am a novice in Spark.

Thanks,
Roshani

On Fri, Sep 16, 2016 at 11:14 AM, Roshani Nagmote  wrote:

> Hello,
>
> Thanks for your reply.
>
> Yes, Its netflix dataset. And when I get no space on device, my ‘/mnt’
> directory gets filled up. I checked.
>
> /usr/lib/spark/bin/spark-submit --deploy-mode cluster --master yarn
> --class org.apache.spark.examples.mllib.MovieLensALS --jars
> /usr/lib/spark/examples/jars/scopt_2.11-3.3.0.jar
> /usr/lib/spark/examples/jars/spark-examples_2.11-2.0.0.jar *--rank 32
> --numIterations 100* --kryo s3://dataset_netflix
>
> When I run above command, I get following error
>
> Job aborted due to stage failure: Task 221 in stage 53.0 failed 4 times,
> most recent failure: Lost task 221.3 in stage 53.0 (TID 9817, ):
> java.io.FileNotFoundException: /mnt/yarn/usercache/hadoop/
> appcache/application_1473786456609_0042/blockmgr-045c2dec-7765-4954-9c9a-
> c7452f7bd3b7/08/shuffle_168_221_0.data.b17d39a6-4d3c-4198-9e25-e19ca2b4d368
> (No space left on device)
>
> I think I should not need to increase the space on device, as data is not
> that big. So, is there any way, I can setup parameters so that it does not
> use much disk space. I don’t know much about tuning parameters.
>
> It will be great if anyone can help me with this.
>
> Thanks,
> Roshani
>
> On Sep 16, 2016, at 9:18 AM, Sean Owen  wrote:
>
>
>
>
>


Re: Missing output partition file in S3

2016-09-19 Thread Chen, Kevin
Hi Steve,

Our S3 is in US East, but this issue also occurred when we were using an S3
bucket in US West. We are using S3n. We use the Spark standalone deployment.
We run the job in EC2. The datasets are about 25GB. We did not have
speculative execution turned on. We did not use DirectCommitter.

Thanks,
Kevin

From: Steve Loughran <ste...@hortonworks.com>
Date: Friday, September 16, 2016 at 3:46 AM
To: Chen Kevin <kevin.c...@neustar.biz>
Cc: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: Missing output partition file in S3


On 15 Sep 2016, at 19:37, Chen, Kevin <kevin.c...@neustar.biz> wrote:

Hi,

Has anyone encountered an issue of a missing output partition file in S3? My
Spark job writes output to an S3 location. Occasionally, I notice that one partition
file is missing. As a result, one chunk of data is lost. If I rerun the same
job, the problem usually goes away. This happens fairly randomly; I
observe it once or twice a week on a job that runs daily. I am using Spark 1.2.1.

Any input or suggestion of a fix/workaround would be very much appreciated.




This doesn't sound good

Without making any promises about being able to fix this,  I would like to 
understand the setup to see if there is something that could be done to address 
this

  1.  Which S3 installation? US East or elsewhere
  2.  Which s3 client: s3n or s3a. If on hadoop 2.7+, can you switch to S3a if 
you haven't already (exception, if you are using AWS EMR you have to stick with 
their s3:// client)
  3.  Are you running in-EC2 or remotely?
  4.  How big are the datasets being generated?
  5.  Do you have speculative execution turned on
  6.  Which committer: the external "DirectCommitter", or the classic Hadoop
FileOutputCommitter? If the latter and you are using Hadoop 2.7.x, can you try the v2
algorithm (mapreduce.fileoutputcommitter.algorithm.version=2, which can be set from
Spark as spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version)

I should warn that the stance of myself and colleagues is "don't commit directly
to S3": write to HDFS and do a distcp when you finally copy out the data. S3
itself doesn't have enough consistency for committing output to work in the
presence of all race conditions and failure modes. At least here you've noticed
the problem; the thing people fear is not noticing that a problem has arisen.

-Steve


Re: Can not control bucket files number if it was speficed

2016-09-19 Thread Fridtjof Sander
I didn't follow all of this thread, but if you want to have exactly one 
bucket-output-file per RDD-partition, you have to repartition (shuffle) 
your data on the bucket-key.
If you don't repartition (shuffle), you may have records with different 
bucket-keys in the same RDD-partition, leading to two 
bucket-output-files for that RDD-partition.
So, in your example from Sep 17, you're missing a res.repartition(8, 
"xxx_id").write...

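To illustrate the point above: a writer task emits one file for each bucket it
sees, so the number of files per table partition is the number of distinct
(RDD partition, bucket) pairs. The sketch below is a plain-Scala simulation,
not Spark code. bucketOf is a simplified stand-in for Spark's bucket hash (an
assumption; Spark uses its own hash function), and the key range and partition
layouts are made up for illustration.

```scala
object BucketFileCount {
  val numBuckets = 8

  // Simplified stand-in for Spark's bucket hash (assumption, not Spark's real hash).
  def bucketOf(key: Int): Int = java.lang.Math.floorMod(key, numBuckets)

  // A task writes one file per bucket it encounters, so the total is the
  // number of distinct (partition index, bucket) pairs.
  def fileCount(partitions: Seq[Seq[Int]]): Int =
    partitions.zipWithIndex
      .flatMap { case (keys, p) => keys.map(k => (p, bucketOf(k))) }
      .distinct
      .size

  // Illustrative bucket-key values.
  val keys: Seq[Int] = 0 until 64

  // No shuffle on the key: 16 partitions, each mixing several buckets.
  val mixed: Seq[Seq[Int]] = keys.grouped(4).toSeq

  // Repartitioned on the key into 8 partitions: one bucket per partition.
  val byBucket: Seq[Seq[Int]] =
    keys.groupBy(bucketOf).toSeq.sortBy(_._1).map(_._2)

  def main(args: Array[String]): Unit = {
    println(s"files without repartition: ${fileCount(mixed)}")
    println(s"files with repartition:    ${fileCount(byBucket)}")
  }
}
```

Note that in the Scala DataFrame API the repartition column is passed as a
Column, e.g. res.repartition(8, col("xxx_id")) with
org.apache.spark.sql.functions.col imported.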


On 19.09.2016 at 16:26, Qiang Li wrote:
I tried the DataFrame writer with the coalesce or repartition API, but it does
not meet my requirements: I still get far more files than the bucket
number, and the Spark job is very slow after I add coalesce or repartition.


I've gone back to Hive and use Hive to do the data conversion.

Thanks.

On Sat, Sep 17, 2016 at 11:12 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:


Ok

You have an external table in Hive  on S3 with partition and
bucket. say

..
PARTITIONED BY (year int, month string)
CLUSTERED BY (prod_id) INTO 256 BUCKETS
STORED AS ORC.

Within each partition, rows are spread evenly by the hash of prod_id
into 256 buckets. A bucket is the hash partitioning
within a Hive table partition.

Now my question is how do you force data to go for a given
partition p into bucket n. Since you have already specified say
256 buckets then whatever prod_id is, it still has to go to one of
256 buckets.

Within Spark , the number of files is actually the number of
underlying RDD partitions.  You can find this out by invoking
toJavaRDD.partitions.size() and force it to accept a certain
number of partitions by using coalesce(n) or something like that.
However, I am not sure the output will be what you expect to be.

Worth trying to sort it out the way you want with partition 8

val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
val s = spark.read.parquet("oraclehadoop.sales2")
s.coalesce(8).registerTempTable("tmp")
HiveContext.sql("SELECT * FROM tmp SORT BY prod_id").write.mode("overwrite").parquet("test.sales6")


It may work.

HTH



Dr Mich Talebzadeh

LinkedIn

/https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

/

http://talebzadehmich.wordpress.com



*Disclaimer:* Use it at your own risk.Any and all responsibility
for any loss, damage or destruction of data or any other property
which may arise from relying on this email's technical content is
explicitly disclaimed. The author will in no case be liable for
any monetary damages arising from such loss, damage or destruction.


On 17 September 2016 at 15:00, Qiang Li <q...@appannie.com> wrote:

I want to run job to load existing data from one S3 bucket,
process it, then store to another bucket with Partition, and
Bucket (data format conversion from tsv to parquet with gzip).
So source data and results both are in S3, different are the
tools which I used to process data.

First I process data with Hive, create external tables with s3
 location with partition and bucket number, jobs will generate
files under each partition directory, and it was equal bucket
number.
then everything is ok, I also can use hive/presto/spark to run
other jobs on results data in S3.

But if I run a Spark job with the partition, bucket, and sort
features, the Spark job generates far more files than the bucket
number under each partition directory, so Presto or Hive cannot
recognize the buckets because the number of files written by Spark
is not equal to the bucket number.

for example:
...
val options = Map("path" -> "result_bucket_path",
"compression" -> "gzip")
res.write.mode("append").format("parquet")
  .partitionBy("year", "month", "day")
  .bucketBy(8, "xxx_id").sortBy("xxx_id")
  .options(options).saveAsTable("result_bucket_name")
...

The results bucket files under each partition is far more than 8.


On Sat, Sep 17, 2016 at 9:27 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

It is difficult to guess what is happening with your data.

First when you say you use Spark to generate test data are
these selected randomly and then stored in Hive/etc table?

HTH

Dr Mich Talebzadeh


NumberFormatException: For input string: "0.00000"

2016-09-19 Thread Mohamed ismail
Hi all

I am trying to read: 

sc.textFile(DataFile).mapPartitions(lines => {
val parser = new CSVParser(",")
lines.map(line=>parseLineToTuple(line, parser))
})
Data looks like:
android phone,0,0,0,,0,0,0,0,0,0,0,5,0,0,0,5,0,0.0,0.0,0.0,0.0,0.0,0,0,0,0,0,0,0,0.0,0,0,0
ios phone,0,-1,0,,0,0,0,0,0,0,1,0,0,0,0,1,0,0.0,0.0,0.0,0.0,0.0,0,0,0,0,0,0,0,0.0,0,0,0

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in 
stage 23055.0 failed 4 times, most recent failure: Lost task 1.3 in stage 
23055.0 (TID 191607, ): 
java.lang.NumberFormatException: For input string: "0.0"

Has anyone faced such an issue? Is there a solution?

Thanks,
Mohamed
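
One likely cause, assuming parseLineToTuple (whose body is not shown) calls
toInt or toLong on a field that holds a decimal such as "0.0": integer parsing
rejects the decimal point and throws this exception. A minimal sketch of a more
tolerant field parser follows. parseNumber and the shortened sample row are
hypothetical and only for illustration.

```scala
import scala.util.Try

object ParseNumericField {
  // Hypothetical helper: parse a CSV field that may hold "0", "0.0", or be empty.
  // toDouble accepts both integer and decimal text; unparsable or empty
  // fields become None instead of throwing NumberFormatException.
  def parseNumber(field: String): Option[Double] =
    Try(field.trim.toDouble).toOption

  def main(args: Array[String]): Unit = {
    // Shortened, illustrative row in the same shape as the data above.
    // The -1 limit keeps trailing empty fields.
    val fields = "android phone,0,-1,0,,0.0".split(",", -1)

    // Skip the leading text column and parse the numeric ones.
    val parsed = fields.drop(1).map(parseNumber)
    println(parsed.mkString(", "))
  }
}
```

If a column must stay integral, parsing it as a double first and converting
afterwards is one option; whether that is acceptable depends on the data.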



Re: Total Shuffle Read and Write Size of Spark workload

2016-09-19 Thread Mike Metzger
While the SparkListener method is likely all around better, if you just
need this quickly you should be able to do a SSH local port redirection
over putty.  In the putty configuration:

- Go to Connection: SSH: Tunnels
- In the Source port field, enter 4040 (or another unused port on your
machine)
- In the Destination field, enter ipaddress:4040, where ipaddress is the IP
you'd normally use to access the Spark server.  If it's the same server you're
SSH'ing to, it can be 127.0.0.1
- Make sure the "Local" and "Auto" radio buttons are checked and click "Add"
- Go back to the Session section and enter the IP / etc configuration
- If you're going to use this often, enter a name and save the
configuration.  Otherwise click open and login as normal.

Once the session is established, you should be able to open a web browser
to http://localhost:4040 which will redirect over the SSH session to the
remote server.  Note that any link that references a non-accessible IP
address can't be reached (though you can also setup putty / SSH as a proxy
to get around that if needed).

Thanks

Mike


On Mon, Sep 19, 2016 at 4:43 AM, Cristina Rozee 
wrote:

> Hi Mich,
>
> I do not have access to UI as I am running jobs on remote system and I can
> access it using putty only so only console or logs files are available to
> me.
>
> Thanks
>
> On Mon, Sep 19, 2016 at 11:36 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Spark UI on port 4040 by default
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>>
>>
>>
>> On 19 September 2016 at 10:34, Cristina Rozee 
>> wrote:
>>
>>> Could you please explain a little bit?
>>>
>>>
>>> On Sun, Sep 18, 2016 at 10:19 PM, Jacek Laskowski 
>>> wrote:
>>>
>>>> SparkListener perhaps?
>>>>
>>>> Jacek
>>>>
>>>> On 15 Sep 2016 1:41 p.m., "Cristina Rozee"
>>>> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I am running a spark application and I would like to know the total
>>>>> amount of shuffle data (read + write ) so could anyone let me know how to
>>>>> get this information?
>>>>>
>>>>> Thank you
>>>>> Cristina.
>>>>>
>>>>
>>>
>>
>


Get profile from sbt

2016-09-19 Thread Saurabh Malviya (samalviy)
Hi,

Is there an equivalent in sbt to profiles in Maven? I want the Spark build to
pick up endpoints based on the environment the jar is built for.

In build.sbt we take in a variable (dev, stage, etc.) and pick up all
dependencies accordingly. In a similar way, I need to pick up the config for external
dependencies such as endpoints.

Alternatively, is there any way I can access a variable defined in build.sbt
from Scala code?

-Saurabh


off heap to alluxio/tachyon in Spark 2

2016-09-19 Thread aka.fe2s
Hi folks,

What has happened with Tachyon / Alluxio in Spark 2? The docs no longer
mention it.

--
Oleksiy Dyagilev


driver OOM - need recommended memory for driver

2016-09-19 Thread Anand Viswanathan
Hi,

Spark version: spark-1.5.2-bin-hadoop2.6, using pyspark.

I am running a machine learning program, which runs perfectly when I specify 2G
for --driver-memory.
However, the program cannot be run with the default 1G; the driver crashes with an OOM
error.

What is the recommended configuration for --driver-memory? Please suggest.

Thanks and regards,
Anand.



Re: Can not control bucket files number if it was speficed

2016-09-19 Thread Qiang Li
I tried the DataFrame writer with the coalesce or repartition API, but it does not
meet my requirements: I still get far more files than the bucket number,
and the Spark job is very slow after I add coalesce or repartition.

I've gone back to Hive and use Hive to do the data conversion.

Thanks.

On Sat, Sep 17, 2016 at 11:12 PM, Mich Talebzadeh  wrote:

> Ok
>
> You have an external table in Hive  on S3 with partition and bucket. say
>
> ..
> PARTITIONED BY (year int, month string)
> CLUSTERED BY (prod_id) INTO 256 BUCKETS
> STORED AS ORC.
>
> Within each partition, rows are spread evenly by the hash of prod_id into 256
> buckets. A bucket is the hash partitioning within a Hive table
> partition.
>
> Now my question is how do you force data to go for a given partition p
> into bucket n. Since you have already specified say 256 buckets then
> whatever prod_id is, it still has to go to one of 256 buckets.
>
> Within Spark , the number of files is actually the number of underlying
> RDD partitions.  You can find this out by invoking toJavaRDD.partitions.size()
> and force it to accept a certain number of partitions by using coalesce(n)
> or something like that. However, I am not sure the output will be what you
> expect to be.
>
> Worth trying to sort it out the way you want with partition 8
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> val s = spark.read.parquet("oraclehadoop.sales2")
> s.coalesce(8).registerTempTable("tmp")
> HiveContext.sql("SELECT * FROM tmp SORT BY prod_id").write.mode("overwrite").parquet("test.sales6")
>
>
> It may work.
>
> HTH
>
>
>
> Dr Mich Talebzadeh
>
>
>
>
>
>
> On 17 September 2016 at 15:00, Qiang Li  wrote:
>
>> I want to run job to load existing data from one S3 bucket, process it,
>> then store to another bucket with Partition, and Bucket (data format
>> conversion from tsv to parquet with gzip). So source data and results both
>> are in S3, different are the tools which I used to process data.
>>
>> First I process data with Hive, create external tables with s3  location
>> with partition and bucket number, jobs will generate files under each
>> partition directory, and it was equal bucket number.
>> then everything is ok, I also can use hive/presto/spark to run other jobs
>> on results data in S3.
>>
>> But if I run a Spark job with the partition, bucket, and sort features, the Spark job
>> generates far more files than the bucket number under each partition
>> directory, so Presto or Hive cannot recognize the buckets because the number of
>> files written by Spark is not equal to the bucket number.
>>
>> for example:
>> ...
>> val options = Map("path" -> "result_bucket_path", "compression" -> "gzip")
>> res.write.mode("append").format("parquet").partitionBy("year", "month",
>> "day").bucketBy(8, "xxx_id").sortBy("xxx_id")
>> .options(options).saveAsTable("result_bucket_name")
>> ...
>>
>> The results bucket files under each partition is far more than 8.
>>
>>
>> On Sat, Sep 17, 2016 at 9:27 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> It is difficult to guess what is happening with your data.
>>>
>>> First when you say you use Spark to generate test data are these
>>> selected randomly and then stored in Hive/etc table?
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>>
>>>
>>>
>>> On 17 September 2016 at 13:59, Qiang Li  wrote:
>>>
>>>> Hi,
>>>>
>>>> I use Spark to generate data, then we use Hive/Pig/Presto/Spark to
>>>> analyze the data. But I found that even when I use bucketBy and sortBy with a
>>>> bucket number in Spark, the number of result files generated by Spark is always
>>>> far more than the bucket number under each partition, so Presto cannot recognize
>>>> the buckets. How can I control that in Spark?
>>>>
>>>> Unfortunately, I did not find any way to do that.
>>>>
>>>> Thank you.
>>>>
>>>> --
>>>> A

Re: Finding unique across all columns in dataset

2016-09-19 Thread Mich Talebzadeh
something like this

df.filter('transactiontype > " ").filter(not('transactiontype ==="DEB") &&
not('transactiontype ==="BGC")).select('transactiontype).distinct.collect.foreach(println)

HTH





Dr Mich Talebzadeh






On 19 September 2016 at 14:12, ayan guha  wrote:

> Hi
>
> If you want column wise distinct, you may need to define it. Will it be
> possible to demonstrate your problem with an example? Like what's the input
> and output. Maybe with few columns..
> On 19 Sep 2016 20:36, "Abhishek Anand"  wrote:
>
>> Hi Ayan,
>>
>> How will I get column wise distinct items using this approach ?
>>
>> On Mon, Sep 19, 2016 at 3:31 PM, ayan guha  wrote:
>>
>>> Create an array out of columns, convert to DataFrame,
>>> explode,distinct,write.
>>> On 19 Sep 2016 19:11, "Saurav Sinha"  wrote:
>>>
>>>> You can use distinct over your data frame or rdd
>>>>
>>>> rdd.distinct
>>>>
>>>> It will give you distinct across your row.
>>>>
>>>> On Mon, Sep 19, 2016 at 2:35 PM, Abhishek Anand <
>>>> abhis.anan...@gmail.com> wrote:
>>>>
>>>>> I have an rdd which contains 14 different columns. I need to find the
>>>>> distinct across all the columns of rdd and write it to hdfs.
>>>>>
>>>>> How can I achieve this ?
>>>>>
>>>>> Is there any distributed data structure that I can use and keep on
>>>>> updating it as I traverse the new rows ?
>>>>>
>>>>> Regards,
>>>>> Abhi
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Thanks and Regards,
>>>>
>>>> Saurav Sinha
>>>>
>>>> Contact: 9742879062
>>>>
>>>
>>


Re: Finding unique across all columns in dataset

2016-09-19 Thread ayan guha
Hi

If you want column wise distinct, you may need to define it. Will it be
possible to demonstrate your problem with an example? Like what's the input
and output. Maybe with few columns..
On 19 Sep 2016 20:36, "Abhishek Anand"  wrote:

> Hi Ayan,
>
> How will I get column wise distinct items using this approach ?
>
> On Mon, Sep 19, 2016 at 3:31 PM, ayan guha  wrote:
>
>> Create an array out of columns, convert to DataFrame,
>> explode,distinct,write.
>> On 19 Sep 2016 19:11, "Saurav Sinha"  wrote:
>>
>>> You can use distinct over your data frame or rdd
>>>
>>> rdd.distinct
>>>
>>> It will give you distinct across your row.
>>>
>>> On Mon, Sep 19, 2016 at 2:35 PM, Abhishek Anand wrote:
>>>
>>>> I have an rdd which contains 14 different columns. I need to find the
>>>> distinct across all the columns of rdd and write it to hdfs.
>>>>
>>>> How can I achieve this ?
>>>>
>>>> Is there any distributed data structure that I can use and keep on
>>>> updating it as I traverse the new rows ?
>>>>
>>>> Regards,
>>>> Abhi
>>>>
>>>
>>>
>>>
>>> --
>>> Thanks and Regards,
>>>
>>> Saurav Sinha
>>>
>>> Contact: 9742879062
>>>
>>
>


Fwd: Write.df is failing on NFS and S3 based spark cluster

2016-09-19 Thread Sankar Mittapally
Hi ,

 We have set up a Spark cluster which is on NFS shared storage. There are no
permission issues with the NFS storage; all the users are able to write to it.
When I run the write.df command in SparkR, I get the error below. Can
someone please help me fix this issue?


16/09/17 08:03:28 ERROR InsertIntoHadoopFsRelationCommand: Aborting job.
java.io.IOException: Failed to rename DeprecatedRawLocalFileStatus{path=file:/nfspartition/sankar/banking_l1_v2.csv/_temporary/0/task_201609170802_0013_m_00/part-r-0-46a7f178-2490-444e-9110-510978eaaecb.csv; isDirectory=false; length=436486316; replication=1; blocksize=33554432; modification_time=147409940; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to file:/nfspartition/sankar/banking_l1_v2.csv/part-r-0-46a7f178-2490-444e-9110-510978eaaecb.csv
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:371)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:384)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:326)
at org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:222)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelationCommand.scala:144)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:115)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:60)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:487)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:141)
at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:86)
at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:38)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.chann

spark streaming slow checkpointing when calling Rserve

2016-09-19 Thread Piubelli, Manuel


Hello,



I wrote a spark streaming application in Java. It reads stock trades off of a 
data feed receiver and converts them to Tick objects, and uses a microbatch 
interval, window interval and sliding interval of 10 seconds. A 
JavaPairDStream> is created where the key is the stock 
symbol.

The Tick objects are then stored in a JavaMapWithStateDStream using 
mapWithState; analytics calculations are performed in the mapWithState callback 
function using the Ticks as input. Everything works fine until I modified my 
program to also call Rserve inside the mapWithState callback function in order 
to perform additional analytics calculations in R.

When I started calling Rserve, every 10th window would take a long time to 
process; this is the window that also writes to the checkpoint file (I am using 
Hadoop). Every 10th window takes longer to process than the previous 10th 
window (window 30 takes longer than window 20 which takes longer than window 
10). All of the non-checkpoint windows finish well within 10 seconds, but the 
checkpoint windows can eventually take minutes to complete, and the other 
windows queue behind them.

I then tried to set the checkpoint interval on the JavaMapWithStateDStream to 
24 hours in order to effectively disable checkpointing 
(mapWithStateStream.checkpoint(Durations.minutes(1440))). I enabled the workers 
on the 3 server cluster with enough memory so that they would survive the 
growing memory usage that would result.

The results that I outputted to the log were unexpected. Previously the 
JavaPairDStream> was being populated with 5000 keys, and 
it still was. But, previously 5000 keys were being passed to the mapWithState 
callback function; now only 200 keys were being passed to it, and I see many 
stages skipped in the Spark Streaming UI web page. When I run this in single 
process mode on my MS Windows machine, 5000 keys are still passed to the 
mapWithState callback function.

Does anyone have any idea of why calling Rserve would cause such a huge 
increase in checkpointing time, or why calling 
checkpoint(Durations.minutes(1440)) on the JavaMapWithStateDStream would cause 
spark to not pass most of the tuples in the JavaPairDStream> to the mapWithState callback function?



Question is also posted on 
http://stackoverflow.com/questions/39535804/spark-streaming-slow-checkpointing-when-calling-rserve.



Thanks



Re: Spark to HBase Fast Bulk Upload

2016-09-19 Thread Kabeer Ahmed
Hi,

Without using Spark there are a couple of options. You can refer to the link: 
http://blog.cloudera.com/blog/2013/09/how-to-use-hbase-bulk-loading-and-why/.

The gist is that you convert the data into HFiles and use the bulk upload 
option to get the data quickly into HBase.

HTH
Kabeer.

On Mon, 19 Sep, 2016 at 12:59 PM, Punit Naik  wrote:
Hi Guys

I have a huge dataset (~ 1TB) which has about a billion records. I have to 
transfer it to an HBase table. What is the fastest way of doing it?

--
Thank You

Regards

Punit Naik




Spark to HBase Fast Bulk Upload

2016-09-19 Thread Punit Naik
Hi Guys

I have a huge dataset (~ 1TB) which has about a billion records. I have to
transfer it to an HBase table. What is the fastest way of doing it?

-- 
Thank You

Regards

Punit Naik


cassandra.yaml configuration for cassandra spark connection

2016-09-19 Thread muhammet pakyürek
How do I configure the cassandra.yaml configuration file for the DataStax
Cassandra Spark connection?




Re: Finding unique across all columns in dataset

2016-09-19 Thread Abhishek Anand
Hi Ayan,

How will I get column wise distinct items using this approach ?

On Mon, Sep 19, 2016 at 3:31 PM, ayan guha  wrote:

> Create an array out of columns, convert to DataFrame,
> explode,distinct,write.
> On 19 Sep 2016 19:11, "Saurav Sinha"  wrote:
>
>> You can use distinct over your data frame or rdd
>>
>> rdd.distinct
>>
>> It will give you distinct across your row.
>>
>> On Mon, Sep 19, 2016 at 2:35 PM, Abhishek Anand 
>> wrote:
>>
>>> I have an rdd which contains 14 different columns. I need to find the
>>> distinct across all the columns of rdd and write it to hdfs.
>>>
>>> How can I achieve this ?
>>>
>>> Is there any distributed data structure that I can use and keep on
>>> updating it as I traverse the new rows ?
>>>
>>> Regards,
>>> Abhi
>>>
>>
>>
>>
>> --
>> Thanks and Regards,
>>
>> Saurav Sinha
>>
>> Contact: 9742879062
>>
>


Re: Lemmatization using StanfordNLP in ML 2.0

2016-09-19 Thread Jacek Laskowski
Hi Janardhan,

What's the command to build the project (sbt package or sbt assembly)?
What's the command you execute to run the application?

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Mon, Sep 19, 2016 at 2:12 AM, janardhan shetty
 wrote:
> Hi Sujit,
>
> Tried that option but same error:
>
> java version "1.8.0_51"
>
>
> libraryDependencies ++= {
>   val sparkVersion = "2.0.0"
>   Seq(
> "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
> "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
> "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
> "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided",
> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
> "com.google.protobuf" % "protobuf-java" % "2.6.1",
> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
> "org.scalatest" %% "scalatest" % "2.2.6" % "test"
>   )
> }
>
> Error:
>
> Exception in thread "main" java.lang.NoClassDefFoundError:
> edu/stanford/nlp/pipeline/StanfordCoreNLP
> at
> transformers.ml.Lemmatizer$$anonfun$createTransformFunc$1.apply(Lemmatizer.scala:37)
> at
> transformers.ml.Lemmatizer$$anonfun$createTransformFunc$1.apply(Lemmatizer.scala:33)
> at
> org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:88)
> at
> org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2.apply(ScalaUDF.scala:87)
> at
> org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1060)
> at
> org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:142)
> at
> org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:45)
> at
> org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:29)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>
>
>
> On Sun, Sep 18, 2016 at 2:21 PM, Sujit Pal  wrote:
>>
>> Hi Janardhan,
>>
>> Maybe try removing the string "test" from this line in your build.sbt?
>> IIRC, this restricts the models JAR to be called from a test.
>>
>> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0" % "test" classifier
>> "models",
>>
>> -sujit
>>
>>
>> On Sun, Sep 18, 2016 at 11:01 AM, janardhan shetty
>>  wrote:
>>>
>>> Hi,
>>>
>>> I am trying to use lemmatization as a transformer and added the below to the
>>> build.sbt
>>>
>>>  "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
>>> "com.google.protobuf" % "protobuf-java" % "2.6.1",
>>> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0" % "test" classifier
>>> "models",
>>> "org.scalatest" %% "scalatest" % "2.2.6" % "test"
>>>
>>>
>>> Error:
>>> Exception in thread "main" java.lang.NoClassDefFoundError:
>>> edu/stanford/nlp/pipeline/StanfordCoreNLP
>>>
>>> I have tried other versions of this spark package.
>>>
>>> Any help is appreciated..
>>
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Total Shuffle Read and Write Size of Spark workload

2016-09-19 Thread Jacek Laskowski
On Mon, Sep 19, 2016 at 11:36 AM, Mich Talebzadeh
 wrote:
> Spark UI on port 4040 by default

That's exactly *a* SparkListener + web UI :)

Jacek




Re: Total Shuffle Read and Write Size of Spark workload

2016-09-19 Thread Jacek Laskowski
Hi Cristina,

http://blog.jaceklaskowski.pl/spark-workshop/slides/08_Monitoring_using_SparkListeners.html

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.scheduler.SparkListener

Let me know if you've got more questions.

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Mon, Sep 19, 2016 at 11:34 AM, Cristina Rozee
 wrote:
> Could you please explain a little bit?
>
>
> On Sun, Sep 18, 2016 at 10:19 PM, Jacek Laskowski  wrote:
>>
>> SparkListener perhaps?
>>
>> Jacek
>>
>>
>> On 15 Sep 2016 1:41 p.m., "Cristina Rozee" 
>> wrote:
>>>
>>> Hello,
>>>
>>> I am running a spark application and I would like to know the total
>>> amount of shuffle data (read + write ) so could anyone let me know how to
>>> get this information?
>>>
>>> Thank you
>>> Cristina.
>
>




Re: Finding unique across all columns in dataset

2016-09-19 Thread ayan guha
Create an array out of the columns, convert it to a DataFrame, then explode,
distinct, and write.
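
A minimal sketch of the flatten-then-distinct idea, assuming "distinct across all the columns" means the set of distinct values pooled over every column (rdd.distinct on its own returns distinct rows instead). The helper names row_values and distinct_values are made up for illustration, and the local function only stands in for the RDD calls so that it runs without a cluster.

```python
from itertools import chain


def row_values(row):
    """Return every column value of one row; usable as a flatMap function."""
    return list(row)


def distinct_values(rows):
    """Local stand-in for rdd.flatMap(row_values).distinct()."""
    return set(chain.from_iterable(row_values(r) for r in rows))


if __name__ == "__main__":
    rows = [(1, "a", 2), (2, "b", 1)]
    print(sorted(distinct_values(rows), key=str))
```

On a PySpark RDD of tuples the same steps would be rdd.flatMap(row_values).distinct().saveAsTextFile(path), where path is a placeholder for the HDFS output directory.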
On 19 Sep 2016 19:11, "Saurav Sinha"  wrote:

> You can use distinct over your data frame or rdd:
>
> rdd.distinct
>
> It will give you the distinct rows.
>
> On Mon, Sep 19, 2016 at 2:35 PM, Abhishek Anand 
> wrote:
>
>> I have an rdd which contains 14 different columns. I need to find the
>> distinct across all the columns of rdd and write it to hdfs.
>>
>> How can I achieve this?
>>
>> Is there any distributed data structure that I can use and keep on
>> updating it as I traverse the new rows ?
>>
>> Regards,
>> Abhi
>>
>
>
>
> --
> Thanks and Regards,
>
> Saurav Sinha
>
> Contact: 9742879062
>


Re: Total Shuffle Read and Write Size of Spark workload

2016-09-19 Thread Cristina Rozee
Hi Mich,

I do not have access to the UI: I am running jobs on a remote system that I can
reach only through PuTTY, so only the console and log files are available to
me.

Thanks
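
With only log files at hand, one option may be to read the totals from the Spark event log, which is written when spark.eventLog.enabled is true (into spark.eventLog.dir). The sketch below sums shuffle bytes over task-end events. The JSON key names ("Task Metrics", "Shuffle Read Metrics", "Remote Bytes Read", "Local Bytes Read", "Shuffle Write Metrics", "Shuffle Bytes Written") are assumptions about the event-log layout and should be checked against an actual log from your Spark version; shuffle_totals is a hypothetical helper name.

```python
import json


def shuffle_totals(lines):
    """Sum shuffle bytes read and written over all task-end events.

    `lines` is any iterable of event-log lines (one JSON object per line).
    Key names are assumed, not taken from this thread; verify them first.
    """
    read = 0
    written = 0
    for line in lines:
        line = line.strip()
        if not line:
            continue
        event = json.loads(line)
        if event.get("Event") != "SparkListenerTaskEnd":
            continue
        metrics = event.get("Task Metrics") or {}
        shuffle_read = metrics.get("Shuffle Read Metrics") or {}
        shuffle_write = metrics.get("Shuffle Write Metrics") or {}
        read += shuffle_read.get("Remote Bytes Read", 0)
        read += shuffle_read.get("Local Bytes Read", 0)
        written += shuffle_write.get("Shuffle Bytes Written", 0)
    return read, written
```

Usage would be along the lines of opening the application's event-log file and calling shuffle_totals(f) on the file object; the two numbers returned are total bytes read and total bytes written.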

On Mon, Sep 19, 2016 at 11:36 AM, Mich Talebzadeh  wrote:

> Spark UI on port 4040 by default
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 19 September 2016 at 10:34, Cristina Rozee 
> wrote:
>
>> Could you please explain a little bit?
>>
>>
>> On Sun, Sep 18, 2016 at 10:19 PM, Jacek Laskowski 
>> wrote:
>>
>>> SparkListener perhaps?
>>>
>>> Jacek
>>>
>>> On 15 Sep 2016 1:41 p.m., "Cristina Rozee" 
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> I am running a spark application and I would like to know the total
>>>> amount of shuffle data (read + write ) so could anyone let me know how to
>>>> get this information?
>>>>
>>>> Thank you
>>>> Cristina.
>>>>
>>>
>>
>


Re: Total Shuffle Read and Write Size of Spark workload

2016-09-19 Thread Mich Talebzadeh
Spark UI on port 4040 by default

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 19 September 2016 at 10:34, Cristina Rozee 
wrote:

> Could you please explain a little bit?
>
>
> On Sun, Sep 18, 2016 at 10:19 PM, Jacek Laskowski  wrote:
>
>> SparkListener perhaps?
>>
>> Jacek
>>
>> On 15 Sep 2016 1:41 p.m., "Cristina Rozee" 
>> wrote:
>>
>>> Hello,
>>>
>>> I am running a spark application and I would like to know the total
>>> amount of shuffle data (read + write ) so could anyone let me know how to
>>> get this information?
>>>
>>> Thank you
>>> Cristina.
>>>
>>
>


Re: Total Shuffle Read and Write Size of Spark workload

2016-09-19 Thread Cristina Rozee
Could you please explain a little bit?

On Sun, Sep 18, 2016 at 10:19 PM, Jacek Laskowski  wrote:

> SparkListener perhaps?
>
> Jacek
>
> On 15 Sep 2016 1:41 p.m., "Cristina Rozee" 
> wrote:
>
>> Hello,
>>
>> I am running a spark application and I would like to know the total
>> amount of shuffle data (read + write ) so could anyone let me know how to
>> get this information?
>>
>> Thank you
>> Cristina.
>>
>


Re: 1TB shuffle failed with executor lost failure

2016-09-19 Thread Divya Gehlot
The exit code 52 comes from org.apache.spark.util.SparkExitCode, where
val OOM = 52, i.e. an OutOfMemoryError.
Refer to:
https://github.com/apache/spark/blob/d6dc12ef0146ae409834c78737c116050961f350/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala



On 19 September 2016 at 14:57, Cyanny LIANG  wrote:

> My job is 1TB join + 10 GB table on spark1.6.1
> run on yarn mode:
>
> *1. If I enable the shuffle service, the error is:*
> Job aborted due to stage failure: ShuffleMapStage 2 (writeToDirectory at
> NativeMethodAccessorImpl.java:-2) has failed the maximum allowable number
> of times: 4. Most recent failure reason: 
> org.apache.spark.shuffle.FetchFailedException:
> java.lang.RuntimeException: Executor is not registered 
> (appId=application_1473819702737_1239,
> execId=52)
> at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.
> getBlockData(ExternalShuffleBlockResolver.java:105)
> at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.
> receive(ExternalShuffleBlockHandler.java:74)
> at org.apache.spark.network.server.TransportRequestHandler.
> processRpcRequest(TransportRequestHandler.java:114)
> at org.apache.spark.network.server.TransportRequestHandler.handle(
> TransportRequestHandler.java:87)
> at org.apache.spark.network.server.TransportChannelHandler.
> channelRead0(TransportChannelHandler.java:101)
>
> *2. If I disable the shuffle service and*
> *set spark.executor.instances 80,*
> the error is:
> ExecutorLostFailure (executor 71 exited caused by one of the running
> tasks) Reason: Container marked as failed: 
> container_1473819702737_1432_01_406847560
> on host: nmg01-spark-a0021.nmg01.baidu.com. Exit status: 52. Diagnostics:
> Exception from container-launch: ExitCodeException exitCode=52:
> ExitCodeException exitCode=52:
>
> These errors are reported in the shuffle stage.
> My data is skewed: some ids have 400 million rows, while others have only
> 1 million rows. Does anybody have ideas for solving the problem?
>
>
> *3. My config is*
> I use tungsten-sort in off-heap mode; in on-heap mode the OOM problem
> is more serious.
>
> spark.driver.cores 4
>
> spark.driver.memory 8g
>
>
> # use on client mode
>
>
> spark.yarn.am.memory 8g
>
>
> spark.yarn.am.cores 4
>
>
> spark.executor.memory 8g
>
>
> spark.executor.cores 4
>
> spark.yarn.executor.memoryOverhead 6144
>
>
> spark.memory.offHeap.enabled true
>
>
> spark.memory.offHeap.size 40
>
> Best & Regards
> Cyanny LIANG
>


Re: Finding unique across all columns in dataset

2016-09-19 Thread Saurav Sinha
You can use distinct over your data frame or rdd:

rdd.distinct

It will give you the distinct rows.

On Mon, Sep 19, 2016 at 2:35 PM, Abhishek Anand 
wrote:

> I have an rdd which contains 14 different columns. I need to find the
> distinct across all the columns of rdd and write it to hdfs.
>
> How can I achieve this?
>
> Is there any distributed data structure that I can use and keep on
> updating it as I traverse the new rows ?
>
> Regards,
> Abhi
>



-- 
Thanks and Regards,

Saurav Sinha

Contact: 9742879062


Finding unique across all columns in dataset

2016-09-19 Thread Abhishek Anand
I have an rdd which contains 14 different columns. I need to find the
distinct across all the columns of rdd and write it to hdfs.

How can I achieve this?

Is there any distributed data structure that I can use and keep on updating
it as I traverse the new rows ?

Regards,
Abhi


Re: Dataframe, Java: How to convert String to Vector ?

2016-09-19 Thread Yan Facai
Hi, all.
I find this really confusing.

I can use Vectors.parse to create a DataFrame that contains a Vector column:

scala> val dataVec = Seq((0, Vectors.parse("[1,3,5]")), (1,
Vectors.parse("[2,4,6]"))).toDF
dataVec: org.apache.spark.sql.DataFrame = [_1: int, _2: vector]


But using map to convert String to Vector throws an error:

scala> val dataStr = Seq((0, "[1,3,5]"), (1, "[2,4,6]")).toDF
dataStr: org.apache.spark.sql.DataFrame = [_1: int, _2: string]

scala> dataStr.map(row => Vectors.parse(row.getString(1)))
:30: error: Unable to find encoder for type stored in a
Dataset.  Primitive types (Int, String, etc) and Product types (case
classes) are supported by importing spark.implicits._  Support for
serializing other types will be added in future releases.
  dataStr.map(row => Vectors.parse(row.getString(1)))


Can anyone help me?
Thanks very much!
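
One direction that may sidestep the encoder error is to keep the conversion at the column level (a UDF added with withColumn) instead of Dataset.map. Below is a minimal Python sketch of only the string-parsing half, assuming the column holds dense vectors written like "[1,3,5]" or "[1, 2, 3]". parse_dense is a hypothetical helper, not a Spark API.

```python
def parse_dense(text):
    """Parse a bracketed, comma-separated string into a list of floats.

    Accepts optional whitespace around the brackets and the numbers.
    """
    body = text.strip()
    if not (body.startswith("[") and body.endswith("]")):
        raise ValueError("expected a bracketed list, got: %r" % text)
    inner = body[1:-1].strip()
    if not inner:
        return []
    # Split on commas so both "[1,3,5]" and "[1, 3, 5]" are handled.
    return [float(token) for token in inner.split(",")]


if __name__ == "__main__":
    print(parse_dense("[1,3,5]"))
```

In PySpark this could presumably be wrapped as udf(lambda s: Vectors.dense(parse_dense(s)), VectorUDT()), with Vectors and VectorUDT taken from pyspark.ml.linalg, and applied through withColumn; that wiring is an untested assumption here.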







On Tue, Sep 6, 2016 at 9:58 PM, Peter Figliozzi 
wrote:

> Hi Yan, I think you'll have to map the features column to a new numerical
> features column.
>
> Here's one way to do the individual transform:
>
> scala> val x = "[1, 2, 3, 4, 5]"
> x: String = [1, 2, 3, 4, 5]
>
> scala> val y:Array[Int] = x slice(1, x.length - 1) replace(",", "")
> split(" ") map(_.toInt)
> y: Array[Int] = Array(1, 2, 3, 4, 5)
>
> If you don't know about the Scala command line, just type "scala" in a
> terminal window.  It's a good place to try things out.
>
> You can make a function out of this transformation and apply it to your
> features column to make a new column.  Then add this with
> Dataset.withColumn.
>
> See here
> 
> on how to apply a function to a Column to make a new column.
>
> On Tue, Sep 6, 2016 at 1:56 AM, 颜发才(Yan Facai)  wrote:
>
>> Hi,
>> I have a csv file like:
>> uid  mid  features   label
>> 1235231[0, 1, 3, ...]True
>>
>> Both  "features" and "label" columns are used for GBTClassifier.
>>
>> However, when I read the file:
>> Dataset samples = sparkSession.read().csv(file);
>> The type of samples.select("features") is String.
>>
>> My question is:
>> How to map samples.select("features") to Vector or any appropriate type,
>> so I can use it to train like:
>> GBTClassifier gbdt = new GBTClassifier()
>> .setLabelCol("label")
>> .setFeaturesCol("features")
>> .setMaxIter(2)
>> .setMaxDepth(7);
>>
>> Thanks.
>>
>
>


best versions for cassandra spark connection

2016-09-19 Thread muhammet pakyürek
Hi,


To connect PySpark to Cassandra, which versions of each component must be
installed? I think Cassandra 3.7 is not compatible with Spark 2.0 and the
DataStax pyspark-cassandra connector 2.0. Please give me the correct versions
and the steps to connect them.




Re: filling missing values in a sequence

2016-09-19 Thread Sudhindra Magadi
thanks ayan
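
For the per-partition step in the outline quoted below, here is a minimal sketch of a function that emits the gaps inside one partition. It assumes the ids are integers (or integer strings) without duplicates; missing_in_partition is a hypothetical name.

```python
def missing_in_partition(iterator):
    """Yield the ids missing between consecutive ids of one partition."""
    ids = sorted(int(x) for x in iterator)
    for previous, current in zip(ids, ids[1:]):
        # Every integer strictly between two neighbours is a gap.
        for missing in range(previous + 1, current):
            yield missing


if __name__ == "__main__":
    print(list(missing_in_partition([1, 3, 4, 6])))
```

It would be passed to the RDD as r.mapPartitions(missing_in_partition). This only finds gaps local to each partition, so gaps that fall between the last id of one partition and the first id of the next still need a separate check, and the partitions must cover contiguous id ranges (for example after a sortBy) for the result to be meaningful.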

On Mon, Sep 19, 2016 at 12:25 PM, ayan guha  wrote:

> Let me give you a possible direction, please do not use as it is :)
>
> >>> r = sc.parallelize([1,3,4,6,8,11,12,5],3)
>
> Here, I am loading some numbers and partitioning them. This partitioning is
> critical. You may just use the partitioning scheme that comes with Spark (like
> above) or use your own through partitionBy. It should meet 2
> criteria:
>
> a) Even distribution, with each partition small enough to be held
> in memory
> b) Partition boundaries are continuous.
>
> Now, let us write a function which operates on an iterator and does
> something (here, it only concatenates, but you can use it to sort, loop through
> and emit missing ones)
>
> >>>
> >>> def f(iterator):
> ...     yield ",".join(map(str, iterator))
>
> Now, you can use RDD operation to run this function on each partition:
>
> >>> r1 = r.mapPartitions(f)
>
> Now, you would have local missing values. You can now write them out to a
> file.
>
> On Mon, Sep 19, 2016 at 4:39 PM, Sudhindra Magadi 
> wrote:
>
>> that is correct
>>
>> On Mon, Sep 19, 2016 at 12:09 PM, ayan guha  wrote:
>>
>>> Ok, so if you see
>>>
>>> 1,3,4,6.
>>>
>>> Will you say 2,5 are missing?
>>>
>>> On Mon, Sep 19, 2016 at 4:15 PM, Sudhindra Magadi 
>>> wrote:
>>>
>>>> Each of the records will have a sequence id. No duplicates.
>>>>
>>>> On Mon, Sep 19, 2016 at 11:42 AM, ayan guha
>>>> wrote:
>>>>
> And how do you define missing sequence? Can you give an example?
>
> On Mon, Sep 19, 2016 at 3:48 PM, Sudhindra Magadi 
> wrote:
>
>> Hi Jorn ,
>> We have a file with a billion records. We want to find out whether there are
>> any missing sequences here. If so, what are they?
>> Thanks
>> Sudhindra
>>
>> On Mon, Sep 19, 2016 at 11:12 AM, Jörn Franke 
>> wrote:
>>
>>> I am not sure what you try to achieve here. Can you please tell us
>>> what the goal of the program is. Maybe with some example data?
>>>
>>> Besides this, I have the feeling that it will fail once it is not
>>> used in a single node scenario due to the reference to the global 
>>> counter
>>> variable.
>>>
>>> Also unclear why you collect the data first to parallelize it again.
>>>
>>> On 18 Sep 2016, at 14:26, sudhindra  wrote:
>>>
>>> Hi, I have coded something like this; please tell me how bad it is.
>>>
>>> package Spark.spark;
>>> import java.util.List;
>>> import java.util.function.Function;
>>>
>>> import org.apache.spark.SparkConf;
>>> import org.apache.spark.SparkContext;
>>> import org.apache.spark.api.java.JavaRDD;
>>> import org.apache.spark.api.java.JavaSparkContext;
>>> import org.apache.spark.sql.DataFrame;
>>> import org.apache.spark.sql.Dataset;
>>> import org.apache.spark.sql.Row;
>>> import org.apache.spark.sql.SQLContext;
>>>
>>>
>>>
>>> public class App
>>> {
>>>static long counter=1;
>>>public static void main( String[] args )
>>>{
>>>
>>>
>>>
>>>SparkConf conf = new SparkConf().setAppName("sorter").setMaster("local[2]")
>>>.set("spark.executor.memory","1g");
>>>JavaSparkContext sc = new JavaSparkContext(conf);
>>>
>>>SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
>>>
>>>DataFrame df = sqlContext.read().json("path");
>>>DataFrame sortedDF = df.sort("id");
>>>//df.show();
>>>//sortedDF.printSchema();
>>>
>>>System.out.println(sortedDF.collectAsList().toString());
>>>JavaRDD<Row> distData = sc.parallelize(sortedDF.collectAsList());
>>>
>>>
>>> List<String> missingNumbers = distData.map(new
>>> org.apache.spark.api.java.function.Function<Row, String>() {
>>>
>>>
>>>public String call(Row arg0) throws Exception {
>>>// TODO Auto-generated method stub
>>>
>>>
>>>if(counter!=new Integer(arg0.getString(0)).intValue())
>>>{
>>>StringBuffer misses = new StringBuffer();
>>>long newCounter=counter;
>>>while(newCounter!=new
>>> Integer(arg0.getString(0)).intValue())
>>>{
>>>misses.append(new String(new Integer((int)
>>> counter).toString()) );
>>>newCounter++;
>>>
>>>}
>>>counter=new Integer(arg0.getString(0)).intValue()+1;
>>>return misses.toString();
>>>
>>>}
>>>counter++;
>>>return null;
>>>
>>>
>>>
>>>}
>>>}).collect();
>>>
>>>
>>>
>>>for (String name: missingNumbers) {
>>>  System.out.println(

cassandra 3.7 is compatible with datastax Spark Cassandra Connector 2.0?

2016-09-19 Thread muhammet pakyürek





Re: Is RankingMetrics' NDCG implementation correct?

2016-09-19 Thread Sean Owen
Yes, relevance is always 1. The label is not a relevance score, so I
don't think it's valid to use it as such.
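
To make the difference concrete, here is a minimal sketch of NDCG with the graded gain 2^relevance - 1 mentioned in the question quoted below. It is not Spark's RankingMetrics code. It assumes the input is the list of relevance values in predicted rank order and that the ideal ordering is taken over that same list; dcg and ndcg are illustrative names.

```python
import math


def dcg(relevances):
    """Discounted cumulative gain with gain 2^rel - 1 and a log2 discount."""
    return sum(
        (2.0 ** rel - 1.0) / math.log2(position + 2)
        for position, rel in enumerate(relevances)
    )


def ndcg(relevances):
    """DCG divided by the DCG of the same values sorted best-first."""
    ideal = dcg(sorted(relevances, reverse=True))
    return dcg(relevances) / ideal if ideal > 0 else 0.0


if __name__ == "__main__":
    print(ndcg([0, 1]))
```

With relevance fixed at 1 the gain 2^1 - 1 is also 1, so the binary case and the graded formula coincide; they differ only once relevance takes other values.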

On Mon, Sep 19, 2016 at 4:42 AM, Jong Wook Kim  wrote:
> Hi,
>
> I'm trying to evaluate a recommendation model, and found that Spark and
> Rival give different results, and it seems that Rival's one is what Kaggle
> defines: https://gist.github.com/jongwook/5d4e78290eaef22cb69abbf68b52e597
>
> Am I using RankingMetrics in a wrong way, or is Spark's implementation
> incorrect?
>
> To my knowledge, NDCG should be dependent on the relevance (or preference)
> values, but Spark's implementation seems not; it uses 1.0 where it should be
> 2^(relevance) - 1, probably assuming that relevance is all 1.0? I also tried
> tweaking, but its method to obtain the ideal DCG also seems wrong.
>
> Any feedback from MLlib developers would be appreciated. I made a
> modified/extended version of RankingMetrics that produces the identical
> numbers to Kaggle and Rival's results, and I'm wondering if it is something
> appropriate to be added back to MLlib.
>
> Jong Wook
