Re: Problems with Local Checkpoints

2015-09-14 Thread Akhil Das
You need to set your HADOOP_HOME and make sure the winutils.exe is
available in the PATH.

Here's a discussion around the same issue
http://stackoverflow.com/questions/19620642/failed-to-locate-the-winutils-binary-in-the-hadoop-binary-path
Also this JIRA https://issues.apache.org/jira/browse/SPARK-2356

Thanks
Best Regards

On Wed, Sep 9, 2015 at 11:30 PM, Bryan Jeffrey 
wrote:

> Hello.
>
> I have some basic code that counts numbers using updateStateByKey.  I
> setup a streaming context with checkpointing as follows:
>
> def createStreamingContext(masterName : String, checkpointDirectory : String, 
> timeWindow : Int) : StreamingContext = {
>   val sparkConf = new SparkConf().setAppName("Program")
>   val ssc = new StreamingContext(sparkConf, Seconds(timeWindow))
>   ssc.checkpoint(checkpointDirectory)
>   ssc
> }
>
>
> This runs fine on my distributed (Linux) cluster, writing checkpoints to
> local disk. However, when I run on my Windows desktop I am seeing a number
> of checkpoint errors:
>
> 15/09/09 13:57:06 INFO CheckpointWriter: Saving checkpoint for time
> 1441821426000 ms to file
> 'file:/C:/Temp/sparkcheckpoint/checkpoint-1441821426000'
> Exception in thread "pool-14-thread-4" java.lang.NullPointerException
>  at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
>  at org.apache.hadoop.util.Shell.runCommand(Shell.java:404)
>  at org.apache.hadoop.util.Shell.run(Shell.java:379)
>  at
> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
>  at org.apache.hadoop.util.Shell.execCommand(Shell.java:678)
>  at org.apache.hadoop.util.Shell.execCommand(Shell.java:661)
>  at
> org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:639)
>  at
> org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:468)
>  at
> org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456)
>  at
> org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
>  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
>  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:886)
>  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:783)
>  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:772)
>  at
> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:181)
>  at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)
>
> JAVA_HOME is set correctly, the code runs correctly, it's not a
> permissions issue (I've run this as Administrator).  Directories and files
> are being created in C:\Temp, although all of the files appear to be empty.
>
> Does anyone have an idea of what is causing these errors?  Has anyone seen
> something similar?
>
> Regards,
>
> Bryan Jeffrey
>
>
>


Re: SparkR - Support for Other Models

2015-09-14 Thread Akhil Das
You can look into the Spark JIRA
 page for the same, if it
isn't available there then you could possibly create an issue for support
and hopefully in later releases it will be added.

Thanks
Best Regards

On Thu, Sep 10, 2015 at 11:26 AM, Manish MAHESHWARI  wrote:

> Hello,
>
>
>
> Is there a time line to add support for other model types like SVD,
> Cluster, GBM etc in the subsequent releases. 1.5 Added support for Linear
> models only.
>
> If there is, where can we know the tentative timeline of the same.
>
>
>
> Thanks,
>
> Manish
>
>
> CONFIDENTIAL NOTE: The information contained in this email is intended
> only for the use of the individual or entity named above and may contain
> information that is privileged, confidential and exempt from disclosure
> under applicable law. If the reader of this message is not the intended
> recipient, you are hereby notified that any dissemination, distribution or
> copying of this communication is strictly prohibited. If you have received
> this message in error, please immediately notify the sender and delete the
> mail. Thank you.
>


Re: Replacing Esper with Spark Streaming?

2015-09-14 Thread Todd Nist
Stratio offers a CEP implementation based on Spark Streaming and the Siddhi
CEP engine.  I have not used the below, but they may be of some value to
you:

http://stratio.github.io/streaming-cep-engine/

https://github.com/Stratio/streaming-cep-engine

HTH.

-Todd

On Sun, Sep 13, 2015 at 7:49 PM, Otis Gospodnetić <
otis.gospodne...@gmail.com> wrote:

> Hi,
>
> I'm wondering if anyone has attempted to replace Esper with Spark
> Streaming or if anyone thinks Spark Streaming is/isn't a good tool for the
> (CEP) job?
>
> We are considering Akka or Spark Streaming as possible Esper replacements
> and would appreciate any input from people who tried to do that with either
> of them.
>
> Thanks,
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/
>
>


application failed on large dataset

2015-09-14 Thread 周千昊
Hi, community
  I am facing a strange problem:
  all executors does not respond, and then all of them failed with the
ExecutorLostFailure.
  when I look into yarn logs, there are full of such exception

15/09/14 04:35:33 ERROR shuffle.RetryingBlockFetcher: Exception while
beginning fetch of 1 outstanding blocks (after 3 retries)
java.io.IOException: Failed to connect to host/ip:port
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:193)
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:88)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused: host/ip:port
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
at
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
... 1 more


  The strange thing is that, if I reduce the input size, the problems
just disappeared. I have found a similar issue in the mail-archive(
http://mail-archives.us.apache.org/mod_mbox/spark-user/201502.mbox/%3CCAOHP_tHRtuxDfWF0qmYDauPDhZ1=MAm5thdTfgAhXDN=7kq...@mail.gmail.com%3E),
however I didn't see the solution. So I am wondering if anyone could help
with that?

  My env is:
  hdp 2.2.6
  spark(1.4.1)
  mode: yarn-client
  spark-conf:
  spark.driver.extraJavaOptions -Dhdp.version=2.2.6.0-2800
  spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.6.0-2800
  spark.executor.memory 6g
  spark.storage.memoryFraction 0.3
  spark.dynamicAllocation.enabled true
  spark.shuffle.service.enabled true


RE: Problems with Local Checkpoints

2015-09-14 Thread Bryan
Akhil,

This looks like the issue. I'll update my path to include the (soon to be 
added) winutils & assoc. DLLs.

Thank you,

Bryan

-Original Message-
From: "Akhil Das" 
Sent: ‎9/‎14/‎2015 6:46 AM
To: "Bryan Jeffrey" 
Cc: "user" 
Subject: Re: Problems with Local Checkpoints

You need to set your HADOOP_HOME and make sure the winutils.exe is available in 
the PATH.


Here's a discussion around the same issue 
http://stackoverflow.com/questions/19620642/failed-to-locate-the-winutils-binary-in-the-hadoop-binary-path
 Also this JIRA https://issues.apache.org/jira/browse/SPARK-2356


Thanks
Best Regards


On Wed, Sep 9, 2015 at 11:30 PM, Bryan Jeffrey  wrote:

Hello.


I have some basic code that counts numbers using updateStateByKey.  I setup a 
streaming context with checkpointing as follows:


def createStreamingContext(masterName : String, checkpointDirectory : String, 
timeWindow : Int) : StreamingContext = {  val sparkConf = new 
SparkConf().setAppName("Program")  val ssc = new StreamingContext(sparkConf, 
Seconds(timeWindow))  ssc.checkpoint(checkpointDirectory)  ssc}

This runs fine on my distributed (Linux) cluster, writing checkpoints to local 
disk. However, when I run on my Windows desktop I am seeing a number of 
checkpoint errors:


15/09/09 13:57:06 INFO CheckpointWriter: Saving checkpoint for time 
1441821426000 ms to file 
'file:/C:/Temp/sparkcheckpoint/checkpoint-1441821426000'
Exception in thread "pool-14-thread-4" java.lang.NullPointerException
 at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
 at org.apache.hadoop.util.Shell.runCommand(Shell.java:404)
 at org.apache.hadoop.util.Shell.run(Shell.java:379)
 at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
 at org.apache.hadoop.util.Shell.execCommand(Shell.java:678)
 at org.apache.hadoop.util.Shell.execCommand(Shell.java:661)
 at 
org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:639)
 at 
org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:468)
 at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456)
 at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:886)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:783)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:772)
 at 
org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:181)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)


JAVA_HOME is set correctly, the code runs correctly, it's not a permissions 
issue (I've run this as Administrator).  Directories and files are being 
created in C:\Temp, although all of the files appear to be empty.


Does anyone have an idea of what is causing these errors?  Has anyone seen 
something similar?


Regards,


Bryan Jeffrey

Re: java.lang.NullPointerException with Twitter API

2015-09-14 Thread Akhil Das
Some status might not have the geoLocation and hence you are doing a
null.toString.contains which ends up in that exception, put a condition or
try...catch around it to make it work.

Thanks
Best Regards

On Fri, Sep 11, 2015 at 12:59 AM, Jo Sunad  wrote:

> Hello!
>
> I am trying to customize the Twitter Example TD did by only printing
> messages that have a GeoLocation.
>
> I am getting a NullPointerException:
>
> java.lang.NullPointerException
> at Twitter$$anonfun$1.apply(Twitter.scala:64)
> at Twitter$$anonfun$1.apply(Twitter.scala:64)
> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at
> com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
> at
> com.datastax.spark.connector.writer.GroupingBatchBuilder.hasNext(GroupingBatchBuilder.scala:101)
>
>
> Twitter.scala:64 is this line of code:
> //every GeoLocation should contain a ","
>
> val filtering = stream.filter(status =>
> status.getGeoLocation().toString().contains(","))
>
> Relevant code:
>
> //every GeoLocation should contain a ","
>  val filtering = stream.filter(status =>
> status.getGeoLocation().toString().contains(","))
>
> //this code works if I do stream.map
> val hashTags = filtering.map(status =>
> TweetC(classifyTweet(status.getText()), status.getGeoLocation(),
> status.getUser().getFollowersCount(),status.getText())).saveToCassandra("demo",
> "twitter")
>
>  I'm thinking this might be due to the free public Twitter API not letting
> me get access to GeoTagged tweets so val hashtags is always null and hence
> the NullPointerException. Has anyone else used the free API and seen
> GeoLocations?
>
>
>


Spark Streaming Topology

2015-09-14 Thread defstat
Hi all, 

I would like to use Spark Streaming for managing the problem below: 

I have 2 InputStreams, one for one type of input (n-dimensional vectors) and
one for question on the infrastructure (explained below). 

I need to "break" the input first in 4 execution nodes, and produce a stream
from each (4 streams) containing only m vectors that share common attributes
(what is "common" comes from comparing incoming vectors with vectors already
inside the "selected" group). If the selected group of m vectors changes by
the addition of an new vector, I need to forward the selected group to an
"aggregation node" that joins 2 execution nodes' selected groups (we have 2
such aggregation nodes). Following that, if an aggregation node has a change
in its joined selected group of vectors, it forwards its joined selected
vectors to an other aggregate node that contains the overall aggregation of
the 2 aggregation nodes (JoinAll). 

Now, if a "question" arrives, a mapToPair and then Sort transformations need
to be done on JoinAll, and print the result (one result for each question) 

Can anyone help me on this endeavour? 

I think, and correct me if I am mistaken, the architecture described needs
to: 

1. Partition Input Stream(1) to through  DStream eachPartition =
inputVectors.repartition(4) 
2. Filter() eachPartition like DStream eachPartitionFiltered =
eachPartition.filter(func) -> How can I use already persisted to node data
for that [I mean the already "selected group" of that specific partition] 
3. Group 2 out of 4 partitions to another DStream if needed and store it
inside another node. [???] 
4. if a "question" arrives, use the JoinAll DStream for answering the
question. [???] 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Topology-tp24686.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Fwd: Spark job failed

2015-09-14 Thread Renu Yadav
-- Forwarded message --
From: Renu Yadav 
Date: Mon, Sep 14, 2015 at 4:51 PM
Subject: Spark job failed
To: d...@spark.apache.org


I am getting below error while running spark job:

storage.DiskBlockObjectWriter: Uncaught exception while reverting partial
writes to file
/data/vol5/hadoop/yarn/local/usercache/renu_yadav/appcache/application_1438196554863_31545/spark-4686a622-82be-418e-a8b0-1653458bc8cb/22/temp_shuffle_8c437ba7-55d2-4520-80ec-adcfe932b3bd
java.io.FileNotFoundException:
/data/vol5/hadoop/yarn/local/usercache/renu_yadav/appcache/application_1438196554863_31545/spark-4686a622-82be-418e-a8b0-1653458bc8cb/22/temp_shuffle_8c437ba7-55d2-4520-80ec-adcfe932b3bd
(No such file or directory



I am running 1.3TB data
following are the transformation

read from hadoop->map(key/value).coalease(2000).groupByKey.
then sorting each record by server_ts and select most recent

saving data into parquet.


Following is the command
spark-submit --class com.test.Myapp--master yarn-cluster  --driver-memory
16g  --executor-memory 20g --executor-cores 5   --num-executors 150
--files /home/renu_yadav/fmyapp/hive-site.xml --conf
spark.yarn.preserve.staging.files=true --conf
spark.shuffle.memoryFraction=0.6  --conf spark.storage.memoryFraction=0.1
--conf SPARK_SUBMIT_OPTS="-XX:MaxPermSize=768m"  --conf
SPARK_SUBMIT_OPTS="-XX:MaxPermSize=768m"   --conf
spark.akka.timeout=40  --conf spark.locality.wait=10 --conf
spark.yarn.executor.memoryOverhead=8000   --conf
SPARK_JAVA_OPTS="-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
--conf spark.reducer.maxMbInFlight=96  --conf
spark.shuffle.file.buffer.kb=64 --conf
spark.core.connection.ack.wait.timeout=120  --jars
/usr/hdp/2.2.6.0-2800/hive/lib/datanucleus-api-jdo-3.2.6.jar,/usr/hdp/2.2.6.0-2800/hive/lib/datanucleus-core-3.2.10.jar,/usr/hdp/2.2.6.0-2800/hive/lib/datanucleus-rdbms-3.2.9.jar
myapp_2.10-1.0.jar







Cluster configuration

20 Nodes
32 cores per node
125 GB ram per node


Please Help.

Thanks & Regards,
Renu Yadav


Re: connecting to remote spark and reading files on HDFS or s3 in sparkR

2015-09-14 Thread Akhil Das
You can look into this doc
 regarding the
connection (its for gce though but it should be similar).

Thanks
Best Regards

On Thu, Sep 10, 2015 at 11:20 PM, roni  wrote:

> I have spark installed on a EC2 cluster. Can I connect to that from my
> local sparkR in RStudio? if yes , how ?
>
> Can I read files  which I have saved as parquet files on hdfs  or s3 in
> sparkR ? If yes , How?
>
> Thanks
> -Roni
>
>


Re: Implement "LIKE" in SparkSQL

2015-09-14 Thread Jorge Sánchez
I think after you get your table as a DataFrame, you can do a filter over
it, something like:

val t = sqlContext.sql("select * from table t")
val df = t.filter(t("a").contains(t("b")))

Let us know the results.

2015-09-12 10:45 GMT+01:00 liam :

>
> OK, I got another way, it looks silly and low inefficiency but works.
>
> tradeDF.registerTempTable(tradeTab);
>
> orderDF.registerTempTable(orderTab);
>
> //orderId = tid + "_x"
>
> String sql1 = "select * from " + tradeTab + " a, " + orderTab + " b where
> substr(b.orderId,1,15) = substr(a.tid,1) ";
>
> String sql2 = "select * from " + tradeTab + " a, " + orderTab + " b where
> substr(b.orderId,1,16) = substr(a.tid,1) ";
>
> String sql3 = "select * from " + tradeTab + " a, " + orderTab + " b where
> substr(b.orderId,1,17) = substr(a.tid,1) ";
>
> DataFrame combinDF =
> sqlContext.sql(sql1).unionAll(sqlContext.sql(sql2)).unionAll(sqlContext.sql(sql3));
>
>
>  As I try :
>substr(b.orderId,1,length(a.tid)) = a.tid  *-> no length available*
>b.orderId like concat(a.tid,'%')   *-> no concat available*
>instr(b.orderId,a.tid) > 0*->** no instr available*
>locate(a.tid,b.orderId) > 0 *->** no locate available*
>..*-> no
> .. *
>
>
>
> 2015-09-12 13:49 GMT+08:00 Richard Eggert :
>
>> concat and locate are available as of version 1.5.0, according to the
>> Scaladocs. For earlier versions of Spark, and for the operations that are
>> still not supported,  it's pretty straightforward to define your own
>> UserDefinedFunctions in either Scala or Java  (I don't know about other
>> languages).
>> On Sep 11, 2015 10:26 PM, "liam"  wrote:
>>
>>> Hi,
>>>
>>>  Imaging this: the value of one column is the substring of another
>>> column, when using Oracle,I got many ways to do the query like the
>>> following statement,but how to do in SparkSQL since this no "concat(),
>>> instr(), locate()..."
>>>
>>>
>>> select * from table t where t.a like '%'||t.b||'%';
>>>
>>>
>>> Thanks.
>>>
>>>
>


Re: Spark task hangs infinitely when accessing S3

2015-09-14 Thread Akhil Das
Are you sitting behind a proxy or something? Can you look more into the
executor logs? I have a strange feeling that you are blowing the memory
(and possibly hitting GC etc).

Thanks
Best Regards

On Thu, Sep 10, 2015 at 10:05 PM, Mario Pastorelli <
mario.pastore...@teralytics.ch> wrote:

> Dear community,
> I am facing a problem accessing data on S3 via Spark. My current
> configuration is the following:
>
> - Spark 1.4.1
> - Hadoop 2.7.1
> - hadoop-aws-2.7.1
> - mesos 0.22.1
>
> I am accessing the data using the s3a protocol but it just hangs. The job
> runs through the whole data set but
> systematically there is one tasks never finishing. In the stderr I am
> reading
> quite some timeout errors but it looks like the application is recovering
> from these. It is just infinitely running without proceeding to the next
> stage.
>
> This is the stacktrace I am reading from the errors that the job is
> recovering from:
>
> java.net.SocketTimeoutException: Read timed out
> at java.net.SocketInputStream.socketRead0(Native Method)
> at java.net.SocketInpu
> tStream.read(SocketInputStream.java:152)
> at java.net.SocketInputStream.read(SocketInputStream.java:122)
> at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
> at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
> at sun.security.ssl.InputRecord.read(InputRecord.java:509)
> at
> sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:934)
> at
> sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:891)
> at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
> at
> org.apache.http.impl.io.AbstractSessionInputBuffer.read(AbstractSessionInputBuffer.java:198)
> at
> org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178)
> at
> org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:137)
> at java.io.FilterInputStream.read(FilterInputStream.java:133)
> at java.io.FilterInputStream.read(FilterInputStream.java:133)
> at java.io.FilterInputStream.read(FilterInputStream.java:133)
> at
> com.amazonaws.util.ContentLengthValidationInputStream.read(ContentLengthValidationInputStream.java:77)
> at java.io.FilterInputStream.read(FilterInputStream.java:133)
> at
> org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:164)
> at java.io.DataInputStream.read(DataInputStream.java:149)
> at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
> at java.io.BufferedInputStream.read1(BufferedInputStream.java:275)
> at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
> at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
> at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
> at
> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.readAByte(CBZip2InputStream.java:195)
> at
> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:949)
> at
> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:506)
> at
> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:335)
> at
> org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:425)
> at
> org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:485)
> at java.io.InputStream.read(InputStream.java:101)
> at
> org.apache.hadoop.mapreduce.lib.input.CompressedSplitLineReader.fillBuffer(CompressedSplitLineReader.java:130)
> at
> org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
>
> My gut feeling is that the job is "failing at failing". It looks like some
> tasks that should be failing, unfortunately are not. This seems to not
> happen and, thus, the job just hangs forever. Moreover, debugging this
> problem is really hard because there is no concrete error in the logs.
>
> Could you help me figuring out what is happening and trying to find a
> solution to this issue?
> Thank you!
>


Twitter Streming using Twitter Public Streaming API and Apache Spark

2015-09-14 Thread Sadaf
Hi,

I wanna fetch PUBLIC tweets (not particular to any account) containing any
particular HASHTAG (#) (i.e "CocaCola" in my case) from twitter. I made an
APP on twitter to get the credentials, and then used Twitter Public
Streaming API.

Below is the piece of code.

{   val config = new twitter4j.conf.ConfigurationBuilder()
.setOAuthConsumerKey("***")
   .setOAuthConsumerSecret("***")
.setOAuthAccessToken("***")
.setOAuthAccessTokenSecret("***")
.build
val twitter_auth = new TwitterFactory(config)
val a = new twitter4j.auth.OAuthAuthorization(config)
val atwitter : Option[twitter4j.auth.Authorization] = 
Some(twitter_auth.getInstance(a).getAuthorization())

val sparkConf = new
SparkConf().setAppName("TwitterPublicStreaming").setMaster("local") 
  val ssc = new StreamingContext(sparkConf, Seconds(1))
 var filters: Seq[String]= "#CocaCola" ::Nil 
 val stream = TwitterUtils.createStream(ssc, atwitter,filters,
StorageLevel.MEMORY_AND_DISK_2)
 val data=stream.window(Seconds(1),Seconds(1))
 data.print()
 ssc.start()
 ssc.awaitTermination()
}

But most of the times it doesn't fetch tweets. it shows the Empty RDD as the
output.

Is there anything wrong? Can anyone points out the mistake? 
Thanks in Anticipation.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Twitter-Streming-using-Twitter-Public-Streaming-API-and-Apache-Spark-tp24687.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



hdfs-ha on mesos - odd bug

2015-09-14 Thread Adrian Bridgett
I'm hitting an odd issue with running spark on mesos together with 
HA-HDFS, with an even odder workaround.


In particular I get an error that it can't find the HDFS nameservice 
unless I put in a _broken_ url (discovered that workaround by 
mistake!).  core-site.xml, hdfs-site.xml is distributed to the slave 
node - and that file is read since I deliberately break the file then I 
get an error as you'd expect.


NB: This is a bit different to 
http://mail-archives.us.apache.org/mod_mbox/spark-user/201402.mbox/%3c1392442185079-1549.p...@n3.nabble.com%3E



Spark 1.5.0:

t=sc.textFile("hdfs://nameservice1/tmp/issue")
t.count()
(fails)

t=sc.textFile("file://etc/passwd")
t.count()
(errors about bad url - should have an extra / of course)
t=sc.textFile("hdfs://nameservice1/tmp/issue")
t.count()
then it works!!!

I should say that using file:///etc/passwd or hdfs:///tmp/issue both 
fail as well.  Unless preceded by a broken url.I've tried setting 
spark.hadoop.cloneConf to true, no change.


Sample (broken) run:
15/09/14 13:00:14 DEBUG HadoopRDD: Creating new JobConf and caching it 
for later re-use
15/09/14 13:00:14 DEBUG : address: ip-10-1-200-165/10.1.200.165 
isLoopbackAddress: false, with host 10.1.200.165 ip-10-1-200-165
15/09/14 13:00:14 DEBUG BlockReaderLocal: 
dfs.client.use.legacy.blockreader.local = false
15/09/14 13:00:14 DEBUG BlockReaderLocal: dfs.client.read.shortcircuit = 
false
15/09/14 13:00:14 DEBUG BlockReaderLocal: 
dfs.client.domain.socket.data.traffic = false
15/09/14 13:00:14 DEBUG BlockReaderLocal: dfs.domain.socket.path = 
/var/run/hdfs-sockets/dn
15/09/14 13:00:14 DEBUG HAUtil: No HA service delegation token found for 
logical URI hdfs://nameservice1
15/09/14 13:00:14 DEBUG BlockReaderLocal: 
dfs.client.use.legacy.blockreader.local = false
15/09/14 13:00:14 DEBUG BlockReaderLocal: dfs.client.read.shortcircuit = 
false
15/09/14 13:00:14 DEBUG BlockReaderLocal: 
dfs.client.domain.socket.data.traffic = false
15/09/14 13:00:14 DEBUG BlockReaderLocal: dfs.domain.socket.path = 
/var/run/hdfs-sockets/dn

15/09/14 13:00:14 DEBUG RetryUtils: multipleLinearRandomRetry = null
15/09/14 13:00:14 DEBUG Server: rpcKind=RPC_PROTOCOL_BUFFER, 
rpcRequestWrapperClass=class 
org.apache.hadoop.ipc.ProtobufRpcEngine$RpcRequestWrapper, 
rpcInvoker=org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker@6245f50b
15/09/14 13:00:14 DEBUG Client: getting client out of cache: 
org.apache.hadoop.ipc.Client@267f0fd3
15/09/14 13:00:14 DEBUG NativeCodeLoader: Trying to load the 
custom-built native-hadoop library...

15/09/14 13:00:14 DEBUG NativeCodeLoader: Loaded the native-hadoop library
...
15/09/14 13:00:14 DEBUG Client: Connecting to 
mesos-1.example.com/10.1.200.165:8020
15/09/14 13:00:14 DEBUG Client: IPC Client (1739425103) connection to 
mesos-1.example.com/10.1.200.165:8020 from ubuntu: starting, having 
connections 1
15/09/14 13:00:14 DEBUG Client: IPC Client (1739425103) connection to 
mesos-1.example.com/10.1.200.165:8020 from ubuntu sending #0
15/09/14 13:00:14 DEBUG Client: IPC Client (1739425103) connection to 
mesos-1.example.com/10.1.200.165:8020 from ubuntu got value #0

15/09/14 13:00:14 DEBUG ProtobufRpcEngine: Call: getFileInfo took 36ms
15/09/14 13:00:14 DEBUG FileInputFormat: Time taken to get FileStatuses: 69
15/09/14 13:00:14 INFO FileInputFormat: Total input paths to process : 1
15/09/14 13:00:14 DEBUG Client: IPC Client (1739425103) connection to 
mesos-1.example.com/10.1.200.165:8020 from ubuntu sending #1
15/09/14 13:00:14 DEBUG Client: IPC Client (1739425103) connection to 
mesos-1.example.com/10.1.200.165:8020 from ubuntu got value #1

15/09/14 13:00:14 DEBUG ProtobufRpcEngine: Call: getBlockLocations took 1ms
15/09/14 13:00:14 DEBUG FileInputFormat: Total # of splits generated by 
getSplits: 2, TimeTaken: 104

...
15/09/14 13:00:24 DEBUG Client: IPC Client (1739425103) connection to 
mesos-1.example.com/10.1.200.165:8020 from ubuntu: closed
15/09/14 13:00:24 DEBUG Client: IPC Client (1739425103) connection to 
mesos-1.example.com/10.1.200.165:8020 from ubuntu: stopped, remaining 
connections 0
15/09/14 13:00:24 DEBUG 
AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] received 
message 
AkkaMessage(ExecutorRemoved(20150826-133446-3217621258-5050-4064-S1),true) 
from Actor[akka://sparkDriver/temp/$g]
15/09/14 13:00:24 DEBUG 
AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: Received RPC 
message: 
AkkaMessage(ExecutorRemoved(20150826-133446-3217621258-5050-4064-S1),true)
15/09/14 13:00:24 DEBUG 
AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled 
message (0.513851 ms) 
AkkaMessage(ExecutorRemoved(20150826-133446-3217621258-5050-4064-S1),true) 
from Actor[akka://sparkDriver/temp/$g]
15/09/14 13:00:25 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 
0, 10.1.200.245): java.lang.IllegalArgumentException: 
java.net.UnknownHostException: nameservice1
at 
org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:377)

Re: Spark job failed

2015-09-14 Thread Ted Yu
Have you considered posting on vendor forum ?

FYI

On Mon, Sep 14, 2015 at 6:09 AM, Renu Yadav  wrote:

>
> -- Forwarded message --
> From: Renu Yadav 
> Date: Mon, Sep 14, 2015 at 4:51 PM
> Subject: Spark job failed
> To: d...@spark.apache.org
>
>
> I am getting below error while running spark job:
>
> storage.DiskBlockObjectWriter: Uncaught exception while reverting partial
> writes to file
> /data/vol5/hadoop/yarn/local/usercache/renu_yadav/appcache/application_1438196554863_31545/spark-4686a622-82be-418e-a8b0-1653458bc8cb/22/temp_shuffle_8c437ba7-55d2-4520-80ec-adcfe932b3bd
> java.io.FileNotFoundException:
> /data/vol5/hadoop/yarn/local/usercache/renu_yadav/appcache/application_1438196554863_31545/spark-4686a622-82be-418e-a8b0-1653458bc8cb/22/temp_shuffle_8c437ba7-55d2-4520-80ec-adcfe932b3bd
> (No such file or directory
>
>
>
> I am running 1.3TB data
> following are the transformation
>
> read from hadoop->map(key/value).coalease(2000).groupByKey.
> then sorting each record by server_ts and select most recent
>
> saving data into parquet.
>
>
> Following is the command
> spark-submit --class com.test.Myapp--master yarn-cluster  --driver-memory
> 16g  --executor-memory 20g --executor-cores 5   --num-executors 150
> --files /home/renu_yadav/fmyapp/hive-site.xml --conf
> spark.yarn.preserve.staging.files=true --conf
> spark.shuffle.memoryFraction=0.6  --conf spark.storage.memoryFraction=0.1
> --conf SPARK_SUBMIT_OPTS="-XX:MaxPermSize=768m"  --conf
> SPARK_SUBMIT_OPTS="-XX:MaxPermSize=768m"   --conf
> spark.akka.timeout=40  --conf spark.locality.wait=10 --conf
> spark.yarn.executor.memoryOverhead=8000   --conf
> SPARK_JAVA_OPTS="-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
> --conf spark.reducer.maxMbInFlight=96  --conf
> spark.shuffle.file.buffer.kb=64 --conf
> spark.core.connection.ack.wait.timeout=120  --jars
> /usr/hdp/2.2.6.0-2800/hive/lib/datanucleus-api-jdo-3.2.6.jar,/usr/hdp/2.2.6.0-2800/hive/lib/datanucleus-core-3.2.10.jar,/usr/hdp/2.2.6.0-2800/hive/lib/datanucleus-rdbms-3.2.9.jar
> myapp_2.10-1.0.jar
>
>
>
>
>
>
>
> Cluster configuration
>
> 20 Nodes
> 32 cores per node
> 125 GB ram per node
>
>
> Please Help.
>
> Thanks & Regards,
> Renu Yadav
>
>


Approach validation - building merged datasets - Spark SQL

2015-09-14 Thread Vajra L
Folks-
I am very new to Spark and Spark-SQL.

Here is what I am doing in my application.
Can you please validate and let me know if there is a better way?

1.  Parsing XML files with nested structures, ingested, into individual 
datasets 
Created a custom input format to split XML so each node becomes a record in my 
RDD.
Used Spark scala libraries to parse XML

2.  I am creating a partition key, for each record, and then using it to 
generate a key value pair RDD

3.  Saving distinct partition keys into an array, iterating through it and and 
creating Hive partition directories and issuing Hive add partition command 
“Alter table x add partition()..” - using Spark HiveContext

3.  Saving RDDs leveraging MultipleOutputs - so each file generated has a 
custom filename that includes partition key

4.  Programmatically moving files generated to Hive partitions leveraging the 
custom filenames

5.  Build a merged dataset that is denormalized for performance - Spark 
program, hive query passed as parameter, Hive query executed using Spark Sql 
HiveContext

6.  I need to build additional datasets - and plan to use approach detailed in 
#5.

Can you please critique and let me know if there is a better way using Spark, 
of building this data pipeline?
We dont plan to use Hive on MR whatsoever, the tables are purely to support 
leveraging HQL with Spark Sql.

Thanks.
Vajra
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: JavaRDD using Reflection

2015-09-14 Thread Ankur Srivastava
Hi Rachana

I didn't get you r question fully but as the error says you can not perform
a rdd transformation or action inside another transformation. In your
example you are performing an action "rdd2.values.count()" in side the "map"
transformation. It is not allowed and in any case this will be very
inefficient too.

you should do something like this:

final long rdd2_count = rdd2.values.count()
rdd1.map(x => rdd2_count * x)

Hope this helps!!

- Ankur

On Mon, Sep 14, 2015 at 9:54 AM, Rachana Srivastava <
rachana.srivast...@markmonitor.com> wrote:

> Hello all,
>
>
>
> I am working a problem that requires us to create different set of JavaRDD
> based on different input arguments.  We are getting following error when we
> try to use a factory to create JavaRDD.  Error message is clear but I am
> wondering is there any workaround.
>
>
>
> *Question: *
>
> How to create different set of JavaRDD based on different input arguments
> dynamically.  Trying to implement something like factory pattern.
>
>
>
> *Error Message: *
>
> RDD transformations and actions can only be invoked by the driver, not
> inside of other transformations; for example, rdd1.map(x =>
> rdd2.values.count() * x) is invalid because the values transformation and
> count action cannot be performed inside of the rdd1.map transformation. For
> more information, see SPARK-5063.
>
>
>
> Thanks,
>
>
>
> Rachana
>


Where can I learn how to write udf?

2015-09-14 Thread Saif.A.Ellafi
Hi all,

I am failing to find a proper guide or tutorial onto how to write proper udf 
functions in scala.

Appreciate the effort saving,
Saif



Creating fat jar with all resources.(Spark-Java-Maven)

2015-09-14 Thread Vipul Rai
HI All,

I have a spark app written in java,which parses the incoming log using the
headers which are in .xml. (There are many headers and logs are from 15-20
devices in various formats and separators).

I am able to run it in local mode after specifying all the resources and
passing it as parameters.

I tried creating fat jar using maven, it got created successfully but when
I run it on YARN in cluster mode it throws error *resource not found *with
the .xml files.

Can someone please throw some light on this. Any links or tutorial would
also help.

Thanks,
Vipul


A way to timeout and terminate a laggard 'Stage' ?

2015-09-14 Thread Dmitry Goldenberg
Is there a way in Spark to automatically terminate laggard "stage's", ones
that appear to be hanging?   In other words, is there a timeout for
processing of a given RDD?

In the Spark GUI, I see the "kill" function for a given Stage under
'Details for Job <...>".

Is there something in Spark that would identify and kill laggards
proactively?

Thanks.


JavaRDD using Reflection

2015-09-14 Thread Rachana Srivastava
Hello all,

I am working a problem that requires us to create different set of JavaRDD 
based on different input arguments.  We are getting following error when we try 
to use a factory to create JavaRDD.  Error message is clear but I am wondering 
is there any workaround.

Question:
How to create different set of JavaRDD based on different input arguments 
dynamically.  Trying to implement something like factory pattern.

Error Message:
RDD transformations and actions can only be invoked by the driver, not inside 
of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) 
is invalid because the values transformation and count action cannot be 
performed inside of the rdd1.map transformation. For more information, see 
SPARK-5063.

Thanks,

Rachana


Re: Where can I learn how to write udf?

2015-09-14 Thread Silvio Fiorito
Hi Saif,

There are 2 types of UDFs. Those used by SQL and those used by the Scala DSL.

For SQL, you just register a function like so (this example is from the docs):

sqlContext.udf.register(“strLen”, (s: String) => s.length)
sqlContext.sql(“select name, strLen(name) from people”).show


The other method, for Scala DSL, instead:

import org.apache.spark.sql.functions._

def strLen = udf { (s: String) => s.length }

people.select(people(“name”), strLen(people(“name”))).show


Thanks,
Silvio

From: "saif.a.ell...@wellsfargo.com"
Date: Monday, September 14, 2015 at 12:39 PM
To: "user@spark.apache.org"
Subject: Where can I learn how to write udf?

Hi all,

I am failing to find a proper guide or tutorial onto how to write proper udf 
functions in scala.

Appreciate the effort saving,
Saif



Creating fat jar with all resources.(Spark-Java-Maven)

2015-09-14 Thread vipulrai
HI All,

I have a spark app written in java,which parses the incoming log using the
headers which are in .xml. (There are many headers and logs are from 15-20
devices in various formats and separators).

I am able to run it in local mode after specifying all the resources and
passing it as parameters.

I tried creating fat jar using maven, it got created successfully but when I
run it on YARN in cluster mode it throws error resource not found with the
.xml files.

Can someone please throw some light on this. Any links or tutorial would
also help.

Thanks,
Vipul



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Creating-fat-jar-with-all-resources-Spark-Java-Maven-tp24689.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



using existing R packages from SparkR

2015-09-14 Thread bobtreacy
I am trying to use an existing R package in SparkR. I am trying to follow the
example at https://amplab-extras.github.io/SparkR-pkg/ in the section "Using
existing R packages".

Here is the sample in ample extras --
  generateSparse <- function(x) {
# Use sparseMatrix function from the Matrix package
sparseMatrix(i=c(1, 2, 3), j=c(1, 2, 3), x=c(1, 2, 3))
  }
  includePackage(sc, Matrix)
  sparseMat <- lapplyPartition(rdd, generateSparse)

My package (named 'galileo') consists of a number of clustering methods that
operate on input in a dense matrix.

Here is my code prototype, based on the above sample:

t1 <- jsonFile(sqlContext,"/root/test1.txt")
runGalileo <- function(x) {
   galileo(x,model="kmeans",dist="maximum", K=5)
}
SparkR:::includePackage(sc,galileo)
f <- SparkR:::lapplyPartition(t1,runGalileo)

I'm assuming t1 would be a data frame created from data coming from my
existing application as json ( in the prototype from a file, ultimately from
MongoDB).

So my first question is - what should that json look like to represent a
dense matrix (dgeMatrix in R, perhaps) ? 

Question two- I have noted that some of the APIs in the example are no
longer readily available (I had to add "SparkR:::" to use lapplyPartition -
in Spark 1.4). Is there a different way I should be calling existing R
packages?

Where I am coming from is that I was developing a distributed worker
akka/scala framework for scaling out my use of R to run a large numbers of R
methods on behalf of a large number of users through multiple RServe
instances. The call "galileo(x,model="kmeans",dist="maximum", K=5)", where x
is the dense matrix, is typical of the R calls I was sending to RServe. As I
was developing this I kept running into posts to the Spark User group when I
googled troublesome stack traces I was encountering. As I began to become
familiar with Spark and saw that it included SparkR, I came to see this as
an alternative to me developing my own system with all the challenges I was
anticipating.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/using-existing-R-packages-from-SparkR-tp24693.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Null Value in DecimalType column of DataFrame

2015-09-14 Thread Yin Huai
btw, move it to user list.

On Mon, Sep 14, 2015 at 2:54 PM, Yin Huai  wrote:

> A scale of 10 means that there are 10 digits at the right of the decimal
> point. If you also have precision 10, the range of your data will be [0, 1)
> and casting "10.5" to DecimalType(10, 10) will return null, which is
> expected.
>
> On Mon, Sep 14, 2015 at 1:42 PM, Dirceu Semighini Filho <
> dirceu.semigh...@gmail.com> wrote:
>
>> Hi all,
>> I'm moving from spark 1.4 to 1.5, and one of my tests is failing.
>> It seems that there was some changes in org.apache.spark.sql.types.
>> DecimalType
>>
>> This ugly code is a little sample to reproduce the error, don't use it
>> into your project.
>>
>> test("spark test") {
>>   val file = 
>> context.sparkContext().textFile(s"${defaultFilePath}Test.csv").map(f => 
>> Row.fromSeq({
>> val values = f.split(",")
>> 
>> Seq(values.head.toString.toInt,values.tail.head.toString.toInt,BigDecimal(values.tail.tail.head),
>> values.tail.tail.tail.head)}))
>>
>>   val structType = StructType(Seq(StructField("id", IntegerType, false),
>> StructField("int2", IntegerType, false), StructField("double",
>>
>>  DecimalType(10,10), false),
>>
>>
>> StructField("str2", StringType, false)))
>>
>>   val df = context.sqlContext.createDataFrame(file,structType)
>>   df.first
>> }
>>
>> The content of the file is:
>>
>> 1,5,10.5,va
>> 2,1,0.1,vb
>> 3,8,10.0,vc
>>
>> The problem resides in DecimalType, before 1.5 the scala wasn't required.
>> Now when using  DecimalType(12,10) it works fine, but using
>> DecimalType(10,10) the Decimal values
>> 10.5 became null, and the 0.1 works.
>>
>> Is there anybody working with DecimalType for 1.5.1?
>>
>> Regards,
>> Dirceu
>>
>>
>


Re: unoin streams not working for streams > 3

2015-09-14 Thread Gerard Maas
How many cores are you assigning to your spark streaming job?

On Mon, Sep 14, 2015 at 10:33 PM, Василец Дмитрий 
wrote:

> hello
> I have 4 streams from kafka and streaming not working.
> without any errors or logs
> but with 3 streams everything perfect.
> make sense only amount of streams , different triple combinations always
> working.
> any ideas how to debug or fix it ?
>
>
>


Re: Is it required to remove checkpoint when submitting a code change?

2015-09-14 Thread Ricardo Paiva
Thanks Cody.

You confirmed that I'm not doing something wrong. I will keep investigating
and if I find something I let everybody know.

Thanks again.

Regards,

Ricardo

On Mon, Sep 14, 2015 at 6:29 PM, Cody Koeninger  wrote:

> Yeah, looks like you're right about being unable to change those.  Upon
> further reading, even though StreamingContext.getOrCreate makes an entirely
> new spark conf, Checkpoint will only reload certain properties.
>
> I'm not sure if it'd be safe to include memory / cores among those
> properties that get re-loaded, TD would be a better person to ask.
>
> On Mon, Sep 14, 2015 at 2:54 PM, Ricardo Paiva <
> ricardo.pa...@corp.globo.com> wrote:
>
>> Hi Cody,
>>
>> Thanks for your answer.
>>
>> I had already tried to change the spark submit parameters, but I double
>> checked to reply your answer. Even changing properties file or directly on
>> the spark-submit arguments, none of them work when the application runs
>> from the checkpoint. It seems that everything is cached. I changed driver
>> memory, executor memory, executor cores and number of executors.
>>
>> So, the scenario I have today is: once the Spark Streaming application
>> retrieves the data from the checkpoint, I can't change the submission
>> parameters neither the code parameters without remove the checkpoint
>> folder, loosing all the data used by windowed functions. I was wondering
>> what kind of parameters are you guys loading from the configuration file,
>> when using checkpoints.
>>
>> I really appreciate all the help on this.
>>
>> Many thanks,
>>
>> Ricardo
>>
>>
>>
>>
>>
>>
>>
>>
>> On Fri, Sep 11, 2015 at 11:09 AM, Cody Koeninger 
>> wrote:
>>
>>> Yeah, it makes sense that parameters that are read only during your
>>> getOrCCreate function wouldn't be re-read, since that function isn't called
>>> if a checkpoint is loaded.
>>>
>>> I would have thought changing number of executors and other things used
>>> by spark-submit would work on checkpoint restart.  Have you tried both
>>> changing them in the properties file provided to spark submit, and the
>>> --arguments that correspond to number of cores / executor memory?
>>>
>>> On Thu, Sep 10, 2015 at 5:23 PM, Ricardo Luis Silva Paiva <
>>> ricardo.pa...@corp.globo.com> wrote:
>>>

 Hi guys,

 I tried to use the configuration file, but it didn't work as I
 expected. As part of the Spark Streaming flow, my methods run only when the
 application is started the first time. Once I restart the app, it reads
 from the checkpoint and all the dstream operations come from the cache. No
 parameter is reloaded.

 I would like to know if it's possible to reset the time of windowed
 operations, checkpoint time etc. I also would like to change the submission
 parameters, like number of executors, memory per executor or driver etc. If
 it's not possible, what kind of parameters do you guys usually use in a
 configuration file. I know that the streaming interval it not possible to
 be changed.

 This is my code:

 def main(args: Array[String]): Unit = {
   val ssc = StreamingContext.getOrCreate(CHECKPOINT_FOLDER,
 createSparkContext _)
   ssc.start()
   ssc.awaitTermination()
   ssc.stop()
 }

 def createSparkContext(): StreamingContext = {
   val sparkConf = new SparkConf()
  .setAppName(APP_NAME)
  .set("spark.streaming.unpersist", "true")
   val ssc = new StreamingContext(sparkConf, streamingInterval)
   ssc.checkpoint(CHECKPOINT_FOLDER)
   ssc.sparkContext.addFile(CONFIG_FILENAME)

   val rawStream = createKafkaRDD(ssc)
   processAndSave(rawStream)
   return ssc
 }

 def processAndSave(rawStream:DStream[(String, Array[Byte])]): Unit = {

   val configFile = SparkFiles.get("config.properties")
   val config:Config = ConfigFactory.parseFile(new File(configFile))


 *  slidingInterval =
 Minutes(config.getInt("streaming.sliding.interval"))  windowLength =
 Minutes(config.getInt("streaming.window.interval"))  minPageview =
 config.getInt("streaming.pageview.min")*


   val pageviewStream = rawStream.map{ case (_, raw) =>
 (PageViewParser.parseURL(raw), 1L) }
   val pageviewsHourlyCount = 
 stream.reduceByKeyAndWindow(PageViewAgregator.pageviewsSum
 _,

 PageViewAgregator.pageviewsMinus _,
  *windowLength*,


 *slidingInterval*)

   val permalinkAudienceStream = pageviewsHourlyCount.filter(_._2 >=
 *minPageview*)
   permalinkAudienceStream.map(a => s"${a._1}\t${a._2}")
  .repartition(1)
  .saveAsTextFiles(DESTINATION_FILE, "txt")

 }

 I really appreciate any help on this.

 Many thanks,

 Ricardo

 On Thu, Sep 

Re: Is it required to remove checkpoint when submitting a code change?

2015-09-14 Thread Cody Koeninger
Yeah, looks like you're right about being unable to change those.  Upon
further reading, even though StreamingContext.getOrCreate makes an entirely
new spark conf, Checkpoint will only reload certain properties.

I'm not sure if it'd be safe to include memory / cores among those
properties that get re-loaded, TD would be a better person to ask.

On Mon, Sep 14, 2015 at 2:54 PM, Ricardo Paiva  wrote:

> Hi Cody,
>
> Thanks for your answer.
>
> I had already tried to change the spark submit parameters, but I double
> checked to reply your answer. Even changing properties file or directly on
> the spark-submit arguments, none of them work when the application runs
> from the checkpoint. It seems that everything is cached. I changed driver
> memory, executor memory, executor cores and number of executors.
>
> So, the scenario I have today is: once the Spark Streaming application
> retrieves the data from the checkpoint, I can't change the submission
> parameters neither the code parameters without remove the checkpoint
> folder, loosing all the data used by windowed functions. I was wondering
> what kind of parameters are you guys loading from the configuration file,
> when using checkpoints.
>
> I really appreciate all the help on this.
>
> Many thanks,
>
> Ricardo
>
>
>
>
>
>
>
>
> On Fri, Sep 11, 2015 at 11:09 AM, Cody Koeninger 
> wrote:
>
>> Yeah, it makes sense that parameters that are read only during your
>> getOrCCreate function wouldn't be re-read, since that function isn't called
>> if a checkpoint is loaded.
>>
>> I would have thought changing number of executors and other things used
>> by spark-submit would work on checkpoint restart.  Have you tried both
>> changing them in the properties file provided to spark submit, and the
>> --arguments that correspond to number of cores / executor memory?
>>
>> On Thu, Sep 10, 2015 at 5:23 PM, Ricardo Luis Silva Paiva <
>> ricardo.pa...@corp.globo.com> wrote:
>>
>>>
>>> Hi guys,
>>>
>>> I tried to use the configuration file, but it didn't work as I expected.
>>> As part of the Spark Streaming flow, my methods run only when the
>>> application is started the first time. Once I restart the app, it reads
>>> from the checkpoint and all the dstream operations come from the cache. No
>>> parameter is reloaded.
>>>
>>> I would like to know if it's possible to reset the time of windowed
>>> operations, checkpoint time etc. I also would like to change the submission
>>> parameters, like number of executors, memory per executor or driver etc. If
>>> it's not possible, what kind of parameters do you guys usually use in a
>>> configuration file. I know that the streaming interval it not possible to
>>> be changed.
>>>
>>> This is my code:
>>>
>>> def main(args: Array[String]): Unit = {
>>>   val ssc = StreamingContext.getOrCreate(CHECKPOINT_FOLDER,
>>> createSparkContext _)
>>>   ssc.start()
>>>   ssc.awaitTermination()
>>>   ssc.stop()
>>> }
>>>
>>> def createSparkContext(): StreamingContext = {
>>>   val sparkConf = new SparkConf()
>>>  .setAppName(APP_NAME)
>>>  .set("spark.streaming.unpersist", "true")
>>>   val ssc = new StreamingContext(sparkConf, streamingInterval)
>>>   ssc.checkpoint(CHECKPOINT_FOLDER)
>>>   ssc.sparkContext.addFile(CONFIG_FILENAME)
>>>
>>>   val rawStream = createKafkaRDD(ssc)
>>>   processAndSave(rawStream)
>>>   return ssc
>>> }
>>>
>>> def processAndSave(rawStream:DStream[(String, Array[Byte])]): Unit = {
>>>
>>>   val configFile = SparkFiles.get("config.properties")
>>>   val config:Config = ConfigFactory.parseFile(new File(configFile))
>>>
>>>
>>> *  slidingInterval =
>>> Minutes(config.getInt("streaming.sliding.interval"))  windowLength =
>>> Minutes(config.getInt("streaming.window.interval"))  minPageview =
>>> config.getInt("streaming.pageview.min")*
>>>
>>>
>>>   val pageviewStream = rawStream.map{ case (_, raw) =>
>>> (PageViewParser.parseURL(raw), 1L) }
>>>   val pageviewsHourlyCount = 
>>> stream.reduceByKeyAndWindow(PageViewAgregator.pageviewsSum
>>> _,
>>>
>>> PageViewAgregator.pageviewsMinus _,
>>>  *windowLength*,
>>>
>>>
>>> *slidingInterval*)
>>>
>>>   val permalinkAudienceStream = pageviewsHourlyCount.filter(_._2 >=
>>> *minPageview*)
>>>   permalinkAudienceStream.map(a => s"${a._1}\t${a._2}")
>>>  .repartition(1)
>>>  .saveAsTextFiles(DESTINATION_FILE, "txt")
>>>
>>> }
>>>
>>> I really appreciate any help on this.
>>>
>>> Many thanks,
>>>
>>> Ricardo
>>>
>>> On Thu, Sep 3, 2015 at 1:58 PM, Ricardo Luis Silva Paiva <
>>> ricardo.pa...@corp.globo.com> wrote:
>>>
 Good tip. I will try that.

 Thank you.

 On Wed, Sep 2, 2015 at 6:54 PM, Cody Koeninger 
 wrote:

> Yeah, in general if you're changing the jar you can't recover the
> checkpoint.
>
> If you're just changing parameters, why not 

How to convert dataframe to a nested StructType schema

2015-09-14 Thread Hao Wang
Hi, 

I created a dataframe with 4 string columns (city, state, country, zipcode).
I then applied the following nested schema to it by creating a custom
StructType. When I run df.take(5), it gives the exception below as expected.
The question is how I can convert the Rows in the dataframe to conform to
this nested schema? Thanks!

root
 |-- ZipCode: struct (nullable = true)
 ||-- zip: string (nullable = true)
 |-- Address: struct (nullable = true)
 ||-- city: string (nullable = true)
 ||-- state: string (nullable = true)
 ||-- country: string (nullable = true)

[info]   org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in
stage 6.0 (TID 6, localhost): scala.MatchError: 95123 (of class
java.lang.String)
[info]  at
org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$4.apply(CatalystTypeConverters.scala:178)
[info]  at
org.apache.spark.sql.catalyst.CatalystTypeConverters$.convertRowWithConverters(CatalystTypeConverters.scala:348)
[info]  at
org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$4.apply(CatalystTypeConverters.scala:180)
[info]  at
org.apache.spark.sql.SQLContext$$anonfun$9.apply(SQLContext.scala:488)
[info]  at
org.apache.spark.sql.SQLContext$$anonfun$9.apply(SQLContext.scala:488)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-convert-dataframe-to-a-nested-StructType-schema-tp24694.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



unoin streams not working for streams > 3

2015-09-14 Thread Василец Дмитрий
hello
I have 4 streams from kafka and streaming not working.
without any errors or logs
but with 3 streams everything perfect.
make sense only amount of streams , different triple combinations always
working.
any ideas how to debug or fix it ?


Re: JavaRDD using Reflection

2015-09-14 Thread Ankur Srivastava
It is not reflection that is the issue here but use of an RDD
transformation "featureKeyClassPair.map" inside "lines.mapToPair".

>From the code snippet you have sent it is not very clear if
getFeatureScore(id,data)
invokes executeFeedFeatures, but if that is the case it is not very obvious
that “data” is a supposed to be huge and thus need to be  PairRDD and if it
is not you do not need to use the JavaPairRDD, instead use
a Map and return a List.

If it data is huge and has to be PairRDD pull out the logic to build the
data PairRDD and then invoke map function on that RDD.

- Ankur

On Mon, Sep 14, 2015 at 12:43 PM, 
wrote:

> Thanks so much Ajay and Ankur for your input.
>
>
>
> What we are trying to do is following:  I am trying to invoke a class
> using Java reflection to get the result
>
>
>
> *THIS WORKS FINE*
>
> public static void main(String[] args) throws Exception {
>
> final JavaSparkContext jsc = new JavaSparkContext(sparkConf);
>
> ...
>
> METHOD THAT I AM TRYING TO INVOKE USING REFLECTION
>
> JavaPairDStream urlFeatureScore = lines.mapToPair( new
> PairFunction() {
>
> public Tuple2 call(final String urlString) throws
> Exception {
>
> String  featureScore = getFeatureScore(id,data);
>
>   return new Tuple2(urlString,  featureScore);
>
> }
>
>   });
>
> ...
>
> *REPLACED WITH METHOD INVOKED USING REFLECTION DOES NOT WORK ERROR MESSAGE
> BELOW.*
>
> >  executeFeedFactories2(featureClassName, featureParam, featureData)
>
> jssc.start();
>
> jssc.awaitTermination();
>
>   }
>
>
>
> *Splitting the same work to Class  using Reflection does not work:*
>
>
>
> private static  JavaRDD  executeFeedFactories2(String
> featureClassName, Map featureParam,JavaPairRDD String> featureData) throws Exception {
>
> Class featureClass = Class.forName(MyClass);
>
>Method m = featureClass.getMethod("executeFeedFeatures",
> Map.class, JavaPairRDD.class);
>
>JavaRDD  score = ( JavaRDD )
> m.invoke(featureClass.newInstance(), featureParam,featureData);
>
> return score;
>
> }
>
>
>
> public class MyClass{
>
> public static JavaRDD executeFeedFeatures(*Map* 
> featureParamMap,JavaPairRDD String> featureKeyClassPair ){
>
> featureScoreRDD = featureKeyClassPair.map(new Function String>, Double>() {
>
> public Double call(Tuple2 keyValue) {
>
> …
>
> }
>
> });
>
> return featureScoreRDD;
>
> }
>
>
>
> }
>
>
>
> Thanks again for all your help and advice.
>
>
>
> Regards,
>
>
>
> Rachana
>
>
>
> *From:* Ajay Singal [mailto:asinga...@gmail.com]
> *Sent:* Monday, September 14, 2015 12:20 PM
> *To:* Rachana Srivastava; Ankur Srivastava
> *Cc:* user@spark.apache.org; d...@spark.apache.org; Ajay Singal
> *Subject:* Re: JavaRDD using Reflection
>
>
>
> Hello Rachana,
>
>
>
> The easiest way would be to start with creating a 'parent' JavaRDD and run
> different filters (based on different input arguments) to create respective
> 'child' JavaRDDs dynamically.
>
>
>
> Notice that the creation of these children RDDs is handled by the
> application driver.
>
>
>
> Hope this helps!
>
> Ajay
>
>
>
> On Mon, Sep 14, 2015 at 1:21 PM, Ankur Srivastava <
> ankur.srivast...@gmail.com> wrote:
>
> Hi Rachana
>
>
>
> I didn't get you r question fully but as the error says you can not
> perform a rdd transformation or action inside another transformation. In
> your example you are performing an action "rdd2.values.count()" in side
> the "map" transformation. It is not allowed and in any case this will be
> very inefficient too.
>
>
>
> you should do something like this:
>
>
>
> final long rdd2_count = rdd2.values.count()
>
> rdd1.map(x => rdd2_count * x)
>
>
>
> Hope this helps!!
>
>
>
> - Ankur
>
>
>
> On Mon, Sep 14, 2015 at 9:54 AM, Rachana Srivastava <
> rachana.srivast...@markmonitor.com> wrote:
>
> Hello all,
>
>
>
> I am working a problem that requires us to create different set of JavaRDD
> based on different input arguments.  We are getting following error when we
> try to use a factory to create JavaRDD.  Error message is clear but I am
> wondering is there any workaround.
>
>
>
> *Question: *
>
> How to create different set of JavaRDD based on different input arguments
> dynamically.  Trying to implement something like factory pattern.
>
>
>
> *Error Message: *
>
> RDD transformations and actions can only be invoked by the driver, not
> inside of other transformations; for example, rdd1.map(x =>
> rdd2.values.count() * x) is invalid because the values transformation and
> count action cannot be performed inside of the rdd1.map transformation. For
> more information, see SPARK-5063.
>
>
>

Re: connecting to remote spark and reading files on HDFS or s3 in sparkR

2015-09-14 Thread roni
Thanks Akhil. Very good article.

On Mon, Sep 14, 2015 at 4:15 AM, Akhil Das 
wrote:

> You can look into this doc
>  regarding the
> connection (its for gce though but it should be similar).
>
> Thanks
> Best Regards
>
> On Thu, Sep 10, 2015 at 11:20 PM, roni  wrote:
>
>> I have spark installed on a EC2 cluster. Can I connect to that from my
>> local sparkR in RStudio? if yes , how ?
>>
>> Can I read files  which I have saved as parquet files on hdfs  or s3 in
>> sparkR ? If yes , How?
>>
>> Thanks
>> -Roni
>>
>>
>


add external jar file to Spark shell vs. Scala Shell

2015-09-14 Thread Lan Jiang

Hi, there

I ran into a problem when I try to pass external jar file to spark-shell. 

I have a uber jar file that contains all the java codes I created for protobuf 
and all its dependency. 

If I simply execute my code using Scala Shell, it works fine without error. I 
use -cp to pass the external uber jar file here

./scala -cp 
~/workspace/protobuf/my-app/target/my-app-1.0-SNAPSHOT-jar-with-dependencies.jar
scala> import com.test.proto.Tr.MyProto
import com.test.proto.Tr.MyProto

scala> import java.nio.file.{Files, Paths}
import java.nio.file.{Files, Paths}

scala> val byteArray = 
Files.readAllBytes(Paths.get("/Users/ljiang/workspace/protobuf/my-app/myproto.pro"))
byteArray: Array[Byte] = Array(10, -62, -91, 2, -86, 6, -108, 65, 8, 10, 18, 
-113, 65, -54, 12, -85, 46, -22, 18, -30, 10, 10, 2, 73, 78, 18, -37, 10, -118, 
25, -52, 10, -22, 43, 15, 10, 1, 49, 18, ...
scala> val myProto = MyProto.parseFrom(byteArray)


Now the weird thing is that if I launched the spark-shell instead and execute 
the same code (Please note that I do not even using any SparkContext, RDD), it 
does not work. I use --jars option to pass the external jar file to spark-shell

spark-shell --jars 
~/workspace/protobuf/my-app/target/my-app-1.0-SNAPSHOT-jar-with-dependencies.jar

scala> import com.test.proto.Tr.MyProto
import com.test.proto.Tr.MyProto

scala> import java.nio.file.{Files, Paths}
import java.nio.file.{Files, Paths}

scala> val byteArray = 
Files.readAllBytes(Paths.get("/Users/ljiang/workspace/protobuf/my-app/myproto.pro"))
byteArray: Array[Byte] = Array(10, -62, -91, 2, -86, 6, -108, 65, 8, 10, 18, 
-113, 65, -54, 12, -85, 46, -22, 18, -30, 10, 10, 2, 73, 78, 18, -37, 10, -118, 
25, -52, 10, -22, 43, 15, 10, 1, 49, 18, ...
scala> val myProto = MyProto.parseFrom(byteArray)
java.lang.NoSuchFieldError: unknownFields
at com.test.proto.Tr$MyProto.(Tr.java)
at com.test.proto.Tr$MyProto.(Tr.java)
at com.test.proto.Tr$MyProto$1.parsePartialFrom(Tr.java)
at com.test.proto.Tr$MyProto$1.parsePartialFrom(Tr.java)
at 
com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:141)
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:176)
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:188)
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:193)
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49)
at com.test.proto.Tr$MyProto.parseFrom(Tr.java)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:23)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:28)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:30)
at $iwC$$iwC$$iwC$$iwC$$iwC.(:32)
at $iwC$$iwC$$iwC$$iwC.(:34)
at $iwC$$iwC$$iwC.(:36)
at $iwC$$iwC.(:38)
at $iwC.(:40)
at (:42)
at .(:46)
at .()
at .(:7)
at .()
at $print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at 
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
at 
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at 
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at 
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at 
org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at 
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at 
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at 
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at 
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at 
org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at 

Caching intermediate results in Spark ML pipeline?

2015-09-14 Thread Jingchu Liu
Hi all,

I have a question regarding the ability of ML pipeline to cache
intermediate results. I've posted this question on stackoverflow

but got no answer, hope someone here can help me out.

===
Lately I'm planning to migrate my standalone python ML code to spark. The
ML pipeline in spark.ml turns out quite handy, with streamlined API for
chaining up algorithm stages and hyper-parameter grid search.

Still, I found its support for one important feature obscure in existing
documents: caching of intermediate results. The importance of this feature
arise when the pipeline involves computation intensive stages.

For example, in my case I use a huge sparse matrix to perform multiple
moving averages on time series data in order to form input features. The
structure of the matrix is determined by some hyper-parameter. This step
turns out to be a bottleneck for the entire pipeline because I have to
construct the matrix in runtime.

During parameter search, I usually have other parameters to examine in
addition to this "structure parameter". So if I can reuse the huge matrix
when the "structure parameter" is unchanged, I can save tons of time. For
this reason, I intentionally formed my code to cache and reuse these
intermediate results.

So my question is: can Spark's ML pipeline handle intermediate caching
automatically? Or do I have to manually form code to do so? If so, is there
any best practice to learn from?

P.S. I have looked into the official document and some other material, but
none of them seems to discuss this topic.



Best,
Lewis


Re: Spark Streaming Suggestion

2015-09-14 Thread Jörn Franke
Why did you not stay with the batch approach? For me the architecture looks
very complex for a simple thing you want to achieve. Why don't you process
the data already in storm ?

Le mar. 15 sept. 2015 à 6:20, srungarapu vamsi  a
écrit :

> I am pretty new to spark. Please suggest a better model for the following
> use case.
>
> I have few (about 1500) devices in field which keep emitting about 100KB
> of data every minute. The nature of data sent by the devices is just a list
> of numbers.
> As of now, we have Storm is in the architecture which receives this data,
> sanitizes it and writes to cassandra.
> Now, i have a requirement to process this data. The processing includes
> finding unique numbers emitted by one or more devices for every minute,
> every hour, every day, every month.
> I had implemented this processing part as a batch job execution and now i
> am interested in making it a streaming application. i.e calculating the
> processed data as and when devices emit the data.
>
> I have the following two approaches:
> 1. Storm writes the actual data to cassandra and writes a message on Kafka
> bus that data corresponding to device D and minute M has been written to
> cassandra
>
> Then Spark streaming reads this message from kafka , then reads the data
> of Device D at minute M from cassandra and starts processing the data.
>
> 2. Storm writes the data to both cassandra and  kafka, spark reads the
> actual data from kafka , processes the data and writes to cassandra.
> The second approach avoids additional hit of reading from cassandra every
> minute , a device has written data to cassandra at the cost of putting the
> actual heavy messages instead of light events on  kafka.
>
> I am a bit confused among the two approaches. Please suggest which one is
> better and if both are bad, how can i handle this use case?
>
>
> --
> /Vamsi
>


Re: Spark aggregateByKey Issues

2015-09-14 Thread Alexis Gillain
I'm not sure about what you want to do.

You should try to sort the RDD by (yourKey, date), it ensures that all the
keys are in the same partition.

You problem after that is you want to aggregate only on yourKey and if you
change the Key of the sorted RDD you loose partitionning.

Depending of the size of the result you can use an aggregate bulding a map
of results by youKey or use MapPartition to output a rdd (in this case set
the number of partition high enough to allow the partition to fit in
memory).

see
http://apache-spark-user-list.1001560.n3.nabble.com/Folding-an-RDD-in-order-td16577.html

2015-09-15 11:25 GMT+08:00 毕岩 :

> Hi:
>
>
> There is such one case about using reduce operation like that:
>
>
> I Need to reduce large data made up of billions of records with a
> Key-Value pair.
>
> For the following:
>
> *First,group by Key, and the records with the same Key need to be in
> order of one field called “date” in Value*
>
> *Second, in records with the same Key, every operation to one recored
> need to depend on the result of dealing with the last one, and the first
> one is original state..*
>
>
> Some attempts:
>
> 1. groupByKey + map :  because of the bad performance, CPU is to 100%, so
> executors heartbeat time out and throw errors “Lost executor”, or the
> application is hung…
>
>
> 2. AggregateByKey:
>
> * def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,*
>
> *combOp: (U, U) => U): RDD[(K, U)]*
>
> About aggregateByKey, is all the records with the same Key In the same
> partition ? Is the zeroValue applied to the first one in all records with
> the same Key, or in each partition ? If it is the former, comOp Function
> do nothing!
>
> I tried to take the second “numPartitions” parameter, pass the number of
> key to it. But, the number of key is so large to all the tasks be killed.
>
>
> What should I do with this case ?
>
> I'm asking for advises online...
>
> Thank you.
>



-- 
Alexis GILLAIN


Change protobuf version or any other third party library version in Spark application

2015-09-14 Thread Lan Jiang
Hi, there,

I am using Spark 1.4.1. The protobuf 2.5 is included by Spark 1.4.1 by
default. However, I would like to use Protobuf 3 in my spark application so
that I can use some new features such as Map support.  Is there anyway to
do that?

Right now if I build a uber.jar with dependencies including protobuf 3
classes and pass to spark-shell through --jars option, during the
execution, I got the error *java.lang.NoSuchFieldError: unknownFields. *

Is there anyway to use a different version of Protobuf other than the
default one included in the Spark distribution? I guess I can generalize
and extend the question to any third party libraries. How to deal with
version conflict for any third party libraries included in the Spark
distribution?

Thanks!

Lan


Spark aggregateByKey Issues

2015-09-14 Thread 毕岩
Hi:


There is such one case about using reduce operation like that:


I Need to reduce large data made up of billions of records with a Key-Value
pair.

For the following:

*First,group by Key, and the records with the same Key need to be in
order of one field called “date” in Value*

*Second, in records with the same Key, every operation to one recored
need to depend on the result of dealing with the last one, and the first
one is original state..*


Some attempts:

1. groupByKey + map :  because of the bad performance, CPU is to 100%, so
executors heartbeat time out and throw errors “Lost executor”, or the
application is hung…


2. AggregateByKey:

* def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,*

*combOp: (U, U) => U): RDD[(K, U)]*

About aggregateByKey, is all the records with the same Key In the same
partition ? Is the zeroValue applied to the first one in all records with
the same Key, or in each partition ? If it is the former, comOp Function do
nothing!

I tried to take the second “numPartitions” parameter, pass the number of
key to it. But, the number of key is so large to all the tasks be killed.


What should I do with this case ?

I'm asking for advises online...

Thank you.


Re: unoin streams not working for streams > 3

2015-09-14 Thread Василец Дмитрий
I use local[*]. And i have 4 cores on laptop.
On 14 Sep 2015 23:19, "Gerard Maas"  wrote:

> How many cores are you assigning to your spark streaming job?
>
> On Mon, Sep 14, 2015 at 10:33 PM, Василец Дмитрий <
> pronix.serv...@gmail.com> wrote:
>
>> hello
>> I have 4 streams from kafka and streaming not working.
>> without any errors or logs
>> but with 3 streams everything perfect.
>> make sense only amount of streams , different triple combinations always
>> working.
>> any ideas how to debug or fix it ?
>>
>>
>>
>


Re: Caching intermediate results in Spark ML pipeline?

2015-09-14 Thread Feynman Liang
Lewis,

Many pipeline stages implement save/load methods, which can be used if you
instantiate and call the underlying pipeline stages `transform` methods
individually (instead of using the Pipeline.setStages API). See associated
JIRAs .

Pipeline persistence is on the 1.6 roadmap, JIRA here
.

Feynman

On Mon, Sep 14, 2015 at 9:20 PM, Jingchu Liu  wrote:

> Hi all,
>
> I have a question regarding the ability of ML pipeline to cache
> intermediate results. I've posted this question on stackoverflow
> 
> but got no answer, hope someone here can help me out.
>
> ===
> Lately I'm planning to migrate my standalone python ML code to spark. The
> ML pipeline in spark.ml turns out quite handy, with streamlined API for
> chaining up algorithm stages and hyper-parameter grid search.
>
> Still, I found its support for one important feature obscure in existing
> documents: caching of intermediate results. The importance of this feature
> arise when the pipeline involves computation intensive stages.
>
> For example, in my case I use a huge sparse matrix to perform multiple
> moving averages on time series data in order to form input features. The
> structure of the matrix is determined by some hyper-parameter. This step
> turns out to be a bottleneck for the entire pipeline because I have to
> construct the matrix in runtime.
>
> During parameter search, I usually have other parameters to examine in
> addition to this "structure parameter". So if I can reuse the huge matrix
> when the "structure parameter" is unchanged, I can save tons of time. For
> this reason, I intentionally formed my code to cache and reuse these
> intermediate results.
>
> So my question is: can Spark's ML pipeline handle intermediate caching
> automatically? Or do I have to manually form code to do so? If so, is there
> any best practice to learn from?
>
> P.S. I have looked into the official document and some other material, but
> none of them seems to discuss this topic.
>
>
>
> Best,
> Lewis
>


RE: Best way to merge final output part files created by Spark job

2015-09-14 Thread java8964
For text file, this merge works fine, but for binary format like "ORC", 
"Parquet" or "AVOR", not sure this will work.
These kind of formats in fact are not append-able, as they write the detail 
data information either in the head or at tail part of the file.
You have to use the format specified API to merge the data.
Yong

Date: Mon, 14 Sep 2015 09:10:33 +0200
Subject: Re: Best way to merge final output part files created by Spark job
From: gmu...@stratio.com
To: umesh.ka...@gmail.com
CC: user@spark.apache.org

Hi, check out  FileUtil.copyMerge function in the Hadoop API.  
It's simple,  
Get the hadoop configuration from Spark Context  FileSystem fs = 
FileSystem.get(sparkContext.hadoopConfiguration());
Create new Path with destination and source directory.Call copyMerge   
FileUtil.copyMerge(fs, inputPath, fs, destPath, true, 
sparkContext.hadoopConfiguration(), null);
2015-09-13 23:25 GMT+02:00 unk1102 :
Hi I have a spark job which creates around 500 part files inside each

directory I process. So I have thousands of such directories. So I need to

merge these small small 500 part files. I am using

spark.sql.shuffle.partition as 500 and my final small files are ORC files.

Is there a way to merge orc files in Spark if not please suggest the best

way to merge files created by Spark job in hdfs please guide. Thanks much.







--

View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-merge-final-output-part-files-created-by-Spark-job-tp24681.html

Sent from the Apache Spark User List mailing list archive at Nabble.com.



-

To unsubscribe, e-mail: user-unsubscr...@spark.apache.org

For additional commands, e-mail: user-h...@spark.apache.org





-- 
Gaspar Muñoz 
@gmunozsoria
Vía de las dos Castillas, 33, Ática 4, 3ª Planta
28224 Pozuelo de Alarcón, MadridTel: +34 91 352 59 42 // @stratiobd
  

Spark Streaming Suggestion

2015-09-14 Thread srungarapu vamsi
I am pretty new to spark. Please suggest a better model for the following
use case.

I have few (about 1500) devices in field which keep emitting about 100KB of
data every minute. The nature of data sent by the devices is just a list of
numbers.
As of now, we have Storm is in the architecture which receives this data,
sanitizes it and writes to cassandra.
Now, i have a requirement to process this data. The processing includes
finding unique numbers emitted by one or more devices for every minute,
every hour, every day, every month.
I had implemented this processing part as a batch job execution and now i
am interested in making it a streaming application. i.e calculating the
processed data as and when devices emit the data.

I have the following two approaches:
1. Storm writes the actual data to cassandra and writes a message on Kafka
bus that data corresponding to device D and minute M has been written to
cassandra

Then Spark streaming reads this message from kafka , then reads the data of
Device D at minute M from cassandra and starts processing the data.

2. Storm writes the data to both cassandra and  kafka, spark reads the
actual data from kafka , processes the data and writes to cassandra.
The second approach avoids additional hit of reading from cassandra every
minute , a device has written data to cassandra at the cost of putting the
actual heavy messages instead of light events on  kafka.

I am a bit confused among the two approaches. Please suggest which one is
better and if both are bad, how can i handle this use case?


-- 
/Vamsi


Setting Executor memory

2015-09-14 Thread Thomas Gerber
Hello,

I was looking for guidelines on what value to set executor memory to
(via spark.executor.memory for example).

This seems to be important to avoid OOM during tasks, especially in no swap
environments (like AWS EMR clusters).

This setting is really about the executor JVM heap. Hence, in order to come
up with the maximal amount of heap memory for the executor, we need to list:
1. the memory taken by other processes (Worker in standalone mode, ...)
2. all off-heap allocations in the executor

Fortunately, for #1, we can just look at memory consumption without any
application running.

For #2, it is trickier. What I suspect we should account for:
a. thread stack size
b. akka buffers (via akka framesize & number of akka threads)
c. kryo buffers
d. shuffle buffers
(e. tachyon)

Could anyone shed some light on this? Maybe a formula? Or maybe swap should
actually be turned on, as a safeguard against OOMs?

Thanks


Re: Spark aggregateByKey Issues

2015-09-14 Thread biyan900116
Hi Alexis:

Thank you for your replying.

My case is that each operation to one record need to depend on one value that 
will be set by the operating to the last record. 

So your advise is that i can use “sortByKey”. “sortByKey” will put all records 
with the same Key in one partition. Need I take the “numPartitions” parameter ? 
Or even if i don’t , it still do that .

If it works, add “aggregate” to deal with my case, i think the comOp function 
in parameter list of aggregate API won’t be executed.. Is my understanding 
wrong ? 
  

> 在 2015年9月15日,下午12:47,Alexis Gillain  写道:
> 
> I'm not sure about what you want to do.
> 
> You should try to sort the RDD by (yourKey, date), it ensures that all the 
> keys are in the same partition.
> 
> You problem after that is you want to aggregate only on yourKey and if you 
> change the Key of the sorted RDD you loose partitionning.
> 
> Depending of the size of the result you can use an aggregate bulding a map of 
> results by youKey or use MapPartition to output a rdd (in this case set the 
> number of partition high enough to allow the partition to fit in memory).
> 
> see 
> http://apache-spark-user-list.1001560.n3.nabble.com/Folding-an-RDD-in-order-td16577.html
>  
> 
> 
> 2015-09-15 11:25 GMT+08:00 毕岩  >:
> Hi:
> 
> 
> 
> There is such one case about using reduce operation like that:
> 
> 
> 
> I Need to reduce large data made up of billions of records with a Key-Value 
> pair.
> 
> For the following:
> 
> First,group by Key, and the records with the same Key need to be in order 
> of one field called “date” in Value
> 
> Second, in records with the same Key, every operation to one recored need 
> to depend on the result of dealing with the last one, and the first one is 
> original state..
> 
> 
> 
> Some attempts:
> 
> 1. groupByKey + map :  because of the bad performance, CPU is to 100%, so 
> executors heartbeat time out and throw errors “Lost executor”, or the 
> application is hung…
> 
> 
> 
> 2. AggregateByKey:
> 
> def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
> 
> combOp: (U, U) => U): RDD[(K, U)]
> 
> About aggregateByKey, is all the records with the same Key In the same 
> partition ? Is the zeroValue applied to the first one in all records with the 
> same Key, or in each partition ? If it is the former, comOp Function do 
> nothing! 
> 
> I tried to take the second “numPartitions” parameter, pass the number of key 
> to it. But, the number of key is so large to all the tasks be killed.
> 
> 
> 
> What should I do with this case ? 
> 
> I'm asking for advises online...
> 
> Thank you.
> 
> 
> 
> 
> -- 
> Alexis GILLAIN



why spark and kafka always crash

2015-09-14 Thread Joanne Contact
How to prevent it?

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Caching intermediate results in Spark ML pipeline?

2015-09-14 Thread Feynman Liang
You can persist the transformed Dataframes, for example

val data : DF = ...
val hashedData = hashingTF.transform(data)
hashedData.cache() // to cache DataFrame in memory

Future usage of hashedData read from an in-memory cache now.

You can also persist to disk, eg:

hashedData.write.parquet(FilePath) // to write DataFrame in Parquet format
to disk
...
val savedHashedData = sqlContext.read.parquet(FilePath)

Future uses of hash

Like my earlier response, this will still require you call each
PipelineStage's `transform` method (i.e. to NOT use the overall
Pipeline.setStages API)

On Mon, Sep 14, 2015 at 10:45 PM, Jingchu Liu  wrote:

> Hey Feynman,
>
> Thanks for your response, but I'm afraid "model save/load" is not exactly
> the feature I'm looking for.
>
> What I need to cache and reuse are the intermediate outputs of
> transformations, not transformer themselves. Do you know any related dev.
> activities or plans?
>
> Best,
> Lewis
>
> 2015-09-15 13:03 GMT+08:00 Feynman Liang :
>
>> Lewis,
>>
>> Many pipeline stages implement save/load methods, which can be used if
>> you instantiate and call the underlying pipeline stages `transform` methods
>> individually (instead of using the Pipeline.setStages API). See associated
>> JIRAs .
>>
>> Pipeline persistence is on the 1.6 roadmap, JIRA here
>> .
>>
>> Feynman
>>
>> On Mon, Sep 14, 2015 at 9:20 PM, Jingchu Liu 
>> wrote:
>>
>>> Hi all,
>>>
>>> I have a question regarding the ability of ML pipeline to cache
>>> intermediate results. I've posted this question on stackoverflow
>>> 
>>> but got no answer, hope someone here can help me out.
>>>
>>> ===
>>> Lately I'm planning to migrate my standalone python ML code to spark.
>>> The ML pipeline in spark.ml turns out quite handy, with streamlined API
>>> for chaining up algorithm stages and hyper-parameter grid search.
>>>
>>> Still, I found its support for one important feature obscure in existing
>>> documents: caching of intermediate results. The importance of this feature
>>> arise when the pipeline involves computation intensive stages.
>>>
>>> For example, in my case I use a huge sparse matrix to perform multiple
>>> moving averages on time series data in order to form input features. The
>>> structure of the matrix is determined by some hyper-parameter. This step
>>> turns out to be a bottleneck for the entire pipeline because I have to
>>> construct the matrix in runtime.
>>>
>>> During parameter search, I usually have other parameters to examine in
>>> addition to this "structure parameter". So if I can reuse the huge matrix
>>> when the "structure parameter" is unchanged, I can save tons of time. For
>>> this reason, I intentionally formed my code to cache and reuse these
>>> intermediate results.
>>>
>>> So my question is: can Spark's ML pipeline handle intermediate caching
>>> automatically? Or do I have to manually form code to do so? If so, is there
>>> any best practice to learn from?
>>>
>>> P.S. I have looked into the official document and some other material,
>>> but none of them seems to discuss this topic.
>>>
>>>
>>>
>>> Best,
>>> Lewis
>>>
>>
>>
>


Re: Caching intermediate results in Spark ML pipeline?

2015-09-14 Thread Jingchu Liu
Hey Feynman,

Thanks for your response, but I'm afraid "model save/load" is not exactly
the feature I'm looking for.

What I need to cache and reuse are the intermediate outputs of
transformations, not transformer themselves. Do you know any related dev.
activities or plans?

Best,
Lewis

2015-09-15 13:03 GMT+08:00 Feynman Liang :

> Lewis,
>
> Many pipeline stages implement save/load methods, which can be used if you
> instantiate and call the underlying pipeline stages `transform` methods
> individually (instead of using the Pipeline.setStages API). See associated
> JIRAs .
>
> Pipeline persistence is on the 1.6 roadmap, JIRA here
> .
>
> Feynman
>
> On Mon, Sep 14, 2015 at 9:20 PM, Jingchu Liu  wrote:
>
>> Hi all,
>>
>> I have a question regarding the ability of ML pipeline to cache
>> intermediate results. I've posted this question on stackoverflow
>> 
>> but got no answer, hope someone here can help me out.
>>
>> ===
>> Lately I'm planning to migrate my standalone python ML code to spark. The
>> ML pipeline in spark.ml turns out quite handy, with streamlined API for
>> chaining up algorithm stages and hyper-parameter grid search.
>>
>> Still, I found its support for one important feature obscure in existing
>> documents: caching of intermediate results. The importance of this feature
>> arise when the pipeline involves computation intensive stages.
>>
>> For example, in my case I use a huge sparse matrix to perform multiple
>> moving averages on time series data in order to form input features. The
>> structure of the matrix is determined by some hyper-parameter. This step
>> turns out to be a bottleneck for the entire pipeline because I have to
>> construct the matrix in runtime.
>>
>> During parameter search, I usually have other parameters to examine in
>> addition to this "structure parameter". So if I can reuse the huge matrix
>> when the "structure parameter" is unchanged, I can save tons of time. For
>> this reason, I intentionally formed my code to cache and reuse these
>> intermediate results.
>>
>> So my question is: can Spark's ML pipeline handle intermediate caching
>> automatically? Or do I have to manually form code to do so? If so, is there
>> any best practice to learn from?
>>
>> P.S. I have looked into the official document and some other material,
>> but none of them seems to discuss this topic.
>>
>>
>>
>> Best,
>> Lewis
>>
>
>


Re: hdfs-ha on mesos - odd bug

2015-09-14 Thread Sam Bessalah
I don't know about the broken url. But are you running HDFS as a mesos
framework? If so is it using mesos-dns?
Then you should resolve the namenode via hdfs:/// 

On Mon, Sep 14, 2015 at 3:55 PM, Adrian Bridgett 
wrote:

> I'm hitting an odd issue with running spark on mesos together with
> HA-HDFS, with an even odder workaround.
>
> In particular I get an error that it can't find the HDFS nameservice
> unless I put in a _broken_ url (discovered that workaround by mistake!).
> core-site.xml, hdfs-site.xml is distributed to the slave node - and that
> file is read since I deliberately break the file then I get an error as
> you'd expect.
>
> NB: This is a bit different to
> http://mail-archives.us.apache.org/mod_mbox/spark-user/201402.mbox/%3c1392442185079-1549.p...@n3.nabble.com%3E
>
>
> Spark 1.5.0:
>
> t=sc.textFile("hdfs://nameservice1/tmp/issue")
> t.count()
> (fails)
>
> t=sc.textFile("file://etc/passwd")
> t.count()
> (errors about bad url - should have an extra / of course)
> t=sc.textFile("hdfs://nameservice1/tmp/issue")
> t.count()
> then it works!!!
>
> I should say that using file:///etc/passwd or hdfs:///tmp/issue both fail
> as well.  Unless preceded by a broken url.I've tried setting
> spark.hadoop.cloneConf to true, no change.
>
> Sample (broken) run:
> 15/09/14 13:00:14 DEBUG HadoopRDD: Creating new JobConf and caching it for
> later re-use
> 15/09/14 13:00:14 DEBUG : address: ip-10-1-200-165/10.1.200.165
> isLoopbackAddress: false, with host 10.1.200.165 ip-10-1-200-165
> 15/09/14 13:00:14 DEBUG BlockReaderLocal:
> dfs.client.use.legacy.blockreader.local = false
> 15/09/14 13:00:14 DEBUG BlockReaderLocal: dfs.client.read.shortcircuit =
> false
> 15/09/14 13:00:14 DEBUG BlockReaderLocal:
> dfs.client.domain.socket.data.traffic = false
> 15/09/14 13:00:14 DEBUG BlockReaderLocal: dfs.domain.socket.path =
> /var/run/hdfs-sockets/dn
> 15/09/14 13:00:14 DEBUG HAUtil: No HA service delegation token found for
> logical URI hdfs://nameservice1
> 15/09/14 13:00:14 DEBUG BlockReaderLocal:
> dfs.client.use.legacy.blockreader.local = false
> 15/09/14 13:00:14 DEBUG BlockReaderLocal: dfs.client.read.shortcircuit =
> false
> 15/09/14 13:00:14 DEBUG BlockReaderLocal:
> dfs.client.domain.socket.data.traffic = false
> 15/09/14 13:00:14 DEBUG BlockReaderLocal: dfs.domain.socket.path =
> /var/run/hdfs-sockets/dn
> 15/09/14 13:00:14 DEBUG RetryUtils: multipleLinearRandomRetry = null
> 15/09/14 13:00:14 DEBUG Server: rpcKind=RPC_PROTOCOL_BUFFER,
> rpcRequestWrapperClass=class
> org.apache.hadoop.ipc.ProtobufRpcEngine$RpcRequestWrapper,
> rpcInvoker=org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker@6245f50b
> 15/09/14 13:00:14 DEBUG Client: getting client out of cache:
> org.apache.hadoop.ipc.Client@267f0fd3
> 15/09/14 13:00:14 DEBUG NativeCodeLoader: Trying to load the custom-built
> native-hadoop library...
> 15/09/14 13:00:14 DEBUG NativeCodeLoader: Loaded the native-hadoop library
> ...
> 15/09/14 13:00:14 DEBUG Client: Connecting to
> mesos-1.example.com/10.1.200.165:8020
> 15/09/14 13:00:14 DEBUG Client: IPC Client (1739425103) connection to
> mesos-1.example.com/10.1.200.165:8020 from ubuntu: starting, having
> connections 1
> 15/09/14 13:00:14 DEBUG Client: IPC Client (1739425103) connection to
> mesos-1.example.com/10.1.200.165:8020 from ubuntu sending #0
> 15/09/14 13:00:14 DEBUG Client: IPC Client (1739425103) connection to
> mesos-1.example.com/10.1.200.165:8020 from ubuntu got value #0
> 15/09/14 13:00:14 DEBUG ProtobufRpcEngine: Call: getFileInfo took 36ms
> 15/09/14 13:00:14 DEBUG FileInputFormat: Time taken to get FileStatuses: 69
> 15/09/14 13:00:14 INFO FileInputFormat: Total input paths to process : 1
> 15/09/14 13:00:14 DEBUG Client: IPC Client (1739425103) connection to
> mesos-1.example.com/10.1.200.165:8020 from ubuntu sending #1
> 15/09/14 13:00:14 DEBUG Client: IPC Client (1739425103) connection to
> mesos-1.example.com/10.1.200.165:8020 from ubuntu got value #1
> 15/09/14 13:00:14 DEBUG ProtobufRpcEngine: Call: getBlockLocations took 1ms
> 15/09/14 13:00:14 DEBUG FileInputFormat: Total # of splits generated by
> getSplits: 2, TimeTaken: 104
> ...
> 15/09/14 13:00:24 DEBUG Client: IPC Client (1739425103) connection to
> mesos-1.example.com/10.1.200.165:8020 from ubuntu: closed
> 15/09/14 13:00:24 DEBUG Client: IPC Client (1739425103) connection to
> mesos-1.example.com/10.1.200.165:8020 from ubuntu: stopped, remaining
> connections 0
> 15/09/14 13:00:24 DEBUG
> AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] received
> message
> AkkaMessage(ExecutorRemoved(20150826-133446-3217621258-5050-4064-S1),true)
> from Actor[akka://sparkDriver/temp/$g]
> 15/09/14 13:00:24 DEBUG
> AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: Received RPC message:
> AkkaMessage(ExecutorRemoved(20150826-133446-3217621258-5050-4064-S1),true)
> 15/09/14 13:00:24 DEBUG
> AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled
> message 

How to clear Kafka offset in Spark streaming?

2015-09-14 Thread Bin Wang
Hi,

I'm using spark streaming with kafka and I need to clear the offset and
re-compute all things. I deleted checkpoint directory in HDFS and reset
kafka offset with "kafka-run-class kafka.tools.ImportZkOffsets". I can
confirm the offset is set to 0 in kafka:

~ > kafka-run-class kafka.tools.ConsumerOffsetChecker --group
adhoc_data_spark --topic adhoc_data --zookeeper szq1.appadhoc.com:2181
Group   Topic  Pid Offset  logSize
Lag Owner
adhoc_data_spark adhoc_data 0   0   5280743
5280743 none

But when I restart spark streaming, the offset is reset to logSize, I
cannot figure out why is that, can anybody help? Thanks.


Interacting with Different Versions of Hive Metastore, how to config?

2015-09-14 Thread bg_spark
spark.sql.hive.metastore.version0.13.1  Version of the Hive metastore.
Available options are 0.12.0 through 1.2.1.
spark.sql.hive.metastore.jars   builtin Location of the jars that 
should be
used to instantiate the HiveMetastoreClient. This property can be one of
three options:

builtin
Use Hive 1.2.1, which is bundled with the Spark assembly jar when -Phive
is enabled. When this option is chosen, spark.sql.hive.metastore.version
must be either 1.2.1 or not defined.
maven
Use Hive jars of specified version downloaded from Maven repositories.
This configuration is not generally recommended for production deployments.
A classpath in the standard format for the JVM. This classpath must
include all of Hive and its dependencies, including the correct version of
Hadoop. These jars only need to be present on the driver, but if you are
running in yarn cluster mode then you must ensure they are packaged with you
application.

I config the parms like this: 
spark.sql.hive.metastore.version=0.14.0
spark.sql.hive.metastore.jars=$HIVE_HOME/lib/*:hadoop classpath
I have put all the jars in the classpath,but don't work





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Interacting-with-Different-Versions-of-Hive-Metastore-how-to-config-tp24683.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Best way to merge final output part files created by Spark job

2015-09-14 Thread Gaspar Muñoz
Hi, check out  FileUtil.copyMerge function in the Hadoop API
.

It's simple,


   1. Get the hadoop configuration from Spark Context  FileSystem fs =
   FileSystem.get(sparkContext.hadoopConfiguration());
   2. Create new Path
   
with
   destination and source directory.
   3. Call copyMerge   FileUtil.copyMerge(fs, inputPath, fs, destPath,
   true, sparkContext.hadoopConfiguration(), null);


2015-09-13 23:25 GMT+02:00 unk1102 :

> Hi I have a spark job which creates around 500 part files inside each
> directory I process. So I have thousands of such directories. So I need to
> merge these small small 500 part files. I am using
> spark.sql.shuffle.partition as 500 and my final small files are ORC files.
> Is there a way to merge orc files in Spark if not please suggest the best
> way to merge files created by Spark job in hdfs please guide. Thanks much.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-merge-final-output-part-files-created-by-Spark-job-tp24681.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 

Gaspar Muñoz
@gmunozsoria



Vía de las dos Castillas, 33, Ática 4, 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 352 59 42 // *@stratiobd *


Parse tab seperated file inc json efficent

2015-09-14 Thread matthes
I try to parse a tab seperated file in Spark 1.5 with a json section as
efficent as possible.
The file looks like follows: 

value1value2{json}

How can I parse all fields inc the json fields into a RDD directly?

If I use this peace of code:

val jsonCol = sc.textFile("/data/input").map(l => l.split("\t",3)).map(x =>
x(2).trim()).cache()
val json = sqlContext.read.json(jsonCol).rdd

I will loose value1 and value2!!!
I'm open for any idea!



-
I'm using Spark 1.5
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Parse-tab-seperated-file-inc-json-efficent-tp24691.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Trouble using dynamic allocation and shuffle service.

2015-09-14 Thread Philip Weaver
Hello, I am trying to use dynamic allocation which requires the shuffle
service. I am running Spark on mesos.

Whenever I set spark.shuffle.service.enabled=true, my Spark driver fails
with an error like this:

Caused by: java.net.ConnectException: Connection refused: devspark1/
172.26.21.70:7337

It's not clear from the documentation if the shuffle service starts
automatically just by having it enabled, or if I need to do something else.
There are instructions for running the shuffle service in YARN, but not
mesos.

- Philip


Re: Trouble using dynamic allocation and shuffle service.

2015-09-14 Thread Tim Chen
Hi Philip,

I've included documentation in the Spark/Mesos doc (
http://spark.apache.org/docs/latest/running-on-mesos.html), where you can
start the MesosShuffleService with sbin/start-mesos-shuffle-service.sh
script.

The shuffle service needs to be started manually for Mesos on each slave
(one way is via Marathon with unique hostname constraint), and then you
need to enable dynamicAllocation and shuffle service flag on the driver and
it should work.

Let me know if that's not clear.

Tim

On Mon, Sep 14, 2015 at 11:36 AM, Philip Weaver 
wrote:

> Hello, I am trying to use dynamic allocation which requires the shuffle
> service. I am running Spark on mesos.
>
> Whenever I set spark.shuffle.service.enabled=true, my Spark driver fails
> with an error like this:
>
> Caused by: java.net.ConnectException: Connection refused: devspark1/
> 172.26.21.70:7337
>
> It's not clear from the documentation if the shuffle service starts
> automatically just by having it enabled, or if I need to do something else.
> There are instructions for running the shuffle service in YARN, but not
> mesos.
>
> - Philip
>
>


Re: Is it required to remove checkpoint when submitting a code change?

2015-09-14 Thread Ricardo Paiva
Hi Cody,

Thanks for your answer.

I had already tried to change the spark submit parameters, but I double
checked to reply your answer. Even changing properties file or directly on
the spark-submit arguments, none of them work when the application runs
from the checkpoint. It seems that everything is cached. I changed driver
memory, executor memory, executor cores and number of executors.

So, the scenario I have today is: once the Spark Streaming application
retrieves the data from the checkpoint, I can't change the submission
parameters neither the code parameters without remove the checkpoint
folder, loosing all the data used by windowed functions. I was wondering
what kind of parameters are you guys loading from the configuration file,
when using checkpoints.

I really appreciate all the help on this.

Many thanks,

Ricardo








On Fri, Sep 11, 2015 at 11:09 AM, Cody Koeninger  wrote:

> Yeah, it makes sense that parameters that are read only during your
> getOrCCreate function wouldn't be re-read, since that function isn't called
> if a checkpoint is loaded.
>
> I would have thought changing number of executors and other things used by
> spark-submit would work on checkpoint restart.  Have you tried both
> changing them in the properties file provided to spark submit, and the
> --arguments that correspond to number of cores / executor memory?
>
> On Thu, Sep 10, 2015 at 5:23 PM, Ricardo Luis Silva Paiva <
> ricardo.pa...@corp.globo.com> wrote:
>
>>
>> Hi guys,
>>
>> I tried to use the configuration file, but it didn't work as I expected.
>> As part of the Spark Streaming flow, my methods run only when the
>> application is started the first time. Once I restart the app, it reads
>> from the checkpoint and all the dstream operations come from the cache. No
>> parameter is reloaded.
>>
>> I would like to know if it's possible to reset the time of windowed
>> operations, checkpoint time etc. I also would like to change the submission
>> parameters, like number of executors, memory per executor or driver etc. If
>> it's not possible, what kind of parameters do you guys usually use in a
>> configuration file. I know that the streaming interval it not possible to
>> be changed.
>>
>> This is my code:
>>
>> def main(args: Array[String]): Unit = {
>>   val ssc = StreamingContext.getOrCreate(CHECKPOINT_FOLDER,
>> createSparkContext _)
>>   ssc.start()
>>   ssc.awaitTermination()
>>   ssc.stop()
>> }
>>
>> def createSparkContext(): StreamingContext = {
>>   val sparkConf = new SparkConf()
>>  .setAppName(APP_NAME)
>>  .set("spark.streaming.unpersist", "true")
>>   val ssc = new StreamingContext(sparkConf, streamingInterval)
>>   ssc.checkpoint(CHECKPOINT_FOLDER)
>>   ssc.sparkContext.addFile(CONFIG_FILENAME)
>>
>>   val rawStream = createKafkaRDD(ssc)
>>   processAndSave(rawStream)
>>   return ssc
>> }
>>
>> def processAndSave(rawStream:DStream[(String, Array[Byte])]): Unit = {
>>
>>   val configFile = SparkFiles.get("config.properties")
>>   val config:Config = ConfigFactory.parseFile(new File(configFile))
>>
>>
>> *  slidingInterval =
>> Minutes(config.getInt("streaming.sliding.interval"))  windowLength =
>> Minutes(config.getInt("streaming.window.interval"))  minPageview =
>> config.getInt("streaming.pageview.min")*
>>
>>
>>   val pageviewStream = rawStream.map{ case (_, raw) =>
>> (PageViewParser.parseURL(raw), 1L) }
>>   val pageviewsHourlyCount = 
>> stream.reduceByKeyAndWindow(PageViewAgregator.pageviewsSum
>> _,
>>
>> PageViewAgregator.pageviewsMinus _,
>>  *windowLength*,
>>
>> *slidingInterval*)
>>
>>   val permalinkAudienceStream = pageviewsHourlyCount.filter(_._2 >=
>> *minPageview*)
>>   permalinkAudienceStream.map(a => s"${a._1}\t${a._2}")
>>  .repartition(1)
>>  .saveAsTextFiles(DESTINATION_FILE, "txt")
>>
>> }
>>
>> I really appreciate any help on this.
>>
>> Many thanks,
>>
>> Ricardo
>>
>> On Thu, Sep 3, 2015 at 1:58 PM, Ricardo Luis Silva Paiva <
>> ricardo.pa...@corp.globo.com> wrote:
>>
>>> Good tip. I will try that.
>>>
>>> Thank you.
>>>
>>> On Wed, Sep 2, 2015 at 6:54 PM, Cody Koeninger 
>>> wrote:
>>>
 Yeah, in general if you're changing the jar you can't recover the
 checkpoint.

 If you're just changing parameters, why not externalize those in a
 configuration file so your jar doesn't change?  I tend to stick even my
 app-specific parameters in an external spark config so everything is in one
 place.

 On Wed, Sep 2, 2015 at 4:48 PM, Ricardo Luis Silva Paiva <
 ricardo.pa...@corp.globo.com> wrote:

> Hi,
>
> Is there a way to submit an app code change, keeping the checkpoint
> data or do I need to erase the checkpoint folder every time I re-submit 
> the
> spark app with a new jar?
>
> I have an app that count pageviews streaming from Kafka, and 

Re: JavaRDD using Reflection

2015-09-14 Thread Ajay Singal
Hello Rachana,

The easiest way would be to start with creating a 'parent' JavaRDD and run
different filters (based on different input arguments) to create respective
'child' JavaRDDs dynamically.

Notice that the creation of these children RDDs is handled by the
application driver.

Hope this helps!
Ajay

On Mon, Sep 14, 2015 at 1:21 PM, Ankur Srivastava <
ankur.srivast...@gmail.com> wrote:

> Hi Rachana
>
> I didn't get you r question fully but as the error says you can not
> perform a rdd transformation or action inside another transformation. In
> your example you are performing an action "rdd2.values.count()" in side
> the "map" transformation. It is not allowed and in any case this will be
> very inefficient too.
>
> you should do something like this:
>
> final long rdd2_count = rdd2.values.count()
> rdd1.map(x => rdd2_count * x)
>
> Hope this helps!!
>
> - Ankur
>
> On Mon, Sep 14, 2015 at 9:54 AM, Rachana Srivastava <
> rachana.srivast...@markmonitor.com> wrote:
>
>> Hello all,
>>
>>
>>
>> I am working a problem that requires us to create different set of
>> JavaRDD based on different input arguments.  We are getting following error
>> when we try to use a factory to create JavaRDD.  Error message is clear but
>> I am wondering is there any workaround.
>>
>>
>>
>> *Question: *
>>
>> How to create different set of JavaRDD based on different input arguments
>> dynamically.  Trying to implement something like factory pattern.
>>
>>
>>
>> *Error Message: *
>>
>> RDD transformations and actions can only be invoked by the driver, not
>> inside of other transformations; for example, rdd1.map(x =>
>> rdd2.values.count() * x) is invalid because the values transformation and
>> count action cannot be performed inside of the rdd1.map transformation. For
>> more information, see SPARK-5063.
>>
>>
>>
>> Thanks,
>>
>>
>>
>> Rachana
>>
>
>


Re: Trouble using dynamic allocation and shuffle service.

2015-09-14 Thread Philip Weaver
Ah, I missed that, thanks!

On Mon, Sep 14, 2015 at 11:45 AM, Tim Chen  wrote:

> Hi Philip,
>
> I've included documentation in the Spark/Mesos doc (
> http://spark.apache.org/docs/latest/running-on-mesos.html), where you can
> start the MesosShuffleService with sbin/start-mesos-shuffle-service.sh
> script.
>
> The shuffle service needs to be started manually for Mesos on each slave
> (one way is via Marathon with unique hostname constraint), and then you
> need to enable dynamicAllocation and shuffle service flag on the driver and
> it should work.
>
> Let me know if that's not clear.
>
> Tim
>
> On Mon, Sep 14, 2015 at 11:36 AM, Philip Weaver 
> wrote:
>
>> Hello, I am trying to use dynamic allocation which requires the shuffle
>> service. I am running Spark on mesos.
>>
>> Whenever I set spark.shuffle.service.enabled=true, my Spark driver fails
>> with an error like this:
>>
>> Caused by: java.net.ConnectException: Connection refused: devspark1/
>> 172.26.21.70:7337
>>
>> It's not clear from the documentation if the shuffle service starts
>> automatically just by having it enabled, or if I need to do something else.
>> There are instructions for running the shuffle service in YARN, but not
>> mesos.
>>
>> - Philip
>>
>>
>


Spark Streaming application code change and stateful transformations

2015-09-14 Thread Ofir Kerker
Hi,
My Spark Streaming application consumes messages (events) from Kafka every
10 seconds using the direct stream approach and aggregates these messages
into hourly aggregations (to answer analytics questions like: "How many
users from Paris visited page X between 8PM to 9PM") and save the data to
Cassandra.

I was wondering if there's a good practice for handling a code change in a
Spark Streaming applications that uses stateful transformations
(updateStateByKey for example) because the new application code will not be
able to use the data that was checkpointed by the former application.
I have thought of a few solutions for this issue and was hoping some of you
have some experience with such case and can suggest other solutions or
feedback my suggested solutions:
*Solution #1*: On a graceful shutdown, in addition to the current Kafka
offsets, persist the current aggregated data into Cassandra tables
(different than the regular aggregation tables) that would allow reading
them easily when the new application starts in order to build the initial
state.
*Solution #2*: When an hour is "complete" (i.e not expecting more events
with the timestamp of this hour), update somewhere persistent (DB / shared
file) the last-complete-hour. This will allow me, when the new application
starts, to read all the events from Kafka from the beginning of retention
period (last X hours) and ignore events from timestamp smaller or equal than
the last-complete-hour.

I'll be happy to get your feedback!

Thanks,
Ofir




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-application-code-change-and-stateful-transformations-tp24692.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming application code change and stateful transformations

2015-09-14 Thread Cody Koeninger
Solution 2 sounds better to me.  You aren't always going to have graceful
shutdowns.

On Mon, Sep 14, 2015 at 1:49 PM, Ofir Kerker  wrote:

> Hi,
> My Spark Streaming application consumes messages (events) from Kafka every
> 10 seconds using the direct stream approach and aggregates these messages
> into hourly aggregations (to answer analytics questions like: "How many
> users from Paris visited page X between 8PM to 9PM") and save the data to
> Cassandra.
>
> I was wondering if there's a good practice for handling a code change in a
> Spark Streaming applications that uses stateful transformations
> (updateStateByKey for example) because the new application code will not be
> able to use the data that was checkpointed by the former application.
> I have thought of a few solutions for this issue and was hoping some of you
> have some experience with such case and can suggest other solutions or
> feedback my suggested solutions:
> *Solution #1*: On a graceful shutdown, in addition to the current Kafka
> offsets, persist the current aggregated data into Cassandra tables
> (different than the regular aggregation tables) that would allow reading
> them easily when the new application starts in order to build the initial
> state.
> *Solution #2*: When an hour is "complete" (i.e not expecting more events
> with the timestamp of this hour), update somewhere persistent (DB / shared
> file) the last-complete-hour. This will allow me, when the new application
> starts, to read all the events from Kafka from the beginning of retention
> period (last X hours) and ignore events from timestamp smaller or equal
> than
> the last-complete-hour.
>
> I'll be happy to get your feedback!
>
> Thanks,
> Ofir
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-application-code-change-and-stateful-transformations-tp24692.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Is there any Spark SQL reference manual?

2015-09-14 Thread vivek bhaskar
Thanks Richard, Ted. Hope we have some reference available soon.

Peymen, I had a look at this link before at this but was looking for
something with broader coverage.

PS: Richard, Kindly advise me for generating  BNF description of the
grammar via derby build script. Since this may not be of spark-user concern
you may please reply me privately.

Regards,
Vivek

On Fri, Sep 11, 2015 at 9:24 PM, Ted Yu  wrote:

> Very nice suggestion, Richard.
>
> I logged SPARK-10561 referencing this discussion.
>
> On Fri, Sep 11, 2015 at 8:15 AM, Richard Hillegas 
> wrote:
>
>> The latest Derby SQL Reference manual (version 10.11) can be found here:
>> https://db.apache.org/derby/docs/10.11/ref/index.html. It is, indeed,
>> very useful to have a comprehensive reference guide. The Derby build
>> scripts can also produce a BNF description of the grammar--but that is not
>> part of the public documentation for the project. The BNF is trivial to
>> generate because it is an artifact of the JavaCC grammar generator which
>> Derby uses.
>>
>> I appreciate the difficulty of maintaining a formal reference guide for a
>> rapidly evolving SQL dialect like Spark's.
>>
>> A machine-generated BNF, however, is easy to imagine. But perhaps not so
>> easy to implement. Spark's SQL grammar is implemented in Scala, extending
>> the DSL support provided by the Scala language. I am new to programming in
>> Scala, so I don't know whether the Scala ecosystem provides any good tools
>> for reverse-engineering a BNF from a class which extends
>> scala.util.parsing.combinator.syntactical.StandardTokenParsers.
>>
>> Thanks,
>> -Rick
>>
>> vivekw...@gmail.com wrote on 09/11/2015 05:05:47 AM:
>>
>> > From: vivek bhaskar 
>> > To: Ted Yu 
>> > Cc: user 
>> > Date: 09/11/2015 05:06 AM
>> > Subject: Re: Is there any Spark SQL reference manual?
>> > Sent by: vivekw...@gmail.com
>>
>> >
>> > Hi Ted,
>> >
>> > The link you mention do not have complete list of supported syntax.
>> > For example, few supported syntax are listed as "Supported Hive
>> > features" but that do not claim to be exhaustive (even if it is so,
>> > one has to filter out a lot many lines from Hive QL reference and
>> > still will not be sure if its all - due to versions mismatch).
>> >
>> > Quickly searching online gives me link for another popular open
>> > source project which has good sql reference: https://db.apache.org/
>> > derby/docs/10.1/ref/crefsqlj23296.html.
>> >
>> > I had similar expectation when I was looking for all supported DDL
>> > and DML syntax along with their extensions. For example,
>> > a. Select expression along with supported extensions i.e. where
>> > clause, group by, different supported joins etc.
>> > b. SQL format for Create, Insert, Alter table etc.
>> > c. SQL for Insert, Update, Delete, etc along with their extensions.
>> > d. Syntax for view creation, if supported
>> > e. Syntax for explain mechanism
>> > f. List of supported functions, operators, etc. I can see that 100s
>> > of function are added in 1.5 but then you have to make lot of cross
>> > check from code to JIRA tickets.
>> >
>> > So I wanted a piece of documentation that can provide all such
>> > information at a single place.
>> >
>> > Regards,
>> > Vivek
>> >
>> > On Fri, Sep 11, 2015 at 4:29 PM, Ted Yu  wrote:
>> > You may have seen this:
>> > https://spark.apache.org/docs/latest/sql-programming-guide.html
>> >
>> > Please suggest what should be added.
>> >
>> > Cheers
>> >
>> > On Fri, Sep 11, 2015 at 3:43 AM, vivek bhaskar 
>> wrote:
>> > Hi all,
>> >
>> > I am looking for a reference manual for Spark SQL some thing like
>> > many database vendors have. I could find one for hive ql https://
>> > cwiki.apache.org/confluence/display/Hive/LanguageManual but not
>> > anything specific to spark sql.
>> >
>> > Please suggest. SQL reference specific to latest release will be of
>> > great help.
>> >
>> > Regards,
>> > Vivek
>>
>>
>


Re: Spark Streaming..Exception

2015-09-14 Thread Priya Ch
Hi All,

 I came across the related old conversation on the above issue (
https://issues.apache.org/jira/browse/SPARK-5594. ) Is the issue fixed? I
tried different values for spark.cleaner.ttl  -> 0sec, -1sec,
2000sec,..none of them worked. I also tried setting
spark.streaming.unpersist -> true. What is the possible solution for this ?
Is this a bug in Spark 1.3.0? Changing the scheduling mode to Stand-alone
or Mesos mode would work fine ??

Please someone share your views on this.

On Sat, Sep 12, 2015 at 11:04 PM, Priya Ch 
wrote:

> Hello All,
>
>  When I push messages into kafka and read into streaming application, I
> see the following exception-
>  I am running the application on YARN and no where broadcasting the
> message within the application. Just simply reading message, parsing it and
> populating fields in a class and then printing the dstream (using
> DStream.print).
>
>  Have no clue if this is cluster issue or spark version issue or node
> issue. The strange part is, sometimes the message is processed but
> sometimes I see the below exception -
>
> java.io.IOException: org.apache.spark.SparkException: Failed to get
> broadcast_5_piece0 of broadcast_5
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
> at
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
> at
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
> at
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
> at
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58)
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.spark.SparkException: Failed to get
> broadcast_5_piece0 of broadcast_5
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
> at scala.Option.getOrElse(Option.scala:120)
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136)
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at org.apache.spark.broadcast.TorrentBroadcast.org
> 
> $apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:119)
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:174)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1152)
>
>
> I would be glad if someone can throw some light on this.
>
> Thanks,
> Padma Ch
>
>


DAG Scheduler deadlock when two RDDs reference each other, force Stages manually?

2015-09-14 Thread petranidis
Hi all, 

I am new to spark and I have writen a few spark programs mostly around
machine learning 
applications.

I am trying to resolve a particular problem where there are two RDDs that
should be updated
by using elements of each other. More specifically, if the two pair RDDs are
called A and B M
is a matrix that specifies which elements of each RDD should be taken into
account when
computing the other with rows of M corresponding to elements of A and
columns to elements
of B e.g.

A = (0, 6), (1,7), (2,8)
B = (0, 4), (1,6), (2,1)
and
M = 
 0 1 1
 1 1 0 
0 1 0

Then

for (it =0;it < 10; it++) {
A(0) = B(1) + B(2)
A(1) = B(0) + B(1)
A(2) = B(1)
B(0) = A(1)
B(1) = A(0) + A(1) + A(2)
B(2) = A(0)
}

To do such a computation in spark, I used
A = A.map( (key,val) => { B.aggregate(...) }) 
B = B.map( (key,val) => { A.aggregate(...) }) 

where if the key of each mapped element keyA is passed in the aggregate
function as a 
initialization parameter and then for each B element key keyB, if M(keyA,
keyB) ==1
then the B element is being taken into account in the summation.

The calculation of A is done successfully and correctly, but then the DAG
scheduler
seems to deadlock when the calculation of B happens. This behaviour goes
away
when I remove the A.aggregate bit in my code. Apparently according to the
logs the 
scheduler is expecting some results before if can go on but the results
should already
have been calculated.

I assume that this has to do with the DAG scheduling not handling cyclical
dependencies.
Is there a way I can force each iteration or update of A and B to be seen as
a separate
stage? Otherwise, how can I implement this type of aggregation in another
way? (It could
be the equivalent of mapping the A elements to a List of all the B elements
for which the M
matrix entry is 1 and then mapping again to their sum, but this means I need
a lot of space 
especially when the problem in hand could be very large, which is
unfeasible, so I need to avoid this)

Thanks in advance for your help!





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/DAG-Scheduler-deadlock-when-two-RDDs-reference-each-other-force-Stages-manually-tp24684.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark Streaming Topology

2015-09-14 Thread defstat
Hi all, 

I would like to use Spark Streaming for managing the problem below:

I have 2 InputStreams, one for one type of input (n-dimensional vectors) and
one for question on the infrastructure (explained below).

I need to "break" the input first in 4 execution nodes, and produce a stream
from each (4 streams) containing only m vectors that share common attributes
(what is "common" comes from comparing incoming vectors with vectors already
inside the "selected" group). If the selected group of m vectors changes by
the addition of an new vector, I need to forward the selected group to an
"aggregation node" that joins 2 execution nodes' selected groups (we have 2
such aggregation nodes). Following that, if an aggregation node has a change
in its joined selected group of vectors, it forwards its joined selected
vectors to an other aggregate node that contains the overall aggregation of
the 2 aggregation nodes (JoinAll). 

Now, if a "question" arrives, a mapToPair and then Sort transformations need
to be done on JoinAll, and print the result (one result for each question)

Can anyone help me on this endeavour? 

I think, and correct me if I am mistaken, the architecture described needs
to:

1. Partition Input Stream(1) to through  DStream eachPartition =
inputVectors.repartition(4)
2. Filter() eachPartition like DStream eachPartitionFiltered =
eachPartition.filter(func) -> How can I use already persisted to node data
for that [I mean the already "selected group" of that specific partition]
3. Group 2 out of 4 partitions to another DStream if needed and store it
inside another node. [???]
4. if a "question" arrives, use the JoinAll DStream for answering the
question. [???] 

Thank you in advance.
 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Topology-tp24685.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: DAG Scheduler deadlock when two RDDs reference each other, force Stages manually?

2015-09-14 Thread Petros Nyfantis

Hi Sean

thanks a lot for your reply, yes I understand
that as scala is a functional language maps
correspond to transforms of immutable objects
but the behavior of the program seems like
a deadlock as it simply does not proceed beyond
the B = B.map (A.aggregate) stage

my Spark Web interface shows a pure
scheduler delay bar when I click one
of the still active jobs and expand the
event timeline.

A snippet of thread dump follows my
message, where threads that correspond
to my code calls appear and they are
all in WAITING. like I said, when I remove
the second map and nested aggregate
the problem vanishes.

Thanks again,
Petros

2015-09-14 20:00:53
Full thread dump OpenJDK 64-Bit Server VM (24.79-b02 mixed mode):

"Attach Listener" daemon prio=10 tid=0x7f9aa4001000 nid=0x674c 
waiting on condition [0x]

   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
- None

"qtp1523343281-98" daemon prio=10 tid=0x7f9a2c001000 nid=0x6675 
waiting on condition [0x7f9ab48a]

   java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0xe0389d80> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
at 
org.eclipse.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:342)
at 
org.eclipse.jetty.util.thread.QueuedThreadPool.idleJobPoll(QueuedThreadPool.java:526)
at 
org.eclipse.jetty.util.thread.QueuedThreadPool.access$600(QueuedThreadPool.java:44)
at 
org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572)

at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
- None

"qtp1523343281-97" daemon prio=10 tid=0x7f9a28002000 nid=0x6649 
waiting on condition [0x7f99f7baa000]

   java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0xe0389d80> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
at 
org.eclipse.jetty.util.BlockingArrayQueue.poll(BlockingArrayQueue.java:342)
at 
org.eclipse.jetty.util.thread.QueuedThreadPool.idleJobPoll(QueuedThreadPool.java:526)
at 
org.eclipse.jetty.util.thread.QueuedThreadPool.access$600(QueuedThreadPool.java:44)
at 
org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572)

at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
- None

"Executor task launch worker-7" daemon prio=10 tid=0x7f9a8400a800 
nid=0x661a in Object.wait() [0x7f99f6e9c000]

   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0xfc9ba760> (a 
org.apache.spark.scheduler.JobWaiter)

at java.lang.Object.wait(Object.java:503)
at 
org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
- locked <0xfc9ba760> (a 
org.apache.spark.scheduler.JobWaiter)
at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:530)

at org.apache.spark.SparkContext.runJob(SparkContext.scala:1734)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1804)
at 
org.apache.spark.rdd.RDD$$anonfun$aggregate$1.apply(RDD.scala:1058)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)

at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1051)
at 
DistributedCholesky$$anonfun$compute_sigma_A$1.apply(DistributedCholesky.scala:323)
at 
DistributedCholesky$$anonfun$compute_sigma_A$1.apply(DistributedCholesky.scala:321)

at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)

at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201)

at scala.collection.AbstractIterator.aggregate(Iterator.scala:1157)
at 
org.apache.spark.rdd.RDD$$anonfun$aggregate$1$$anonfun$21.apply(RDD.scala:1056)
at