Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error

2015-09-25 Thread Dibyendu Bhattacharya
Hi,

Recently I was working on a PR to use Tachyon as OFF_HEAP store for Spark
Streaming and make sure Spark Streaming can recover from Driver failure and
recover the blocks form Tachyon.

The motivation for this PR is:

If a Streaming application stores its blocks OFF_HEAP, it may not need any
WAL-like feature to recover from Driver failure. As long as the writing of
blocks from the Streaming receiver to Tachyon is durable, the blocks should be
recoverable directly from Tachyon after a Driver failure.
This avoids the expensive WAL write and the duplication of blocks in both
MEMORY and the WAL, while still guaranteeing an end-to-end no-data-loss
channel using the OFF_HEAP store.

https://github.com/apache/spark/pull/8817

This PR is still under review, but having done various failover tests in my
environment, I have seen it work perfectly, without any data loss. Let's see
what TD and others have to say on this PR.

Below is the configuration I used to test this PR:


Spark : 1.6 from Master
Tachyon : 0.7.1

SparkConfiguration Details :

SparkConf conf = new SparkConf().setAppName("TestTachyon")
    .set("spark.streaming.unpersist", "true")
    .set("spark.local.dir", "/mnt1/spark/tincan")
    .set("tachyon.zookeeper.address", "10.252.5.113:2182")
    .set("tachyon.usezookeeper", "true")
    .set("spark.externalBlockStore.url", "tachyon-ft://ip-10-252-5-113.asskickery.us:19998")
    .set("spark.externalBlockStore.baseDir", "/sparkstreaming")
    .set("spark.externalBlockStore.folderName", "pearson")
    .set("spark.externalBlockStore.dirId", "subpub")
    .set("spark.externalBlockStore.keepExternalBlockStoreDirOnShutdown", "true");

JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(1));

String checkpointDirectory = "hdfs://10.252.5.113:9000/user/hadoop/spark/wal";

jsc.checkpoint(checkpointDirectory);


// I am using my receiver-based consumer (https://github.com/dibbhatt/kafka-spark-consumer),
// but KafkaUtils.createStream will also work.

JavaDStream unionStreams = ReceiverLauncher.launch(
jsc, props, numberOfReceivers, StorageLevel.OFF_HEAP());
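
For reference, a rough Scala sketch of the KafkaUtils.createStream alternative
mentioned above (the Zookeeper quorum, consumer group, topic map, and batch
interval are placeholders, not from my setup):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(conf, Seconds(2))   // batch interval is a placeholder
val stream = KafkaUtils.createStream(
  ssc,
  "zkhost:2181",          // Zookeeper quorum (placeholder)
  "test-consumer-group",  // consumer group id (placeholder)
  Map("test" -> 1),       // topic -> number of receiver threads
  StorageLevel.OFF_HEAP)  // keep received blocks off-heap (Tachyon-backed)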




Regards,
Dibyendu

On Sat, Sep 26, 2015 at 11:59 AM, N B  wrote:

> Hi Dibyendu,
>
> How does one go about configuring spark streaming to use tachyon as its
> place for storing checkpoints? Also, can one do this with tachyon running
> on a completely different node than where spark processes are running?
>
> Thanks
> Nikunj
>
>
> On Thu, May 21, 2015 at 8:35 PM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> Hi Tathagata,
>>
>> Thanks for looking into this. Investigating further, I found that the
>> issue is that Tachyon does not support file append. When the streaming receiver
>> that writes to the WAL fails and is restarted, it is not able to append to
>> the same WAL file after the restart.
>>
>> I raised this with the Tachyon user group, and Haoyuan said that Tachyon
>> file append should be ready within three months. I will revisit this issue
>> again then.
>>
>> Regards,
>> Dibyendu
>>
>>
>> On Fri, May 22, 2015 at 12:24 AM, Tathagata Das 
>> wrote:
>>
>>> Looks like the file size reported by the FSInputDStream of
>>> Tachyon's FileSystem interface is somehow zero.
>>>
>>> On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya <
>>> dibyendu.bhattach...@gmail.com> wrote:
>>>
 Just to follow up this thread further .

 I was doing some fault-tolerance testing of Spark Streaming with Tachyon
 as the OFF_HEAP block store. As I said in an earlier email, I was able to solve
 the BlockNotFound exception when I used Tachyon's Hierarchical Storage,
 which is good.

 I continued testing with the Spark Streaming WAL and checkpoint files also
 stored in Tachyon. Here are a few findings:


 When I store the Spark Streaming checkpoint location in Tachyon, the
 throughput is much higher. I tested the Driver and Receiver failure cases,
 and Spark Streaming is able to recover without any data loss on Driver
 failure.

 *But on Receiver failure, Spark Streaming loses data*, as I see an
 exception while reading the WAL file from the Tachyon "receivedData" location
 for the same Receiver id which just failed.

 If I change the checkpoint location back to HDFS, Spark Streaming can
 recover from both Driver and Receiver failure.

 Here are the log details from when the Spark Streaming receiver failed. I
 raised a JIRA for the same issue:
 https://issues.apache.org/jira/browse/SPARK-7525



 INFO : org.apache.spark.scheduler.DAGScheduler - *Executor lost: 2
 (epoch 1)*
 INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying to
 remove executor 2 from BlockManagerMaster.
 INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Removing
 block manager BlockManagerId(2, 10.252.5.54, 45789)
 INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2
 successfully in removeExecutor
 INFO 

Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error

2015-09-25 Thread N B
Hi Dibyendu,

How does one go about configuring spark streaming to use tachyon as its
place for storing checkpoints? Also, can one do this with tachyon running
on a completely different node than where spark processes are running?

Thanks
Nikunj


On Thu, May 21, 2015 at 8:35 PM, Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:

> Hi Tathagata,
>
> Thanks for looking into this. Investigating further, I found that the issue
> is that Tachyon does not support file append. When the streaming receiver that
> writes to the WAL fails and is restarted, it is not able to append to the same
> WAL file after the restart.
>
> I raised this with the Tachyon user group, and Haoyuan said that Tachyon
> file append should be ready within three months. I will revisit this issue
> again then.
>
> Regards,
> Dibyendu
>
>
> On Fri, May 22, 2015 at 12:24 AM, Tathagata Das 
> wrote:
>
>> Looks like the file size reported by the FSInputDStream of
>> Tachyon's FileSystem interface is somehow zero.
>>
>> On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya <
>> dibyendu.bhattach...@gmail.com> wrote:
>>
>>> Just to follow up this thread further .
>>>
>>> I was doing some fault-tolerance testing of Spark Streaming with Tachyon
>>> as the OFF_HEAP block store. As I said in an earlier email, I was able to solve
>>> the BlockNotFound exception when I used Tachyon's Hierarchical Storage,
>>> which is good.
>>>
>>> I continued testing with the Spark Streaming WAL and checkpoint files also
>>> stored in Tachyon. Here are a few findings:
>>>
>>>
>>> When I store the Spark Streaming Checkpoint location in Tachyon , the
>>> throughput is much higher . I tested the Driver and Receiver failure cases
>>> , and Spark Streaming is able to recover without any Data Loss on Driver
>>> failure.
>>>
>>> *But on Receiver failure, Spark Streaming loses data*, as I see an
>>> exception while reading the WAL file from the Tachyon "receivedData" location
>>> for the same Receiver id which just failed.
>>>
>>> If I change the Checkpoint location back to HDFS , Spark Streaming can
>>> recover from both Driver and Receiver failure .
>>>
>>> Here are the log details from when the Spark Streaming receiver failed. I
>>> raised a JIRA for the same issue:
>>> https://issues.apache.org/jira/browse/SPARK-7525
>>>
>>>
>>>
>>> INFO : org.apache.spark.scheduler.DAGScheduler - *Executor lost: 2
>>> (epoch 1)*
>>> INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying to
>>> remove executor 2 from BlockManagerMaster.
>>> INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Removing
>>> block manager BlockManagerId(2, 10.252.5.54, 45789)
>>> INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2
>>> successfully in removeExecutor
>>> INFO : org.apache.spark.streaming.scheduler.ReceiverTracker - *Registered
>>> receiver for stream 2 from 10.252.5.62*:47255
>>> WARN : org.apache.spark.scheduler.TaskSetManager - Lost task 2.1 in
>>> stage 103.0 (TID 421, 10.252.5.62): org.apache.spark.SparkException: *Could
>>> not read data from write ahead log record
>>> FileBasedWriteAheadLogSegment(tachyon-ft://10.252.5.113:19998/tachyon/checkpoint/receivedData/2/log-1431341091711-1431341151711,645603894,10891919
>>> )*
>>> at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
>>> at
>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>> at
>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:168)
>>> at scala.Option.getOrElse(Option.scala:120)
>>> at
>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:168)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> at java.lang.Thread.run(Thread.java:744)
>>> Caused by: java.lang.IllegalArgumentException:* Seek position is past
>>> EOF: 645603894, fileSize = 0*
>>> at tachyon.hadoop.HdfsFileInputStream.seek(HdfsFileInputStream.java:239)
>>> at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:37)
>>> at

Re: How to properly set conf/spark-env.sh for spark to run on yarn

2015-09-25 Thread Gavin Yue
It is working; we are doing the same thing every day. But the remote server
needs to be able to talk to the ResourceManager.

If you are using spark-submit, you will also need to specify the Hadoop conf
directory in your environment variables. Spark relies on that to locate
the cluster's ResourceManager.

I think this tutorial is pretty clear:
http://spark.apache.org/docs/latest/running-on-yarn.html
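
As a rough illustration only (not from the tutorial; paths and class names are
placeholders), the gateway-side setup boils down to pointing Spark at the
cluster's Hadoop configuration and letting spark-submit supply the master:

// On the gateway machine (shell, placeholders):
//   export HADOOP_CONF_DIR=/usr/lib/hadoop/etc/hadoop
//   export YARN_CONF_DIR=/usr/lib/hadoop/etc/hadoop
//   ./bin/spark-submit --master yarn-client --class com.example.MyApp my-app.jar
// In the application itself, do not hard-code a master, so the one passed to
// spark-submit is used:
import org.apache.spark.{SparkConf, SparkContext}

object MyApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("yarn-remote-submit-test")
    val sc = new SparkContext(conf)
    println(sc.parallelize(1 to 100).sum())   // trivial job to verify executors start on the cluster
    sc.stop()
  }
}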



On Fri, Sep 25, 2015 at 7:11 PM, Zhiliang Zhu  wrote:

> Hi Yue,
>
> Thanks very much for your kind reply.
>
> I would like to submit spark job remotely on another machine outside the
> cluster,
> and the job will run on yarn, similar as hadoop job is already done, could
> you
> confirm it could exactly work for spark...
>
> Do you mean that I would print those variables on linux command side?
>
> Best Regards,
> Zhiliang
>
>
>
>
>
> On Saturday, September 26, 2015 10:07 AM, Gavin Yue <
> yue.yuany...@gmail.com> wrote:
>
>
> Print out your env variables and check first
>
> Sent from my iPhone
>
> On Sep 25, 2015, at 18:43, Zhiliang Zhu  > wrote:
>
> Hi All,
>
> I would like to submit spark job on some another remote machine outside
> the cluster,
> I also copied hadoop/spark conf files under the remote machine, then hadoop
> job would be submitted, but spark job would not.
>
> In spark-env.sh, it may be due to that SPARK_LOCAL_IP is not properly set,
> or for some other reasons...
>
> This issue is urgent for me, would some expert provide some help about
> this problem...
>
> I will show sincere appreciation towards your help.
>
> Thank you!
> Best Regards,
> Zhiliang
>
>
>
>
> On Friday, September 25, 2015 7:53 PM, Zhiliang Zhu <
> zchl.j...@yahoo.com.INVALID > wrote:
>
>
> Hi all,
>
> The spark job will run on yarn. While I do not set SPARK_LOCAL_IP any, or
> just set as
> export  SPARK_LOCAL_IP=localhost#or set as the specific node ip on the
> specific spark install directory
>
> It will work well to submit spark job on master node of cluster, however,
> it will fail by way of some gateway machine remotely.
>
> The gateway machine is already configed, it works well to submit hadoop
> job.
> It is set as:
> export SCALA_HOME=/usr/lib/scala
> export JAVA_HOME=/usr/java/jdk1.7.0_45
> export R_HOME=/usr/lib/r
> export HADOOP_HOME=/usr/lib/hadoop
> export YARN_CONF_DIR=/usr/lib/hadoop/etc/hadoop
> export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
>
> export SPARK_MASTER_IP=master01
> #export SPARK_LOCAL_IP=master01  #if no SPARK_LOCAL_IP is set,
> SparkContext will not start
> export SPARK_LOCAL_IP=localhost #if localhost is set, SparkContext is
> started, but failed later
> export SPARK_LOCAL_DIRS=/data/spark_local_dir
> ...
>
> The error messages:
> 15/09/25 19:07:12 INFO util.Utils: Successfully started service
> 'sparkYarnAM' on port 48133.
> 15/09/25 19:07:12 INFO yarn.ApplicationMaster: Waiting for Spark driver to
> be reachable.
> 15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to
> driver at 127.0.0.1:35706, retrying ...
> 15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to
> driver at 127.0.0.1:35706, retrying ...
> 15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to
> driver at 127.0.0.1:35706, retrying ...
>
>  I shall sincerely appreciate your kind help very much!
> Zhiliang
>
>
>
>
>
>
>


Re: Networking issues with Spark on EC2

2015-09-25 Thread SURAJ SHETH
Hi,
Nope. I was trying to use EC2 (due to a few constraints), which is where I
faced the problem.
With EMR, it works flawlessly.
But I would like to go back to EC2 if I can fix this issue.
Has anybody set up a Spark cluster using plain EC2 machines? What steps did
you follow?

Thanks and Regards,
Suraj Sheth

On Sat, Sep 26, 2015 at 10:36 AM, Natu Lauchande 
wrote:

> Hi,
>
> Are you using EMR ?
>
> Natu
>
> On Sat, Sep 26, 2015 at 6:55 AM, SURAJ SHETH  wrote:
>
>> Hi Ankur,
>> Thanks for the reply.
>> This is already done.
>> If I wait for a long amount of time(10 minutes), a few tasks get
>> successful even on slave nodes. Sometime, a fraction of the tasks(20%) are
>> completed on all the machines in the initial 5 seconds and then, it slows
>> down drastically.
>>
>> Thanks and Regards,
>> Suraj Sheth
>>
>> On Fri, Sep 25, 2015 at 2:10 AM, Ankur Srivastava <
>> ankur.srivast...@gmail.com> wrote:
>>
>>> Hi Suraj,
>>>
>>> Spark uses a lot of ports to communicate between nodes. Probably your
>>> security group is restrictive and does not allow instances to communicate
>>> on all networks. The easiest way to resolve it is to add a Rule to allow
>>> all Inbound traffic on all ports (0-65535) to instances in same
>>> security group like this.
>>>
>>> All TCP
>>> TCP
>>> 0 - 65535
>>>  your security group
>>>
>>> Hope this helps!!
>>>
>>> Thanks
>>> Ankur
>>>
>>> On Thu, Sep 24, 2015 at 7:09 AM SURAJ SHETH  wrote:
>>>
 Hi,

 I am using Spark 1.2 and facing network related issues while performing
 simple computations.

 This is a custom cluster set up using ec2 machines and spark prebuilt
 binary from apache site. The problem is only when we have workers on other
 machines(networking involved). Having a single node for the master and the
 slave works correctly.

 The error log from slave node is attached below. It is reading textFile
 from local FS(copied each node) and counting it. The first 30 tasks get
 completed within 5 seconds. Then, it takes several minutes to complete
 another 10 tasks and eventually dies.

 Sometimes, one of the workers completes all the tasks assigned to it.
 Different workers have different behavior at different
 times(non-deterministic).

 Is it related to something specific to EC2?



 15/09/24 13:04:40 INFO Executor: Running task 117.0 in stage 0.0 (TID
 117)

 15/09/24 13:04:41 INFO TorrentBroadcast: Started reading broadcast
 variable 1

 15/09/24 13:04:41 INFO SendingConnection: Initiating connection to
 [master_ip:56305]

 15/09/24 13:04:41 INFO SendingConnection: Connected to
 [master_ip/master_ip_address:56305], 1 messages pending

 15/09/24 13:05:41 INFO TorrentBroadcast: Started reading broadcast
 variable 1

 15/09/24 13:05:41 ERROR Executor: Exception in task 77.0 in stage 0.0
 (TID 77)

 java.io.IOException: sendMessageReliably failed because ack was not
 received within 60 sec

 at
 org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:918)

 at
 org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:917)

 at scala.Option.foreach(Option.scala:236)

 at
 org.apache.spark.network.nio.ConnectionManager$$anon$13.run(ConnectionManager.scala:917)

 at
 io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:581)

 at
 io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:656)

 at
 io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:367)

 at java.lang.Thread.run(Thread.java:745)

 15/09/24 13:05:41 INFO CoarseGrainedExecutorBackend: Got assigned task
 122

 15/09/24 13:05:41 INFO Executor: Running task 3.1 in stage 0.0 (TID 122)

 15/09/24 13:06:41 ERROR Executor: Exception in task 113.0 in stage 0.0
 (TID 113)

 java.io.IOException: sendMessageReliably failed because ack was not
 received within 60 sec

 at
 org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:918)

 at
 org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:917)

 at scala.Option.foreach(Option.scala:236)

 at
 org.apache.spark.network.nio.ConnectionManager$$anon$13.run(ConnectionManager.scala:917)

 at
 io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:581)

 at
 io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:656)

 at
 io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:367)

>

Re: Networking issues with Spark on EC2

2015-09-25 Thread Natu Lauchande
Hi,

Are you using EMR ?

Natu

On Sat, Sep 26, 2015 at 6:55 AM, SURAJ SHETH  wrote:

> Hi Ankur,
> Thanks for the reply.
> This is already done.
> If I wait for a long amount of time(10 minutes), a few tasks get
> successful even on slave nodes. Sometime, a fraction of the tasks(20%) are
> completed on all the machines in the initial 5 seconds and then, it slows
> down drastically.
>
> Thanks and Regards,
> Suraj Sheth
>
> On Fri, Sep 25, 2015 at 2:10 AM, Ankur Srivastava <
> ankur.srivast...@gmail.com> wrote:
>
>> Hi Suraj,
>>
>> Spark uses a lot of ports to communicate between nodes. Probably your
>> security group is restrictive and does not allow instances to communicate
>> on all networks. The easiest way to resolve it is to add a Rule to allow
>> all Inbound traffic on all ports (0-65535) to instances in same security
>> group like this.
>>
>> All TCP
>> TCP
>> 0 - 65535
>>  your security group
>>
>> Hope this helps!!
>>
>> Thanks
>> Ankur
>>
>> On Thu, Sep 24, 2015 at 7:09 AM SURAJ SHETH  wrote:
>>
>>> Hi,
>>>
>>> I am using Spark 1.2 and facing network related issues while performing
>>> simple computations.
>>>
>>> This is a custom cluster set up using ec2 machines and spark prebuilt
>>> binary from apache site. The problem is only when we have workers on other
>>> machines(networking involved). Having a single node for the master and the
>>> slave works correctly.
>>>
>>> The error log from slave node is attached below. It is reading textFile
>>> from local FS(copied each node) and counting it. The first 30 tasks get
>>> completed within 5 seconds. Then, it takes several minutes to complete
>>> another 10 tasks and eventually dies.
>>>
>>> Sometimes, one of the workers completes all the tasks assigned to it.
>>> Different workers have different behavior at different
>>> times(non-deterministic).
>>>
>>> Is it related to something specific to EC2?
>>>
>>>
>>>
>>> 15/09/24 13:04:40 INFO Executor: Running task 117.0 in stage 0.0 (TID
>>> 117)
>>>
>>> 15/09/24 13:04:41 INFO TorrentBroadcast: Started reading broadcast
>>> variable 1
>>>
>>> 15/09/24 13:04:41 INFO SendingConnection: Initiating connection to
>>> [master_ip:56305]
>>>
>>> 15/09/24 13:04:41 INFO SendingConnection: Connected to
>>> [master_ip/master_ip_address:56305], 1 messages pending
>>>
>>> 15/09/24 13:05:41 INFO TorrentBroadcast: Started reading broadcast
>>> variable 1
>>>
>>> 15/09/24 13:05:41 ERROR Executor: Exception in task 77.0 in stage 0.0
>>> (TID 77)
>>>
>>> java.io.IOException: sendMessageReliably failed because ack was not
>>> received within 60 sec
>>>
>>> at
>>> org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:918)
>>>
>>> at
>>> org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:917)
>>>
>>> at scala.Option.foreach(Option.scala:236)
>>>
>>> at
>>> org.apache.spark.network.nio.ConnectionManager$$anon$13.run(ConnectionManager.scala:917)
>>>
>>> at
>>> io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:581)
>>>
>>> at
>>> io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:656)
>>>
>>> at
>>> io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:367)
>>>
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> 15/09/24 13:05:41 INFO CoarseGrainedExecutorBackend: Got assigned task
>>> 122
>>>
>>> 15/09/24 13:05:41 INFO Executor: Running task 3.1 in stage 0.0 (TID 122)
>>>
>>> 15/09/24 13:06:41 ERROR Executor: Exception in task 113.0 in stage 0.0
>>> (TID 113)
>>>
>>> java.io.IOException: sendMessageReliably failed because ack was not
>>> received within 60 sec
>>>
>>> at
>>> org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:918)
>>>
>>> at
>>> org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:917)
>>>
>>> at scala.Option.foreach(Option.scala:236)
>>>
>>> at
>>> org.apache.spark.network.nio.ConnectionManager$$anon$13.run(ConnectionManager.scala:917)
>>>
>>> at
>>> io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:581)
>>>
>>> at
>>> io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:656)
>>>
>>> at
>>> io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:367)
>>>
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> 15/09/24 13:06:41 INFO TorrentBroadcast: Started reading broadcast
>>> variable 1
>>>
>>> 15/09/24 13:06:41 INFO SendingConnection: Initiating connection to
>>> [master_ip/master_ip_address:44427]
>>>
>>> 15/09/24 13:06:41 INFO SendingConnection: Connected to
>>> [master_ip/master_ip_address:44427], 1 messages pending
>>>
>>> 15/09/24 13:07:41 ERROR Executor: Exception in task 37.0 in stage 0.0
>>> (TID 37)
>>>
>>> java.io.IOException: sendMess

Re: Networking issues with Spark on EC2

2015-09-25 Thread SURAJ SHETH
Hi Ankur,
Thanks for the reply.
This is already done.
If I wait for a long time (10 minutes), a few tasks succeed even on the slave
nodes. Sometimes a fraction of the tasks (20%) complete on all the machines in
the initial 5 seconds, and then it slows down drastically.

Thanks and Regards,
Suraj Sheth

On Fri, Sep 25, 2015 at 2:10 AM, Ankur Srivastava <
ankur.srivast...@gmail.com> wrote:

> Hi Suraj,
>
> Spark uses a lot of ports to communicate between nodes. Probably your
> security group is restrictive and does not allow instances to communicate
> on all networks. The easiest way to resolve it is to add a Rule to allow
> all Inbound traffic on all ports (0-65535) to instances in same security
> group like this.
>
> All TCP
> TCP
> 0 - 65535
>  your security group
>
> Hope this helps!!
>
> Thanks
> Ankur
>
> On Thu, Sep 24, 2015 at 7:09 AM SURAJ SHETH  wrote:
>
>> Hi,
>>
>> I am using Spark 1.2 and facing network related issues while performing
>> simple computations.
>>
>> This is a custom cluster set up using ec2 machines and spark prebuilt
>> binary from apache site. The problem is only when we have workers on other
>> machines(networking involved). Having a single node for the master and the
>> slave works correctly.
>>
>> The error log from slave node is attached below. It is reading textFile
>> from local FS(copied each node) and counting it. The first 30 tasks get
>> completed within 5 seconds. Then, it takes several minutes to complete
>> another 10 tasks and eventually dies.
>>
>> Sometimes, one of the workers completes all the tasks assigned to it.
>> Different workers have different behavior at different
>> times(non-deterministic).
>>
>> Is it related to something specific to EC2?
>>
>>
>>
>> 15/09/24 13:04:40 INFO Executor: Running task 117.0 in stage 0.0 (TID 117)
>>
>> 15/09/24 13:04:41 INFO TorrentBroadcast: Started reading broadcast
>> variable 1
>>
>> 15/09/24 13:04:41 INFO SendingConnection: Initiating connection to
>> [master_ip:56305]
>>
>> 15/09/24 13:04:41 INFO SendingConnection: Connected to
>> [master_ip/master_ip_address:56305], 1 messages pending
>>
>> 15/09/24 13:05:41 INFO TorrentBroadcast: Started reading broadcast
>> variable 1
>>
>> 15/09/24 13:05:41 ERROR Executor: Exception in task 77.0 in stage 0.0
>> (TID 77)
>>
>> java.io.IOException: sendMessageReliably failed because ack was not
>> received within 60 sec
>>
>> at
>> org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:918)
>>
>> at
>> org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:917)
>>
>> at scala.Option.foreach(Option.scala:236)
>>
>> at
>> org.apache.spark.network.nio.ConnectionManager$$anon$13.run(ConnectionManager.scala:917)
>>
>> at
>> io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:581)
>>
>> at
>> io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:656)
>>
>> at
>> io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:367)
>>
>> at java.lang.Thread.run(Thread.java:745)
>>
>> 15/09/24 13:05:41 INFO CoarseGrainedExecutorBackend: Got assigned task 122
>>
>> 15/09/24 13:05:41 INFO Executor: Running task 3.1 in stage 0.0 (TID 122)
>>
>> 15/09/24 13:06:41 ERROR Executor: Exception in task 113.0 in stage 0.0
>> (TID 113)
>>
>> java.io.IOException: sendMessageReliably failed because ack was not
>> received within 60 sec
>>
>> at
>> org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:918)
>>
>> at
>> org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:917)
>>
>> at scala.Option.foreach(Option.scala:236)
>>
>> at
>> org.apache.spark.network.nio.ConnectionManager$$anon$13.run(ConnectionManager.scala:917)
>>
>> at
>> io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:581)
>>
>> at
>> io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:656)
>>
>> at
>> io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:367)
>>
>> at java.lang.Thread.run(Thread.java:745)
>>
>> 15/09/24 13:06:41 INFO TorrentBroadcast: Started reading broadcast
>> variable 1
>>
>> 15/09/24 13:06:41 INFO SendingConnection: Initiating connection to
>> [master_ip/master_ip_address:44427]
>>
>> 15/09/24 13:06:41 INFO SendingConnection: Connected to
>> [master_ip/master_ip_address:44427], 1 messages pending
>>
>> 15/09/24 13:07:41 ERROR Executor: Exception in task 37.0 in stage 0.0
>> (TID 37)
>>
>> java.io.IOException: sendMessageReliably failed because ack was not
>> received within 60 sec
>>
>> at
>> org.apache.spark.network.nio.ConnectionManager$$anon$13$$anonfun$run$19.apply(ConnectionManager.scala:918)
>>
>> at
>> org.apache.spark.network.nio.Connecti

What is this Input Size in Spark Application Detail UI?

2015-09-25 Thread Chirag Dewan
Hi All,

I was wondering what the Input Size in the Application Detail UI means.

For my 3 node Cassandra Cluster, with 3 node Spark Cluster this size is 32GB.

For my 15 node Cassandra Cluster, with 15 node Spark Cluster this size reaches 
172GB.

Though the data in both clusters is about the same volume.

Confused.

Thanks in advance.

Chirag


Re: Weird worker usage

2015-09-25 Thread N B
Bryan,

By any chance, are you calling SparkConf.setMaster("local[*]") inside your
application code?

Nikunj
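
For context, a minimal sketch (not Bryan's actual code; the app name is taken
from the log below) of the failure mode being asked about: a hard-coded
setMaster in application code silently overrides the -Dspark.master passed by
spark-submit.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// If the master is hard-coded like this, it wins over the one supplied at submit time:
//   val conf = new SparkConf().setAppName("MainClass").setMaster("local[*]")
// Leaving setMaster out lets spark-submit's master (e.g. spark://sparkserver:7077) apply:
val conf = new SparkConf().setAppName("MainClass")
val ssc = new StreamingContext(conf, Seconds(2))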

On Fri, Sep 25, 2015 at 9:56 AM, Bryan Jeffrey 
wrote:

> Looking at this further, it appears that my Spark Context is not correctly
> setting the Master name.  I see the following in logs:
>
> 15/09/25 16:45:42 INFO DriverRunner: Launch Command:
> "/usr/lib/jvm/java-7-openjdk-amd64/jre/bin/java" "-cp"
> "/spark/spark-1.4.1/sbin/../conf/:/spark/spark-1.4.1/assembly/target/scala-2.10/spark-assembly-1.4.1-hadoop2.2.0.jar:/spark/spark-1.4.1/lib_managed/jars/datanucleus-api-jdo-3.2.6.jar:/spark/spark-1.4.1/lib_managed/jars/datanucleus-rdbms-3.2.9.jar:/spark/spark-1.4.1/lib_managed/jars/datanucleus-core-3.2.10.jar"
> "-Xms512M" "-Xmx512M" "-Dakka.loglevel=WARNING"
> "-Dspark.default.parallelism=6" "-Dspark.rpc.askTimeout=10" "-
> Dspark.app.name=MainClass" "-Dspark.master=spark://sparkserver:7077"
> "-Dspark.driver.supervise=true" "-Dspark.logConf=true"
> "-Dspark.jars=file:/tmp/MainClass-1.0-SNAPSHOT-jar-with-dependencies.jar"
> "-Dspark.streaming.receiver.maxRate=500" "-XX:MaxPermSize=256m"
> "org.apache.spark.deploy.worker.DriverWrapper" "akka.tcp://
> sparkWorker@10.0.0.6:48077/user/Worker"
> "/spark/spark-1.4.1/work/driver-20150925164617-/MainClass-1.0-SNAPSHOT-jar-with-dependencies.jar"
> "MainClass" "--checkpoint" "/tmp/sparkcheckpoint" "--broker"
> "kafkaBroker:9092" "--topic" "test" "--numStreams" "9"
> "--threadParallelism" "9"
> 15/09/25 16:45:43 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 15/09/25 16:45:43 INFO SecurityManager: Changing view acls to: root
> 15/09/25 16:45:43 INFO SecurityManager: Changing modify acls to: root
> 15/09/25 16:45:43 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(root); users
> with modify permissions: Set(root)
> 15/09/25 16:45:44 INFO Slf4jLogger: Slf4jLogger started
> 15/09/25 16:45:45 INFO Utils: Successfully started service 'Driver' on
> port 59670.
> 15/09/25 16:45:45 INFO WorkerWatcher: Connecting to worker akka.tcp://
> sparkWorker@10.0.0.6:48077/user/Worker
> 15/09/25 16:45:45 INFO MainClass: MainClass - Setup Logger
> 15/09/25 16:45:45 INFO WorkerWatcher: Successfully connected to akka.tcp://
> sparkWorker@10.0.0.6:48077/user/Worker
> 15/09/25 16:45:45 INFO Checkpoint: Checkpoint directory
> /tmp/sparkcheckpoint does not exist
> 15/09/25 16:45:45 INFO MainClass: Setting up streaming context with
> configuration: org.apache.spark.SparkConf@56057cbf and time window 2000 ms
> 15/09/25 16:45:45 INFO SparkContext: Running Spark version 1.4.1
> 15/09/25 16:45:45 INFO SparkContext: Spark configuration:
> spark.app.name=MainClass
> spark.default.parallelism=6
> spark.driver.supervise=true
> spark.jars=file:/tmp/OinkSpark-1.0-SNAPSHOT-jar-with-dependencies.jar
> spark.logConf=true
> spark.master=local[*]
> spark.rpc.askTimeout=10
> spark.streaming.receiver.maxRate=500
>
> As you can see, despite -Dmaster=spark://sparkserver:7077, the streaming
> context still registers the master as local[*].  Any idea why?
>
> Thank you,
>
> Bryan Jeffrey
>
>
>


Re: How to properly set conf/spark-env.sh for spark to run on yarn

2015-09-25 Thread Alexis Gillain
I think Gavin wants you to print the env variables from the remote machine.

Can you provide the spark-submit command line?

Are you able to run the REPL from the remote machine?
./bin/spark-shell --master yarn-client
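
Once the shell comes up, a quick sanity check (just a sketch; any small job
will do) confirms that executors actually start on the cluster:

// Run inside the spark-shell REPL:
sc.parallelize(1 to 1000, 10).map(_ * 2).sum()   // should distribute 10 tasks across YARN executors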

2015-09-26 10:11 GMT+08:00 Zhiliang Zhu :

> Hi Yue,
>
> Thanks very much for your kind reply.
>
> I would like to submit spark job remotely on another machine outside the
> cluster,
> and the job will run on yarn, similar as hadoop job is already done, could
> you
> confirm it could exactly work for spark...
>
> Do you mean that I would print those variables on linux command side?
>
> Best Regards,
> Zhiliang
>
>
>
>
>
> On Saturday, September 26, 2015 10:07 AM, Gavin Yue <
> yue.yuany...@gmail.com> wrote:
>
>
> Print out your env variables and check first
>
> Sent from my iPhone
>
> On Sep 25, 2015, at 18:43, Zhiliang Zhu  > wrote:
>
> Hi All,
>
> I would like to submit spark job on some another remote machine outside
> the cluster,
> I also copied hadoop/spark conf files under the remote machine, then hadoop
> job would be submitted, but spark job would not.
>
> In spark-env.sh, it may be due to that SPARK_LOCAL_IP is not properly set,
> or for some other reasons...
>
> This issue is urgent for me, would some expert provide some help about
> this problem...
>
> I will show sincere appreciation towards your help.
>
> Thank you!
> Best Regards,
> Zhiliang
>
>
>
>
> On Friday, September 25, 2015 7:53 PM, Zhiliang Zhu <
> zchl.j...@yahoo.com.INVALID > wrote:
>
>
> Hi all,
>
> The spark job will run on yarn. While I do not set SPARK_LOCAL_IP any, or
> just set as
> export  SPARK_LOCAL_IP=localhost#or set as the specific node ip on the
> specific spark install directory
>
> It will work well to submit spark job on master node of cluster, however,
> it will fail by way of some gateway machine remotely.
>
> The gateway machine is already configed, it works well to submit hadoop
> job.
> It is set as:
> export SCALA_HOME=/usr/lib/scala
> export JAVA_HOME=/usr/java/jdk1.7.0_45
> export R_HOME=/usr/lib/r
> export HADOOP_HOME=/usr/lib/hadoop
> export YARN_CONF_DIR=/usr/lib/hadoop/etc/hadoop
> export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
>
> export SPARK_MASTER_IP=master01
> #export SPARK_LOCAL_IP=master01  #if no SPARK_LOCAL_IP is set,
> SparkContext will not start
> export SPARK_LOCAL_IP=localhost #if localhost is set, SparkContext is
> started, but failed later
> export SPARK_LOCAL_DIRS=/data/spark_local_dir
> ...
>
> The error messages:
> 15/09/25 19:07:12 INFO util.Utils: Successfully started service
> 'sparkYarnAM' on port 48133.
> 15/09/25 19:07:12 INFO yarn.ApplicationMaster: Waiting for Spark driver to
> be reachable.
> 15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to
> driver at 127.0.0.1:35706, retrying ...
> 15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to
> driver at 127.0.0.1:35706, retrying ...
> 15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to
> driver at 127.0.0.1:35706, retrying ...
>
>  I shall sincerely appreciate your kind help very much!
> Zhiliang
>
>
>
>
>
>
>


-- 
Alexis GILLAIN


Re: How to properly set conf/spark-env.sh for spark to run on yarn

2015-09-25 Thread Zhiliang Zhu
Hi Yue,

Thanks very much for your kind reply.

I would like to submit the Spark job remotely from another machine outside the
cluster, and the job will run on YARN, similar to how Hadoop jobs are already
submitted. Could you confirm that this can work for Spark?

Do you mean that I should print those variables on the Linux command line?

Best Regards,
Zhiliang

 


 On Saturday, September 26, 2015 10:07 AM, Gavin Yue 
 wrote:
   

 Print out your env variables and check first 

Sent from my iPhone
On Sep 25, 2015, at 18:43, Zhiliang Zhu  wrote:


Hi All,

I would like to submit a Spark job from another remote machine outside the
cluster. I copied the hadoop/spark conf files to the remote machine; a Hadoop
job can be submitted from there, but a Spark job cannot.

In spark-env.sh, it may be that SPARK_LOCAL_IP is not properly set, or there
may be some other reason...

This issue is urgent for me; could some expert provide some help with this
problem?

I will sincerely appreciate your help.

Thank you!
Best Regards,
Zhiliang



 On Friday, September 25, 2015 7:53 PM, Zhiliang Zhu 
 wrote:
   

 Hi all,

The Spark job will run on YARN. I either do not set SPARK_LOCAL_IP at all, or
set it as
export SPARK_LOCAL_IP=localhost    # or set it to the specific node IP in the specific Spark install directory

Submitting the Spark job on the master node of the cluster works well; however,
it fails when submitting remotely by way of a gateway machine.

The gateway machine is already configured, and it works well for submitting
Hadoop jobs. It is set as:
export SCALA_HOME=/usr/lib/scala
export JAVA_HOME=/usr/java/jdk1.7.0_45
export R_HOME=/usr/lib/r
export HADOOP_HOME=/usr/lib/hadoop
export YARN_CONF_DIR=/usr/lib/hadoop/etc/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

export SPARK_MASTER_IP=master01
#export SPARK_LOCAL_IP=master01  #if no SPARK_LOCAL_IP is set, SparkContext 
will not start
export SPARK_LOCAL_IP=localhost #if localhost is set, SparkContext is 
started, but failed later
export SPARK_LOCAL_DIRS=/data/spark_local_dir
...

The error messages:
15/09/25 19:07:12 INFO util.Utils: Successfully started service 'sparkYarnAM' 
on port 48133.
15/09/25 19:07:12 INFO yarn.ApplicationMaster: Waiting for Spark driver to be 
reachable.
15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to driver at 
127.0.0.1:35706, retrying ...
15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to driver at 
127.0.0.1:35706, retrying ...
15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to driver at 
127.0.0.1:35706, retrying ...

 I shall sincerely appreciate your kind help very much!
Zhiliang



   


  

Re: How to properly set conf/spark-env.sh for spark to run on yarn

2015-09-25 Thread Gavin Yue
Print out your env variables and check first 

Sent from my iPhone

> On Sep 25, 2015, at 18:43, Zhiliang Zhu  wrote:
> 
> Hi All,
> 
> I would like to submit spark job on some another remote machine outside the 
> cluster,
> I also copied hadoop/spark conf files under the remote machine, then hadoop
> job would be submitted, but spark job would not.
> 
> In spark-env.sh, it may be due to that SPARK_LOCAL_IP is not properly set,
> or for some other reasons...
> 
> This issue is urgent for me, would some expert provide some help about this 
> problem...
> 
> I will show sincere appreciation towards your help.
> 
> Thank you!
> Best Regards,
> Zhiliang
> 
> 
> 
> 
> On Friday, September 25, 2015 7:53 PM, Zhiliang Zhu 
>  wrote:
> 
> 
> Hi all,
> 
> The spark job will run on yarn. While I do not set SPARK_LOCAL_IP any, or 
> just set as
> export  SPARK_LOCAL_IP=localhost#or set as the specific node ip on the 
> specific spark install directory 
> 
> It will work well to submit spark job on master node of cluster, however, it 
> will fail by way of some gateway machine remotely.
> 
> The gateway machine is already configed, it works well to submit hadoop job.
> It is set as:
> export SCALA_HOME=/usr/lib/scala
> export JAVA_HOME=/usr/java/jdk1.7.0_45
> export R_HOME=/usr/lib/r
> export HADOOP_HOME=/usr/lib/hadoop
> export YARN_CONF_DIR=/usr/lib/hadoop/etc/hadoop
> export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
> 
> export SPARK_MASTER_IP=master01
> #export SPARK_LOCAL_IP=master01  #if no SPARK_LOCAL_IP is set, SparkContext 
> will not start
> export SPARK_LOCAL_IP=localhost #if localhost is set, SparkContext is 
> started, but failed later
> export SPARK_LOCAL_DIRS=/data/spark_local_dir
> ...
> 
> The error messages:
> 15/09/25 19:07:12 INFO util.Utils: Successfully started service 'sparkYarnAM' 
> on port 48133.
> 15/09/25 19:07:12 INFO yarn.ApplicationMaster: Waiting for Spark driver to be 
> reachable.
> 15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to driver 
> at 127.0.0.1:35706, retrying ...
> 15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to driver 
> at 127.0.0.1:35706, retrying ...
> 15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to driver 
> at 127.0.0.1:35706, retrying ...
> 
>  I shall sincerely appreciate your kind help very much!
> Zhiliang
> 
> 
> 
> 


How to properly set conf/spark-env.sh for spark to run on yarn

2015-09-25 Thread Zhiliang Zhu
Hi All,

I would like to submit a Spark job from another remote machine outside the
cluster. I copied the hadoop/spark conf files to the remote machine; a Hadoop
job can be submitted from there, but a Spark job cannot.

In spark-env.sh, it may be that SPARK_LOCAL_IP is not properly set, or there
may be some other reason...

This issue is urgent for me; could some expert provide some help with this
problem?

I will sincerely appreciate your help.

Thank you!
Best Regards,
Zhiliang

How to properly set conf/spark-env.sh for spark to run on yarn

2015-09-25 Thread Zhiliang Zhu
Hi All,

I would like to submit a Spark job from another remote machine outside the
cluster. I copied the hadoop/spark conf files to the remote machine; a Hadoop
job can be submitted from there, but a Spark job cannot.

In spark-env.sh, it may be that SPARK_LOCAL_IP is not properly set, or there
may be some other reason...

This issue is urgent for me; could some expert provide some help with this
problem?

I will sincerely appreciate your help.

Thank you!
Best Regards,
Zhiliang



 On Friday, September 25, 2015 7:53 PM, Zhiliang Zhu 
 wrote:
   

 Hi all,

The Spark job will run on YARN. I either do not set SPARK_LOCAL_IP at all, or
set it as
export SPARK_LOCAL_IP=localhost    # or set it to the specific node IP in the specific Spark install directory

Submitting the Spark job on the master node of the cluster works well; however,
it fails when submitting remotely by way of a gateway machine.

The gateway machine is already configured, and it works well for submitting
Hadoop jobs. It is set as:
export SCALA_HOME=/usr/lib/scala
export JAVA_HOME=/usr/java/jdk1.7.0_45
export R_HOME=/usr/lib/r
export HADOOP_HOME=/usr/lib/hadoop
export YARN_CONF_DIR=/usr/lib/hadoop/etc/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

export SPARK_MASTER_IP=master01
#export SPARK_LOCAL_IP=master01  #if no SPARK_LOCAL_IP is set, SparkContext 
will not start
export SPARK_LOCAL_IP=localhost #if localhost is set, SparkContext is 
started, but failed later
export SPARK_LOCAL_DIRS=/data/spark_local_dir
...

The error messages:
15/09/25 19:07:12 INFO util.Utils: Successfully started service 'sparkYarnAM' 
on port 48133.
15/09/25 19:07:12 INFO yarn.ApplicationMaster: Waiting for Spark driver to be 
reachable.
15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to driver at 
127.0.0.1:35706, retrying ...
15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to driver at 
127.0.0.1:35706, retrying ...
15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to driver at 
127.0.0.1:35706, retrying ...

 I shall sincerely appreciate your kind help very much!
Zhiliang



  

Re: Spark for Oracle sample code

2015-09-25 Thread Michael Armbrust
In most cases, predicates that you add to jdbcDF will be pushed down into
Oracle, preventing the whole table from being sent over.

df.where("column = 1")

Another common pattern is to save the table to parquet or something for
repeat querying.
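
A rough sketch of both patterns (column and path names are placeholders; the
write/read.parquet calls assume Spark 1.4+, on 1.3 saveAsParquetFile is the
equivalent), reusing the jdbcDF and sqlContext from the example below:

// The filter is pushed down to Oracle where possible, so only matching rows come over.
val subset = jdbcDF.where("column1 = 1")
subset.show()

// Cache the table as Parquet for repeated querying.
jdbcDF.write.parquet("/tmp/tablename_parquet")
val cached = sqlContext.read.parquet("/tmp/tablename_parquet")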

Michael

On Fri, Sep 25, 2015 at 3:13 PM, Cui Lin  wrote:

>
>
> Hello, All,
>
> I found the examples for JDBC connection are mostly read the whole table
> and then do operations like joining.
>
> val jdbcDF = sqlContext.read.format("jdbc").options(
>   Map("url" -> "jdbc:postgresql:dbserver",
>   "dbtable" -> "schema.tablename")).load()
>
>
> Sometimes it is not practical since the whole table data is too big and
> not necessary.
>
> What makes sense to me is to use sparksql to get subset data from oracle
> tables using sql-like statement.
> I couldn't find such examples. Can someone show me?
>
>
>
> --
> Best regards!
>
> Lin,Cui
>
>
>
> --
> Best regards!
>
> Lin,Cui
>


Re: Spark SQL: Native Support for LATERAL VIEW EXPLODE

2015-09-25 Thread Michael Armbrust
The SQL parser without HiveContext is really simple, which is why I
generally recommend users use HiveContext.  However, you can do it with
dataframes:

import org.apache.spark.sql.functions._
table("purchases").select(explode(df("purchase_items")).as("item"))



On Fri, Sep 25, 2015 at 4:21 PM, Jerry Lam  wrote:

> Hi sparkers,
>
> Anyone knows how to do LATERAL VIEW EXPLODE without HiveContext?
> I don't want to start up a metastore and derby just because I need LATERAL
> VIEW EXPLODE.
>
> I have been trying but I always get the exception like this:
>
> Name: java.lang.RuntimeException
> Message: [1.68] failure: ``union'' expected but identifier view found
>
> with the query look like:
>
> "select items from purhcases lateral view explode(purchase_items) tbl as
> items"
>
> Best Regards,
>
> Jerry
>
>


Error in starting sparkR: Error in socketConnection(port = monitorPort) :

2015-09-25 Thread Jonathan Yue
I have been trying to start p sparkR but always get the error about monitorPort:

export LD_LIBRARY_PATH=$HOME/jaguar/lib:$HOME/opt/hadoop/lib/native
JDBCJAR=$HOME/jaguar/lib/jaguar-jdbc-2.0.jar
sparkR \
  --driver-class-path $JDBCJAR \
  --driver-library-path $HOME/jaguar/lib \
  --conf spark.executor.extraClassPath=$JDBCJAR \
  --conf spark.executor.extraLibraryPath=$HOME/jaguar/lib

Launching java with spark-submit command /home/exeray/opt/spark/bin/spark-submit
  "--conf" "spark.driver.extraLibraryPath=/home/exeray/jaguar/lib"
  "--conf" "spark.driver.extraClassPath=/path/to/your/jaguar-jdbc-2.0.jar"
  "--conf" "spark.executor.extraLibraryPath=/home/exeray/jaguar/lib"
  "--conf" "spark.executor.extraClassPath=/home/exeray/jaguar/lib/jaguar-jdbc-2.0.jar"
  "sparkr-shell" /tmp/RtmpGbIfCx/backend_port3f704328c68d
Error in socketConnection(port = monitorPort) : cannot open the connection
In addition: Warning message:
In socketConnection(port = monitorPort) : localhost:42600 cannot be opened
During startup - Warning message:
package ‘SparkR’ was built under R version 3.2.1

Is there some config I am missing? Not much help from Google. Every time, the
port number <42600> changes to a different one.

Best regards,
Jonathan

Re: Spark for Oracle sample code

2015-09-25 Thread Jonathan Yue
In your dbtable you can insert "select ..." instead of a table name. I never
tried it, but I saw an example on the web.

Best regards,
Jonathan
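
A hedged sketch of that idea (the subquery, alias, and Oracle connection URL
below are placeholders, not something I have run):

val jdbcDF = sqlContext.read.format("jdbc").options(Map(
  "url" -> "jdbc:oracle:thin:@//dbserver:1521/service",   // placeholder URL
  // A parenthesized subquery with an alias is read as if it were the table:
  "dbtable" -> "(select col1, col2 from schema.tablename where col3 = 'x') t"
)).load()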
  From: Cui Lin 
 To: user  
 Sent: Friday, September 25, 2015 4:12 PM
 Subject: Spark for Oracle sample code
   


Hello, All,
I found the examples for JDBC connection are mostly read the whole table and 
then do operations like joining.
val jdbcDF = sqlContext.read.format("jdbc").options( 
  Map("url" -> "jdbc:postgresql:dbserver",
  "dbtable" -> "schema.tablename")).load()
Sometimes it is not practical since the whole table data is too big and not 
necessary.
What makes sense to me is to use sparksql to get subset data from oracle tables
using a sql-like statement.
I couldn't find such examples. Can someone show me?


-- 
Best regards!

Lin,Cui





  

Spark SQL: Native Support for LATERAL VIEW EXPLODE

2015-09-25 Thread Jerry Lam
Hi sparkers,

Anyone knows how to do LATERAL VIEW EXPLODE without HiveContext?
I don't want to start up a metastore and derby just because I need LATERAL
VIEW EXPLODE.

I have been trying but I always get the exception like this:

Name: java.lang.RuntimeException
Message: [1.68] failure: ``union'' expected but identifier view found

with the query look like:

"select items from purhcases lateral view explode(purchase_items) tbl as
items"

Best Regards,

Jerry


Spark for Oracle sample code

2015-09-25 Thread Cui Lin
Hello, All,

I found the examples for JDBC connection are mostly read the whole table
and then do operations like joining.

val jdbcDF = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:postgresql:dbserver",
  "dbtable" -> "schema.tablename")).load()


Sometimes it is not practical since the whole table data is too big and not
necessary.

What makes sense to me is to use sparksql to get subset data from oracle
tables using sql-like statement.
I couldn't find such examples. Can someone show me?



-- 
Best regards!

Lin,Cui


Fwd: Spark for Oracle sample code

2015-09-25 Thread Cui Lin
Hello, All,

I found the examples for JDBC connection are mostly read the whole table
and then do operations like joining.

val jdbcDF = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:postgresql:dbserver",
  "dbtable" -> "schema.tablename")).load()


Sometimes it is not practical since the whole table data is too big and not
necessary.

What makes sense to me is to use sparksql to get subset data from oracle
tables using sql-like statement.
I couldn't find such examples. Can someone show me?



-- 
Best regards!

Lin,Cui



-- 
Best regards!

Lin,Cui


GraphX create graph with multiple node attributes

2015-09-25 Thread JJ
Hi, 

I am new to Spark and GraphX, so thanks in advance for your patience. I want
to create a graph with multiple node attributes. Here is my code:



But I receive error:


Can someone help? Thanks!
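
For reference, something like the following (a made-up sketch, not my actual
code) is what I am trying to do: bundle multiple attributes per vertex in a
tuple used as the vertex property type.

import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// Each vertex carries two attributes, a name and an age, bundled in a tuple.
val vertices: RDD[(VertexId, (String, Int))] = sc.parallelize(Seq(
  (1L, ("alice", 28)),
  (2L, ("bob", 35))
))
val edges: RDD[Edge[String]] = sc.parallelize(Seq(Edge(1L, 2L, "follows")))

val graph: Graph[(String, Int), String] = Graph(vertices, edges)
graph.vertices.collect().foreach(println)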




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-create-graph-with-multiple-node-attributes-tp24827.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



[SPARK-SQL] Requested array size exceeds VM limit

2015-09-25 Thread Sadhan Sood
I am trying to run a query on a month of data. The volume of data is not
much, but we have a partition per hour and per day. The table schema is
heavily nested, with a total of 300 leaf fields. I am trying to run a simple
select count(*) query on the table and am running into this exception:

SELECT
  COUNT(*)
FROM
  p_all_tx
WHERE
  date_prefix >= "20150500"
  AND date_prefix <= "20150700"
  AND sanitizeddetails.merchantaccountid = 'Rvr7StMZSTQj';

java.lang.OutOfMemoryError: Requested array size exceeds VM limit
> at java.util.Arrays.copyOf(Arrays.java:3236)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
> at
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
> at
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
> at
> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
> at
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
> at
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
> at
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
> at org.apache.spark.SparkContext.clean(SparkContext.scala:2003)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:703)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:702)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
> at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:702)
> at
> org.apache.spark.sql.execution.aggregate.SortBasedAggregate$$anonfun$doExecute$1.apply(SortBasedAggregate.scala:73)
> at
> org.apache.spark.sql.execution.aggregate.SortBasedAggregate$$anonfun$doExecute$1.apply(SortBasedAggregate.scala:70)
> at
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
> at
> org.apache.spark.sql.execution.aggregate.SortBasedAggregate.doExecute(SortBasedAggregate.scala:70)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
> at
> org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:142)
> at
> org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:141)
> at
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
> at org.apache.spark.sql.execution.Exchange.doExecute(Exchange.scala:141)


The table is a Parquet table. I am not sure why the closure should exceed the
VM limit. Could somebody explain why this is happening? Is it because I have a
lot of partitions and the table scan is essentially creating one RDD per
partition?


Re: Is this a Spark issue or Hive issue that Spark cannot read the string type data in the Parquet generated by Hive

2015-09-25 Thread Cheng Lian
BTW, I just checked that this bug should have been fixed since Hive
0.14.0. So the SQL option I mentioned is mostly used for reading legacy
Parquet files generated by older versions of Hive.


Cheng

On 9/25/15 2:42 PM, Cheng Lian wrote:
Please set the SQL option spark.sql.parquet.binaryAsString to true
when reading Parquet files containing strings generated by Hive.


This is actually a bug of parquet-hive. When generating Parquet schema 
for a string field, Parquet requires a "UTF8" annotation, something like:


message hive_schema {
  ...
  optional binary column2 (UTF8);
  ...
}

but parquet-hive fails to add it, and produces:

message hive_schema {
  ...
  optional binary column2;
  ...
}

Thus binary fields and string fields are made indistinguishable.

Interestingly, there's another bug in parquet-thrift, which always 
adds UTF8 annotation to all binary fields :)


Cheng

On 9/25/15 2:03 PM, java8964 wrote:

Hi, Spark Users:

I have a problem related to Spark cannot recognize the string type in 
the Parquet schema generated by Hive.


Version of all components:

Spark 1.3.1
Hive 0.12.0
Parquet 1.3.2

I generated a detail low level table in the Parquet format using 
MapReduce java code. This table can be read in the Hive and Spark 
without any issue.


Now I create a Hive aggregation table like following:

create external table T (
column1 bigint,
*column2 string,*
..
)
partitioned by (dt string)
ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
STORED AS
INPUTFORMAT "parquet.hive.DeprecatedParquetInputFormat"
OUTPUTFORMAT "parquet.hive.DeprecatedParquetOutputFormat"
location '/hdfs_location'

Then the table is populated in the Hive by:

set hive.exec.compress.output=true;
set parquet.compression=snappy;

insert into table T partition(dt='2015-09-23')
select
.
from Detail_Table
group by

After this, we can query the T table in the Hive without issue.

But if I try to use it in the Spark 1.3.1 like following:

import org.apache.spark.sql.SQLContext
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
val v_event_cnt=sqlContext.parquetFile("/hdfs_location/dt=2015-09-23")

scala> v_event_cnt.printSchema
root
 |-- column1: long (nullable = true)
* |-- column2: binary (nullable = true)*
 |-- 
 |-- dt: string (nullable = true)

The Spark will recognize column2 as binary type, instead of string 
type in this case, but in the Hive, it works fine.
So this bring an issue that in the Spark, the data will be dumped as 
"[B@e353d68". To use it in the Spark, I have to cast it as string, to 
get the correct value out of it.


I wonder this mismatch type of Parquet file could be caused by which 
part? Is the Hive not generate the correct Parquet file with schema, 
or Spark in fact cannot recognize it due to problem in it.


Is there a way I can do either Hive or Spark to make this parquet 
schema correctly on both ends?


Thanks

Yong






Re: Is this a Spark issue or Hive issue that Spark cannot read the string type data in the Parquet generated by Hive

2015-09-25 Thread Cheng Lian
Please set the SQL option spark.sql.parquet.binaryAsString to true
when reading Parquet files containing strings generated by Hive.


This is actually a bug of parquet-hive. When generating Parquet schema 
for a string field, Parquet requires a "UTF8" annotation, something like:


message hive_schema {
  ...
  optional binary column2 (UTF8);
  ...
}

but parquet-hive fails to add it, and produces:

message hive_schema {
  ...
  optional binary column2;
  ...
}

Thus binary fields and string fields are made indistinguishable.

Interestingly, there's another bug in parquet-thrift, which always adds 
UTF8 annotation to all binary fields :)
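
A minimal sketch of setting the option before reading (assuming a HiveContext
named sqlContext and the same path as in your mail; sqlContext.sql("SET ...")
works as well):

sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")
val v_event_cnt = sqlContext.parquetFile("/hdfs_location/dt=2015-09-23")
v_event_cnt.printSchema()   // column2 should now show up as string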


Cheng

On 9/25/15 2:03 PM, java8964 wrote:

Hi, Spark Users:

I have a problem where Spark cannot recognize the string type in
the Parquet schema generated by Hive.


Version of all components:

Spark 1.3.1
Hive 0.12.0
Parquet 1.3.2

I generated a detailed low-level table in the Parquet format using 
MapReduce Java code. This table can be read in Hive and Spark 
without any issue.


Now I create a Hive aggregation table like the following:

create external table T (
column1 bigint,
*column2 string,*
..
)
partitioned by (dt string)
ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
STORED AS
INPUTFORMAT "parquet.hive.DeprecatedParquetInputFormat"
OUTPUTFORMAT "parquet.hive.DeprecatedParquetOutputFormat"
location '/hdfs_location'

Then the table is populated in Hive by:

set hive.exec.compress.output=true;
set parquet.compression=snappy;

insert into table T partition(dt='2015-09-23')
select
.
from Detail_Table
group by

After this, we can query the T table in Hive without issue.

But if I try to use it in Spark 1.3.1 like the following:

import org.apache.spark.sql.SQLContext
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
val v_event_cnt=sqlContext.parquetFile("/hdfs_location/dt=2015-09-23")

scala> v_event_cnt.printSchema
root
 |-- column1: long (nullable = true)
* |-- column2: binary (nullable = true)*
 |-- 
 |-- dt: string (nullable = true)

Spark recognizes column2 as binary type instead of string type in this 
case, but in Hive it works fine.
This causes an issue: in Spark, the data is dumped as "[B@e353d68", and I 
have to cast it to string to get the correct value out of it.


I wonder which part causes this type mismatch in the Parquet file. Does 
Hive not generate the correct Parquet schema, or can Spark in fact not 
recognize it due to a problem on its side?


Is there a way to configure either Hive or Spark so that this Parquet 
schema is handled correctly on both ends?


Thanks

Yong




Re: Distance metrics in KMeans

2015-09-25 Thread sethah
It looks like the distance metric is hard-coded to the L2 norm (Euclidean
distance) in MLlib. As you might expect, you are not the first person to
want other metrics, and there has been some prior effort.

Please reference this PR: https://github.com/apache/spark/pull/2634

And corresponding JIRA: https://issues.apache.org/jira/browse/SPARK-3219

It seems the addition of arbitrary distance metrics is non-trivial given the
current implementation in MLlib. I am not sure of any current work toward
this issue.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Distance-metrics-in-KMeans-tp24823p24826.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



how to control timeout in node failure for spark task ?

2015-09-25 Thread roy
Hi,

  We are running Spark 1.3 on CDH 5.4.1 on top of YARN. We want to know how
to control the task timeout when a node fails and the tasks that were running
on it should be restarted on another node. At present the job waits
approximately 10 minutes to restart the tasks that were running on the failed
node.

http://spark.apache.org/docs/latest/configuration.html Here I see many
timeout configs, but I don't know which one to override.

Any help here?
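
One likely culprit when a whole node dies under YARN: the ~10 minute delay matches
YARN's NodeManager liveness expiry rather than any Spark timeout. A sketch of the
property involved (the value shown is illustrative only):

<!-- yarn-site.xml -->
<property>
  <!-- How long the ResourceManager waits before declaring a NodeManager lost.
       The default is 600000 ms, i.e. the ~10 minutes observed above; lowering it
       reschedules tasks from a dead node sooner, at the cost of more false
       positives. -->
  <name>yarn.nm.liveness-monitor.expiry-interval-ms</name>
  <value>120000</value>
</property>

On the Spark side, spark.network.timeout and spark.task.maxFailures govern how
quickly individual task failures are detected and retried.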



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-control-timeout-in-node-failure-for-spark-task-tp24825.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Is this a Spark issue or Hive issue that Spark cannot read the string type data in the Parquet generated by Hive

2015-09-25 Thread java8964
Hi, Spark Users:

I have a problem where Spark cannot recognize the string type in the
Parquet schema generated by Hive.

Version of all components:

Spark 1.3.1
Hive 0.12.0
Parquet 1.3.2

I generated a detailed low-level table in the Parquet format using MapReduce
Java code. This table can be read in Hive and Spark without any issue.

Now I create a Hive aggregation table like the following:

create external table T (
column1 bigint,
column2 string,
..
)
partitioned by (dt string)
ROW FORMAT SERDE 'parquet.hive.serde.ParquetHiveSerDe'
STORED AS
INPUTFORMAT "parquet.hive.DeprecatedParquetInputFormat"
OUTPUTFORMAT "parquet.hive.DeprecatedParquetOutputFormat"
location '/hdfs_location'

Then the table is populated in Hive by:

set hive.exec.compress.output=true;
set parquet.compression=snappy;

insert into table T partition(dt='2015-09-23')
select
.
from Detail_Table
group by

After this, we can query the T table in Hive without issue.

But if I try to use it in Spark 1.3.1 like the following:

import org.apache.spark.sql.SQLContext
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
val v_event_cnt=sqlContext.parquetFile("/hdfs_location/dt=2015-09-23")

scala> v_event_cnt.printSchema
root
 |-- column1: long (nullable = true)
 |-- column2: binary (nullable = true)
 |-- 
 |-- dt: string (nullable = true)

Spark recognizes column2 as binary type instead of string type in this
case, but in Hive it works fine. This causes an issue: in Spark, the data is
dumped as "[B@e353d68", and I have to cast it to string to get the correct
value out of it.

I wonder which part causes this type mismatch in the Parquet file. Does Hive
not generate the correct Parquet schema, or can Spark in fact not recognize
it due to a problem on its side?

Is there a way to configure either Hive or Spark so that this Parquet schema
is handled correctly on both ends?

Thanks

Yong

Distance metrics in KMeans

2015-09-25 Thread bobtreacy
Is it possible to use other distance metrics than Euclidean (e.g. Tanimoto,
Manhattan) with MLlib KMeans?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Distance-metrics-in-KMeans-tp24823.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: hive on spark query error

2015-09-25 Thread Marcelo Vanzin
Seems like you have "hive.server2.enable.doAs" enabled; you can either
disable it, or configure hs2 so that the user running the service
("hadoop" in your case) can impersonate others.

See:
https://hadoop.apache.org/docs/r2.7.0/hadoop-project-dist/hadoop-common/Superusers.html
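
Concretely, the impersonation route boils down to proxy-user entries in
core-site.xml on the cluster, e.g. (wildcards shown for brevity; you would
normally restrict them):

<!-- core-site.xml: allow the "hadoop" user, which runs HiveServer2 here,
     to impersonate other users such as HIVEAPP -->
<property>
  <name>hadoop.proxyuser.hadoop.hosts</name>
  <value>*</value>
</property>
<property>
  <name>hadoop.proxyuser.hadoop.groups</name>
  <value>*</value>
</property>

followed by refreshing or restarting the relevant daemons so they pick up the
new impersonation rules.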

On Fri, Sep 25, 2015 at 10:33 AM, Garry Chen  wrote:
> 2015-09-25 13:31:16,245 INFO  [stderr-redir-1]: client.SparkClientImpl 
> (SparkClientImpl.java:run(569)) - ERROR: 
> org.apache.hadoop.security.authorize.AuthorizationException: User: hadoop is 
> not allowed to impersonate HIVEAPP

-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Kafka & Spark Streaming

2015-09-25 Thread Neelesh
Thanks. I'll keep an eye on this. Our implementation of the DStream
basically accepts a function to compute current offsets. The implementation
of the function fetches the list of topics from ZooKeeper once in a while. It
then adds consumer offsets for newly added topics to the currentOffsets
held in memory and deletes removed topics. The "once in a while" is
pluggable as well, and we are planning to use ZK watches instead of a
time-based refresh. This works for us because we use ZK extensively for a lot
of other bookkeeping.


On Fri, Sep 25, 2015 at 1:16 PM, Cody Koeninger  wrote:

> Yes, the partition IDs are the same.
>
> As far as the failure / subclassing goes, you may want to keep an eye on
> https://issues.apache.org/jira/browse/SPARK-10320 , not sure if the
> suggestions in there will end up going anywhere.
>
> On Fri, Sep 25, 2015 at 3:01 PM, Neelesh  wrote:
>
>> For the 1-1 mapping case, can I use TaskContext.get().partitionId as an
>> index in to the offset ranges?
>> For the failure case, yes, I'm subclassing of DirectKafkaInputDStream.
>> As for failures, different partitions in the same batch may be talking to
>> different RDBMS servers due to multitenancy - a spark streaming app is
>> consuming from several topics, each topic mapped to a customer for example.
>> It is quite possible that in a batch, several partitions belonging to the
>> same customer may fail, and others will go through. We don't want the whole
>> job to be killed because of one failing customer,and affect others in the
>> same job. Hope that makes sense.
>>
>> thnx
>>
>> On Fri, Sep 25, 2015 at 12:52 PM, Cody Koeninger 
>> wrote:
>>
>>> Your success case will work fine, it is a 1-1 mapping as you said.
>>>
>>> To handle failures in exactly the way you describe, you'd need to
>>> subclass or modify DirectKafkaInputDStream and change the way compute()
>>> works.
>>>
>>> Unless you really are going to have very fine-grained failures (why
>>> would only a given partition be failing while the rest are fine?) it's
>>> going to be easier to just fail the whole task and retry, or eventually
>>> kill the job.
>>>
>>> On Fri, Sep 25, 2015 at 1:55 PM, Neelesh  wrote:
>>>
 Thanks Petr, Cody. This is a reasonable place to start for me. What I'm
 trying to achieve

 stream.foreachRDD {rdd=>
rdd.foreachPartition { p=>

Try(myFunc(...))  match {
  case Sucess(s) => updatewatermark for this partition //of
 course, expectation is that it will work only if there is a 1-1 mapping at
 this point in time
  case Failure()  => Tell the driver not to generate a partition
 for this kafka topic+partition for a while, by updating some shared state
 (zk)

}

  }
 }

 I was looking for that mapping b/w kafka partition thats bound to a
 task inside the task execution code, in cases where the intermediate
 operations do not change partitions, shuffle etc.

 -neelesh

 On Fri, Sep 25, 2015 at 11:14 AM, Cody Koeninger 
 wrote:

>
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>
> also has an example of how to close over the offset ranges so they are
> available on executors.
>
> On Fri, Sep 25, 2015 at 12:50 PM, Neelesh  wrote:
>
>> Hi,
>>We are using DirectKafkaInputDStream and store completed consumer
>> offsets in Kafka (0.8.2). However, some of our use case require that
>> offsets be not written if processing of a partition fails with certain
>> exceptions. This allows us to build various backoff strategies for that
>> partition, instead of either blindly committing consumer offsets 
>> regardless
>> of errors (because KafkaRDD as HasOffsetRanges is available only on the
>> driver)  or relying on Spark's retry logic and continuing without 
>> remedial
>> action.
>>
>> I was playing with SparkListener and found that while one can listen
>> on taskCompletedEvent on the driver and even figure out that there was an
>> error, there is no way of mapping this task back to the partition and
>> retrieving offset range, topic & kafka partition # etc.
>>
>> Any pointers appreciated!
>>
>> Thanks!
>> -neelesh
>>
>
>

>>>
>>
>


Re: Kafka & Spark Streaming

2015-09-25 Thread Cody Koeninger
Yes, the partition IDs are the same.

As far as the failure / subclassing goes, you may want to keep an eye on
https://issues.apache.org/jira/browse/SPARK-10320 , not sure if the
suggestions in there will end up going anywhere.

On Fri, Sep 25, 2015 at 3:01 PM, Neelesh  wrote:

> For the 1-1 mapping case, can I use TaskContext.get().partitionId as an
> index in to the offset ranges?
> For the failure case, yes, I'm subclassing of DirectKafkaInputDStream. As
> for failures, different partitions in the same batch may be talking to
> different RDBMS servers due to multitenancy - a spark streaming app is
> consuming from several topics, each topic mapped to a customer for example.
> It is quite possible that in a batch, several partitions belonging to the
> same customer may fail, and others will go through. We don't want the whole
> job to be killed because of one failing customer,and affect others in the
> same job. Hope that makes sense.
>
> thnx
>
> On Fri, Sep 25, 2015 at 12:52 PM, Cody Koeninger 
> wrote:
>
>> Your success case will work fine, it is a 1-1 mapping as you said.
>>
>> To handle failures in exactly the way you describe, you'd need to
>> subclass or modify DirectKafkaInputDStream and change the way compute()
>> works.
>>
>> Unless you really are going to have very fine-grained failures (why would
>> only a given partition be failing while the rest are fine?) it's going to
>> be easier to just fail the whole task and retry, or eventually kill the job.
>>
>> On Fri, Sep 25, 2015 at 1:55 PM, Neelesh  wrote:
>>
>>> Thanks Petr, Cody. This is a reasonable place to start for me. What I'm
>>> trying to achieve
>>>
>>> stream.foreachRDD {rdd=>
>>>rdd.foreachPartition { p=>
>>>
>>>Try(myFunc(...))  match {
>>>  case Sucess(s) => updatewatermark for this partition //of
>>> course, expectation is that it will work only if there is a 1-1 mapping at
>>> this point in time
>>>  case Failure()  => Tell the driver not to generate a partition
>>> for this kafka topic+partition for a while, by updating some shared state
>>> (zk)
>>>
>>>}
>>>
>>>  }
>>> }
>>>
>>> I was looking for that mapping b/w kafka partition thats bound to a task
>>> inside the task execution code, in cases where the intermediate operations
>>> do not change partitions, shuffle etc.
>>>
>>> -neelesh
>>>
>>> On Fri, Sep 25, 2015 at 11:14 AM, Cody Koeninger 
>>> wrote:
>>>

 http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers

 also has an example of how to close over the offset ranges so they are
 available on executors.

 On Fri, Sep 25, 2015 at 12:50 PM, Neelesh  wrote:

> Hi,
>We are using DirectKafkaInputDStream and store completed consumer
> offsets in Kafka (0.8.2). However, some of our use case require that
> offsets be not written if processing of a partition fails with certain
> exceptions. This allows us to build various backoff strategies for that
> partition, instead of either blindly committing consumer offsets 
> regardless
> of errors (because KafkaRDD as HasOffsetRanges is available only on the
> driver)  or relying on Spark's retry logic and continuing without remedial
> action.
>
> I was playing with SparkListener and found that while one can listen
> on taskCompletedEvent on the driver and even figure out that there was an
> error, there is no way of mapping this task back to the partition and
> retrieving offset range, topic & kafka partition # etc.
>
> Any pointers appreciated!
>
> Thanks!
> -neelesh
>


>>>
>>
>


Re: kafka direct streaming with checkpointing

2015-09-25 Thread Neelesh
As Cody says, to achieve true exactly-once, the bookkeeping has to happen
in the sink data system, and that assumes it is a transactional store.
Wherever possible, we try to make the application idempotent (upsert in
HBase, ignore-on-duplicate for MySQL, etc.), but there are still cases
(analytics, counting, etc.) where it becomes a hard problem. In such cases we
try to move the problem to the end system's storage where it's transactional.
If the end system is non-transactional, we hope that our external bookkeeping
mechanism (ZK or Kafka itself) is good enough and assume that failures
around the bookkeeping itself are rare.
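
As an illustration of that transactional pattern, a minimal sketch (the table
names, JDBC URL and upsert statement are invented, the records are assumed to be
key/value pairs, and offsetRanges is assumed to have been captured via
HasOffsetRanges as in the direct-stream documentation):

import java.sql.DriverManager
import org.apache.spark.TaskContext

rdd.foreachPartition { partition =>
  val range = offsetRanges(TaskContext.get().partitionId)  // this partition's Kafka offsets
  val conn  = DriverManager.getConnection(jdbcUrl)         // hypothetical JDBC URL
  conn.setAutoCommit(false)
  try {
    val upsert = conn.prepareStatement(
      "INSERT INTO results (k, v) VALUES (?, ?) ON DUPLICATE KEY UPDATE v = ?")
    partition.foreach { case (k, v) =>
      upsert.setString(1, k); upsert.setLong(2, v); upsert.setLong(3, v)
      upsert.executeUpdate()
    }
    // Commit the consumed offsets in the same transaction as the results, so a
    // replayed batch either finds its offsets already stored or redoes both together.
    val offsets = conn.prepareStatement(
      "REPLACE INTO kafka_offsets (topic, part, until_offset) VALUES (?, ?, ?)")
    offsets.setString(1, range.topic); offsets.setInt(2, range.partition)
    offsets.setLong(3, range.untilOffset)
    offsets.executeUpdate()
    conn.commit()
  } catch {
    case e: Exception => conn.rollback(); throw e
  } finally {
    conn.close()
  }
}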

On Fri, Sep 25, 2015 at 12:15 PM, Radu Brumariu  wrote:

> Wouldn't the same case be made for checkpointing in general ?
> What I am trying to say, is that this particular situation is part of the
> general checkpointing use case, not an edge case.
> I would like to understand why shouldn't the checkpointing mechanism,
> already existent in Spark, handle this situation too ?
>
> On Fri, Sep 25, 2015 at 12:20 PM, Cody Koeninger 
> wrote:
>
>> Storing passbacks transactionally with results in your own data store,
>> with a schema that makes sense for you, is the optimal solution.
>>
>> On Fri, Sep 25, 2015 at 11:05 AM, Radu Brumariu  wrote:
>>
>>> Right, I understand why the exceptions happen.
>>> However, it seems less useful to have a checkpointing that only works in
>>> the case of an application restart. IMO, code changes happen quite often,
>>> and not being able to pick up where the previous job left off is quite a
>>> bit of a hinderance.
>>>
>>> The solutions you mention would partially solve the problem, while
>>> bringing new problems along ( increased resource utilization, difficulty in
>>> managing multiple jobs consuming the same data ,etc ).
>>>
>>> The solution that we currently employ is committing the offsets to a
>>> durable storage and making sure that the job reads the offsets from there
>>> upon restart, while forsaking checkpointing.
>>>
>>> The scenario seems not to be an edge case, which is why I was asking
>>> that perhaps it could be handled by the spark kafka API instead having
>>> everyone come up with their own, sub-optimal solutions.
>>>
>>> Radu
>>>
>>> On Fri, Sep 25, 2015 at 5:06 AM, Adrian Tanase 
>>> wrote:
>>>
 Hi Radu,

 The problem itself is not checkpointing the data – if your operations
 are stateless then you are only checkpointing the kafka offsets, you are
 right.
 The problem is that you are also checkpointing metadata – including the
 actual Code and serialized java classes – that’s why you’ll see
 ser/deser exceptions on restart with upgrade.

 If you’re not using stateful opetations, you might get away by using
 the old Kafka receiver w/o WAL – but you accept “at least once semantics”.
 As soon as you add in the WAL you are forced to checkpoint and you’re
 better off with the DirectReceiver approach.

 I believe the simplest way to get around is to support runnning 2
 versions in parallel – with some app level control of a barrier (e.g. v1
 reads events up to 3:00am, v2 after that). Manual state management is also
 supported by the framework but it’s harder to control because:

- you’re not guaranteed to shut down gracefully
- You may have a bug that prevents the state to be saved and you
can’t restart the app w/o upgrade

 Less than ideal, yes :)

 -adrian

 From: Radu Brumariu
 Date: Friday, September 25, 2015 at 1:31 AM
 To: Cody Koeninger
 Cc: "user@spark.apache.org"
 Subject: Re: kafka direct streaming with checkpointing

 Would changing the direct stream api to support committing the offsets
 to kafka's ZK( like a regular consumer) as a fallback mechanism, in case
 recovering from checkpoint fails , be an accepted solution?

 On Thursday, September 24, 2015, Cody Koeninger 
 wrote:

> This has been discussed numerous times, TD's response has consistently
> been that it's unlikely to be possible
>
> On Thu, Sep 24, 2015 at 12:26 PM, Radu Brumariu 
> wrote:
>
>> It seems to me that this scenario that I'm facing, is quite common
>> for spark jobs using Kafka.
>> Is there a ticket to add this sort of semantics to checkpointing ?
>> Does it even make sense to add it there ?
>>
>> Thanks,
>> Radu
>>
>>
>> On Thursday, September 24, 2015, Cody Koeninger 
>> wrote:
>>
>>> No, you cant use checkpointing across code changes.  Either store
>>> offsets yourself, or start up your new app code and let it catch up 
>>> before
>>> killing the old one.
>>>
>>> On Thu, Sep 24, 2015 at 8:40 AM, Radu Brumariu 
>>> wrote:
>>>
 Hi,
 in my application I use Kafka direct streaming and I have also
 enabled checkpointing.
 This seems to work fine if the application is res

Re: Kafka & Spark Streaming

2015-09-25 Thread Neelesh
For the 1-1 mapping case, can I use TaskContext.get().partitionId as an
index into the offset ranges?
For the failure case, yes, I'm subclassing DirectKafkaInputDStream. As
for failures, different partitions in the same batch may be talking to
different RDBMS servers due to multitenancy - a spark streaming app is
consuming from several topics, each topic mapped to a customer for example.
It is quite possible that in a batch, several partitions belonging to the
same customer may fail, and others will go through. We don't want the whole
job to be killed because of one failing customer, and affect others in the
same job. Hope that makes sense.

thnx

On Fri, Sep 25, 2015 at 12:52 PM, Cody Koeninger  wrote:

> Your success case will work fine, it is a 1-1 mapping as you said.
>
> To handle failures in exactly the way you describe, you'd need to subclass
> or modify DirectKafkaInputDStream and change the way compute() works.
>
> Unless you really are going to have very fine-grained failures (why would
> only a given partition be failing while the rest are fine?) it's going to
> be easier to just fail the whole task and retry, or eventually kill the job.
>
> On Fri, Sep 25, 2015 at 1:55 PM, Neelesh  wrote:
>
>> Thanks Petr, Cody. This is a reasonable place to start for me. What I'm
>> trying to achieve
>>
>> stream.foreachRDD {rdd=>
>>rdd.foreachPartition { p=>
>>
>>Try(myFunc(...))  match {
>>  case Sucess(s) => updatewatermark for this partition //of
>> course, expectation is that it will work only if there is a 1-1 mapping at
>> this point in time
>>  case Failure()  => Tell the driver not to generate a partition
>> for this kafka topic+partition for a while, by updating some shared state
>> (zk)
>>
>>}
>>
>>  }
>> }
>>
>> I was looking for that mapping b/w kafka partition thats bound to a task
>> inside the task execution code, in cases where the intermediate operations
>> do not change partitions, shuffle etc.
>>
>> -neelesh
>>
>> On Fri, Sep 25, 2015 at 11:14 AM, Cody Koeninger 
>> wrote:
>>
>>>
>>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>>>
>>> also has an example of how to close over the offset ranges so they are
>>> available on executors.
>>>
>>> On Fri, Sep 25, 2015 at 12:50 PM, Neelesh  wrote:
>>>
 Hi,
We are using DirectKafkaInputDStream and store completed consumer
 offsets in Kafka (0.8.2). However, some of our use case require that
 offsets be not written if processing of a partition fails with certain
 exceptions. This allows us to build various backoff strategies for that
 partition, instead of either blindly committing consumer offsets regardless
 of errors (because KafkaRDD as HasOffsetRanges is available only on the
 driver)  or relying on Spark's retry logic and continuing without remedial
 action.

 I was playing with SparkListener and found that while one can listen on
 taskCompletedEvent on the driver and even figure out that there was an
 error, there is no way of mapping this task back to the partition and
 retrieving offset range, topic & kafka partition # etc.

 Any pointers appreciated!

 Thanks!
 -neelesh

>>>
>>>
>>
>


Re: kafka direct streaming with checkpointing

2015-09-25 Thread Cody Koeninger
Spark's checkpointing system is not a transactional database, and it
doesn't really make sense to try and turn it into one.

On Fri, Sep 25, 2015 at 2:15 PM, Radu Brumariu  wrote:

> Wouldn't the same case be made for checkpointing in general ?
> What I am trying to say, is that this particular situation is part of the
> general checkpointing use case, not an edge case.
> I would like to understand why shouldn't the checkpointing mechanism,
> already existent in Spark, handle this situation too ?
>
> On Fri, Sep 25, 2015 at 12:20 PM, Cody Koeninger 
> wrote:
>
>> Storing passbacks transactionally with results in your own data store,
>> with a schema that makes sense for you, is the optimal solution.
>>
>> On Fri, Sep 25, 2015 at 11:05 AM, Radu Brumariu  wrote:
>>
>>> Right, I understand why the exceptions happen.
>>> However, it seems less useful to have a checkpointing that only works in
>>> the case of an application restart. IMO, code changes happen quite often,
>>> and not being able to pick up where the previous job left off is quite a
>>> bit of a hinderance.
>>>
>>> The solutions you mention would partially solve the problem, while
>>> bringing new problems along ( increased resource utilization, difficulty in
>>> managing multiple jobs consuming the same data ,etc ).
>>>
>>> The solution that we currently employ is committing the offsets to a
>>> durable storage and making sure that the job reads the offsets from there
>>> upon restart, while forsaking checkpointing.
>>>
>>> The scenario seems not to be an edge case, which is why I was asking
>>> that perhaps it could be handled by the spark kafka API instead having
>>> everyone come up with their own, sub-optimal solutions.
>>>
>>> Radu
>>>
>>> On Fri, Sep 25, 2015 at 5:06 AM, Adrian Tanase 
>>> wrote:
>>>
 Hi Radu,

 The problem itself is not checkpointing the data – if your operations
 are stateless then you are only checkpointing the kafka offsets, you are
 right.
 The problem is that you are also checkpointing metadata – including the
 actual Code and serialized java classes – that’s why you’ll see
 ser/deser exceptions on restart with upgrade.

 If you’re not using stateful opetations, you might get away by using
 the old Kafka receiver w/o WAL – but you accept “at least once semantics”.
 As soon as you add in the WAL you are forced to checkpoint and you’re
 better off with the DirectReceiver approach.

 I believe the simplest way to get around is to support runnning 2
 versions in parallel – with some app level control of a barrier (e.g. v1
 reads events up to 3:00am, v2 after that). Manual state management is also
 supported by the framework but it’s harder to control because:

- you’re not guaranteed to shut down gracefully
- You may have a bug that prevents the state to be saved and you
can’t restart the app w/o upgrade

 Less than ideal, yes :)

 -adrian

 From: Radu Brumariu
 Date: Friday, September 25, 2015 at 1:31 AM
 To: Cody Koeninger
 Cc: "user@spark.apache.org"
 Subject: Re: kafka direct streaming with checkpointing

 Would changing the direct stream api to support committing the offsets
 to kafka's ZK( like a regular consumer) as a fallback mechanism, in case
 recovering from checkpoint fails , be an accepted solution?

 On Thursday, September 24, 2015, Cody Koeninger 
 wrote:

> This has been discussed numerous times, TD's response has consistently
> been that it's unlikely to be possible
>
> On Thu, Sep 24, 2015 at 12:26 PM, Radu Brumariu 
> wrote:
>
>> It seems to me that this scenario that I'm facing, is quite common
>> for spark jobs using Kafka.
>> Is there a ticket to add this sort of semantics to checkpointing ?
>> Does it even make sense to add it there ?
>>
>> Thanks,
>> Radu
>>
>>
>> On Thursday, September 24, 2015, Cody Koeninger 
>> wrote:
>>
>>> No, you cant use checkpointing across code changes.  Either store
>>> offsets yourself, or start up your new app code and let it catch up 
>>> before
>>> killing the old one.
>>>
>>> On Thu, Sep 24, 2015 at 8:40 AM, Radu Brumariu 
>>> wrote:
>>>
 Hi,
 in my application I use Kafka direct streaming and I have also
 enabled checkpointing.
 This seems to work fine if the application is restarted. However if
 I change the code and resubmit the application, it cannot start 
 because of
 the checkpointed data being of different class versions.
 Is there any way I can use checkpointing that can survive across
 application version changes?

 Thanks,
 Radu


>>>
>
>>>
>>
>


Re: Kafka & Spark Streaming

2015-09-25 Thread Cody Koeninger
Your success case will work fine, it is a 1-1 mapping as you said.

To handle failures in exactly the way you describe, you'd need to subclass
or modify DirectKafkaInputDStream and change the way compute() works.

Unless you really are going to have very fine-grained failures (why would
only a given partition be failing while the rest are fine?) it's going to
be easier to just fail the whole task and retry, or eventually kill the job.

On Fri, Sep 25, 2015 at 1:55 PM, Neelesh  wrote:

> Thanks Petr, Cody. This is a reasonable place to start for me. What I'm
> trying to achieve
>
> stream.foreachRDD {rdd=>
>rdd.foreachPartition { p=>
>
>Try(myFunc(...))  match {
>  case Sucess(s) => updatewatermark for this partition //of course,
> expectation is that it will work only if there is a 1-1 mapping at this
> point in time
>  case Failure()  => Tell the driver not to generate a partition
> for this kafka topic+partition for a while, by updating some shared state
> (zk)
>
>}
>
>  }
> }
>
> I was looking for that mapping b/w kafka partition thats bound to a task
> inside the task execution code, in cases where the intermediate operations
> do not change partitions, shuffle etc.
>
> -neelesh
>
> On Fri, Sep 25, 2015 at 11:14 AM, Cody Koeninger 
> wrote:
>
>>
>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>>
>> also has an example of how to close over the offset ranges so they are
>> available on executors.
>>
>> On Fri, Sep 25, 2015 at 12:50 PM, Neelesh  wrote:
>>
>>> Hi,
>>>We are using DirectKafkaInputDStream and store completed consumer
>>> offsets in Kafka (0.8.2). However, some of our use case require that
>>> offsets be not written if processing of a partition fails with certain
>>> exceptions. This allows us to build various backoff strategies for that
>>> partition, instead of either blindly committing consumer offsets regardless
>>> of errors (because KafkaRDD as HasOffsetRanges is available only on the
>>> driver)  or relying on Spark's retry logic and continuing without remedial
>>> action.
>>>
>>> I was playing with SparkListener and found that while one can listen on
>>> taskCompletedEvent on the driver and even figure out that there was an
>>> error, there is no way of mapping this task back to the partition and
>>> retrieving offset range, topic & kafka partition # etc.
>>>
>>> Any pointers appreciated!
>>>
>>> Thanks!
>>> -neelesh
>>>
>>
>>
>


Re: spark.streaming.concurrentJobs

2015-09-25 Thread Atul Kulkarni
Can someone please help, either by explaining or by pointing to documentation,
with the relationship between the number of executors needed and how to let the
concurrent jobs that are created by the above parameter run in parallel?

On Thu, Sep 24, 2015 at 11:56 PM, Atul Kulkarni 
wrote:

> Hi Folks,
>
> I am trying to speed up my spark streaming job, I found a presentation by
> Tathagata Das that mentions to increase value of
> "spark.streaming.concurrentJobs" if I have more than one output.
>
> In my spark streaming job I am reading from Kafka using the receiver-based
> approach and transforming each line of data from Kafka and storing to
> HBase. I do not intend to do any kind of collation at this stage. I believe
> this can be parallelized by creating a separate job to write a different
> set of lines from Kafka to HBase and hence, I set the above parameter to a
> value > 1. Is my above assumption that writing to HBase for each partition
> in the RDDs from a given DStream is an independent output operation and can
> be parallelized?
>
> If the assumption is correct, and I run the job - this job creates
> multiple (smaller) jobs but they are executed one after another, not in
> parallel - I am curious if there is a requirement that #Executors be >= a
> particular number (a calculation based on how many repartitions after unio
> od DSreams etc. - I don't know I am grasping at Straws here.)
>
> I would appreciate some help in this regard. Thanks in advance.
>
> --
> Regards,
> Atul Kulkarni
>



-- 
Regards,
Atul Kulkarni
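
One thing to keep in mind (a sketch with purely illustrative names and numbers):
with the receiver-based approach each receiver permanently pins one executor
core, so the jobs launched via spark.streaming.concurrentJobs can only overlap
when the remaining cores exceed what a single job's tasks need.

val conf = new org.apache.spark.SparkConf()
  .setAppName("kafka-to-hbase")                  // hypothetical app name
  .set("spark.streaming.concurrentJobs", "2")    // the knob from TD's talk; undocumented

// With, say, 2 receivers and 12 total executor cores, 2 cores stay pinned to
// the receivers and 10 remain for batch tasks, so two jobs of <= 5 concurrent
// tasks each can genuinely run at the same time.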


how to handle OOMError from groupByKey

2015-09-25 Thread Elango Cheran
Hi everyone,
I have an RDD of the format (user: String, timestamp: Long, state:
Boolean).  My task involves converting the states, where on/off is
represented as true/false, into intervals of 'on' of the format (beginTs:
Long, endTs: Long).  So this task requires me, per user, to line up all of
the on/off states so that I can compute when it is on, since the
calculation is neither associative nor commutative.

So there are 2 main operations that I'm trying to accomplish together:
1. group by each user
2. sort by time -- keep all of the states in sorted order by time

The main code inside the method that does grouping by user and sorting by
time looks sort of looks like this:


// RDD starts off in format (user, ts, state) of type RDD[(String, Long,
Boolean)]
val grouped = keyedStatesRDD.groupByKey
// after .groupByKey, format for RDD is (user, seq-of-(ts, state)) of type
RDD[(String, Iterable(Long, Boolean))]
// take the sequence of (ts, state) per user, sort, get intervals
val groupedIntervals = grouped.mapValues(
  states => {
val sortedStates = states.toSeq.sortBy(_._1)
val intervals = DFUtil.statesToIntervals(sortedStates)
val intervalsList = bucketDurations.map{case(k,v) =>
(k,v)}(collection.breakOut).sortBy(_._1)
intervalsList
  }
)
// after .mapValues, new format for RDD is (user, seq-of-(startTime,
endTime)) of type RDD[(String, IndexedSeq(Long, Long))]


When I run my Spark job with 1 day's worth of data, the job completes
successfully.  When I run with 1 month's or 1 year's worth of data, that
method is where my Spark job consistently crashes with
OutOfMemoryErrors.  I need to run on the full year's worth of data.

My suspicion is that the groupByKey is the problem (it's pulling all of the
matching data values into a single executor's heap as a plain Scala
Iterable).  But alternatives of doing sortByKey on the RDD first before
grouping, or sortByKey followed by a fold[ByKey] or aggregate[ByKey] don't
quite apply in my scenario because my operation is not associative (can't
combine per-partition results) and I still need to group by users before
doing a foldLeft.

I've definitely thought about the issue before and come across users with
issues that are similar but not exactly the same:
http://apache-spark-user-list.1001560.n3.nabble.com/Folding-an-RDD-in-order-td16577.html
http://mail-archives.us.apache.org/mod_mbox/spark-user/201501.mbox/%3ccaae1cqr8rd8ypebcmbjwfhm+lxh6nw4+r+uharx00psk_sh...@mail.gmail.com%3E
http://apache-spark-user-list.1001560.n3.nabble.com/Partition-sorting-by-Spark-framework-td18213.html
http://apache-spark-user-list.1001560.n3.nabble.com/Alternatives-to-groupByKey-td20293.html

And this Jira seems relevant too:
https://issues.apache.org/jira/browse/SPARK-3655

The amount of memory that I'm using is 2g per executor, and I can't go
higher than that because each executor gets a YARN container from nodes
with 16 GB of RAM and 5 YARN containers allowed per node.

So I'd like to know if there's an easy solution to executing my logic on my
full dataset in Spark.

Thanks!

-- Elango
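
One alternative worth sketching (the interval logic below is a simplified
stand-in for DFUtil.statesToIntervals, so treat it as an outline rather than a
drop-in): repartitionAndSortWithinPartitions gives a "secondary sort", so each
user's states arrive contiguously and time-ordered inside a partition and can be
folded in one streaming pass, without groupByKey materializing a per-user
Iterable.

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

def onIntervals(states: RDD[(String, Long, Boolean)],
                parts: Int): RDD[(String, (Long, Long))] = {
  // Key by (user, ts) so records sort by time, but partition by user only,
  // keeping all of a user's records inside one partition.
  val keyed = states.map { case (user, ts, on) => ((user, ts), on) }
  val byUser = new HashPartitioner(parts) {
    override def getPartition(key: Any): Int = key match {
      case (user: String, _) => ((user.hashCode % parts) + parts) % parts
    }
  }

  keyed.repartitionAndSortWithinPartitions(byUser).mapPartitions { iter =>
    // The iterator is sorted by (user, ts); walk it once, emitting an
    // interval whenever a user's state flips from on back to off.
    val out = scala.collection.mutable.ArrayBuffer.empty[(String, (Long, Long))]
    var currentUser: String = null
    var onSince: Option[Long] = None
    iter.foreach { case ((user, ts), on) =>
      if (user != currentUser) { currentUser = user; onSince = None }
      (on, onSince) match {
        case (true, None)         => onSince = Some(ts)
        case (false, Some(start)) => out += ((user, (start, ts))); onSince = None
        case _                    => // no state change
      }
    }
    out.iterator
  }
}

This still buffers each partition's output intervals, but it never holds a whole
user's raw state sequence in memory at once, which is usually what breaks
groupByKey on a 2g heap.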


Re: Convert Vector to RDD[Double]

2015-09-25 Thread Sourigna Phetsarath
import  org.apache.spark.mllib.linalg._

val v = Vectors.dense(1.0,2.0)
val rdd = sc.parallelize(v.toArray)



On Fri, Sep 25, 2015 at 2:46 PM, Yusuf Can Gürkan 
wrote:

> How can i convert a Vector to RDD[Double]. For example:
>
> val vector = Vectors.dense(1.0,2.0)
> val rdd // i need sc.parallelize(Array(1.0,2.0))
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 


*Gna Phetsarath*System Architect // AOL Platforms // Data Services //
Applied Research Chapter
770 Broadway, 5th Floor, New York, NY 10003
o: 212.402.4871 // m: 917.373.7363
vvmr: 8890237 aim: sphetsarath20 t: @sourigna



Re: executor-cores setting does not work under Yarn

2015-09-25 Thread Gavin Yue
I think I found the problem.

The YARN capacity scheduler has to be changed to use
DominantResourceCalculator.

Thanks!
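
For reference, the change is a one-property edit in capacity-scheduler.xml; by
default the CapacityScheduler sizes containers by memory only, which is why
every container showed a single vcore:

<property>
  <name>yarn.scheduler.capacity.resource-calculator</name>
  <value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
</property>

followed by a refresh of the YARN queues or a restart of the ResourceManager.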


On Fri, Sep 25, 2015 at 4:54 AM, Akhil Das 
wrote:

> Which version of spark are you having? Can you also check whats set in
> your conf/spark-defaults.conf file?
>
> Thanks
> Best Regards
>
> On Fri, Sep 25, 2015 at 1:58 AM, Gavin Yue  wrote:
>
>> Running Spark app over Yarn 2.7
>>
>> Here is my sparksubmit setting:
>> --master yarn-cluster \
>>  --num-executors 100 \
>>  --executor-cores 3 \
>>  --executor-memory 20g \
>>  --driver-memory 20g \
>>  --driver-cores 2 \
>>
>> But the executor cores setting is not working. It always assigns only one
>> vcore  to one container based on the cluster metrics from yarn resource
>> manager website.
>>
>> And yarn setting for container is
>> min:   max: 
>>
>> I have tried to change num-executors and executor memory. It even ignores
>> the min cCores setting and always assign one core per container.
>>
>> Any advice?
>>
>> Thank you!
>>
>>
>>
>>
>>
>


Re: kafka direct streaming with checkpointing

2015-09-25 Thread Radu Brumariu
Wouldn't the same case be made for checkpointing in general?
What I am trying to say is that this particular situation is part of the
general checkpointing use case, not an edge case.
I would like to understand why the checkpointing mechanism already present in
Spark shouldn't handle this situation too.

On Fri, Sep 25, 2015 at 12:20 PM, Cody Koeninger  wrote:

> Storing passbacks transactionally with results in your own data store,
> with a schema that makes sense for you, is the optimal solution.
>
> On Fri, Sep 25, 2015 at 11:05 AM, Radu Brumariu  wrote:
>
>> Right, I understand why the exceptions happen.
>> However, it seems less useful to have a checkpointing that only works in
>> the case of an application restart. IMO, code changes happen quite often,
>> and not being able to pick up where the previous job left off is quite a
>> bit of a hinderance.
>>
>> The solutions you mention would partially solve the problem, while
>> bringing new problems along ( increased resource utilization, difficulty in
>> managing multiple jobs consuming the same data ,etc ).
>>
>> The solution that we currently employ is committing the offsets to a
>> durable storage and making sure that the job reads the offsets from there
>> upon restart, while forsaking checkpointing.
>>
>> The scenario seems not to be an edge case, which is why I was asking that
>> perhaps it could be handled by the spark kafka API instead having everyone
>> come up with their own, sub-optimal solutions.
>>
>> Radu
>>
>> On Fri, Sep 25, 2015 at 5:06 AM, Adrian Tanase  wrote:
>>
>>> Hi Radu,
>>>
>>> The problem itself is not checkpointing the data – if your operations
>>> are stateless then you are only checkpointing the kafka offsets, you are
>>> right.
>>> The problem is that you are also checkpointing metadata – including the
>>> actual Code and serialized java classes – that’s why you’ll see
>>> ser/deser exceptions on restart with upgrade.
>>>
>>> If you’re not using stateful opetations, you might get away by using the
>>> old Kafka receiver w/o WAL – but you accept “at least once semantics”. As
>>> soon as you add in the WAL you are forced to checkpoint and you’re better
>>> off with the DirectReceiver approach.
>>>
>>> I believe the simplest way to get around is to support runnning 2
>>> versions in parallel – with some app level control of a barrier (e.g. v1
>>> reads events up to 3:00am, v2 after that). Manual state management is also
>>> supported by the framework but it’s harder to control because:
>>>
>>>- you’re not guaranteed to shut down gracefully
>>>- You may have a bug that prevents the state to be saved and you
>>>can’t restart the app w/o upgrade
>>>
>>> Less than ideal, yes :)
>>>
>>> -adrian
>>>
>>> From: Radu Brumariu
>>> Date: Friday, September 25, 2015 at 1:31 AM
>>> To: Cody Koeninger
>>> Cc: "user@spark.apache.org"
>>> Subject: Re: kafka direct streaming with checkpointing
>>>
>>> Would changing the direct stream api to support committing the offsets
>>> to kafka's ZK( like a regular consumer) as a fallback mechanism, in case
>>> recovering from checkpoint fails , be an accepted solution?
>>>
>>> On Thursday, September 24, 2015, Cody Koeninger 
>>> wrote:
>>>
 This has been discussed numerous times, TD's response has consistently
 been that it's unlikely to be possible

 On Thu, Sep 24, 2015 at 12:26 PM, Radu Brumariu 
 wrote:

> It seems to me that this scenario that I'm facing, is quite common for
> spark jobs using Kafka.
> Is there a ticket to add this sort of semantics to checkpointing ?
> Does it even make sense to add it there ?
>
> Thanks,
> Radu
>
>
> On Thursday, September 24, 2015, Cody Koeninger 
> wrote:
>
>> No, you cant use checkpointing across code changes.  Either store
>> offsets yourself, or start up your new app code and let it catch up 
>> before
>> killing the old one.
>>
>> On Thu, Sep 24, 2015 at 8:40 AM, Radu Brumariu 
>> wrote:
>>
>>> Hi,
>>> in my application I use Kafka direct streaming and I have also
>>> enabled checkpointing.
>>> This seems to work fine if the application is restarted. However if
>>> I change the code and resubmit the application, it cannot start because 
>>> of
>>> the checkpointed data being of different class versions.
>>> Is there any way I can use checkpointing that can survive across
>>> application version changes?
>>>
>>> Thanks,
>>> Radu
>>>
>>>
>>

>>
>


Re: Reading Hive Tables using SQLContext

2015-09-25 Thread Michael Armbrust
Eventually I'd like to eliminate HiveContext, but for now I just recommend
that most users use it instead of SQLContext.
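
For completeness, creating one looks like this (the table name is hypothetical);
HiveContext extends SQLContext, so it can be used anywhere a SQLContext is
expected while also understanding Hive tables and HiveQL:

import org.apache.spark.sql.hive.HiveContext

val sqlContext = new HiveContext(sc)   // sc is the existing SparkContext
val df = sqlContext.sql("SELECT * FROM some_hive_table")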

On Thu, Sep 24, 2015 at 5:41 PM, Sathish Kumaran Vairavelu <
vsathishkuma...@gmail.com> wrote:

> Thanks Michael. Just want to check if there is a roadmap to include Hive
> tables from SQLContext.
>
> -Sathish
>
> On Thu, Sep 24, 2015 at 7:46 PM Michael Armbrust 
> wrote:
>
>> No, you have to use a HiveContext.
>>
>> On Thu, Sep 24, 2015 at 2:47 PM, Sathish Kumaran Vairavelu <
>> vsathishkuma...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> Is it possible to access Hive tables directly from SQLContext instead of
>>> HiveContext? I am facing with errors while doing it.
>>>
>>> Please let me know
>>>
>>>
>>> Thanks
>>>
>>> Sathish
>>>
>>
>>


Re: Kafka & Spark Streaming

2015-09-25 Thread Neelesh
Thanks Petr, Cody. This is a reasonable place to start for me. What I'm
trying to achieve

stream.foreachRDD { rdd =>
  rdd.foreachPartition { p =>

    Try(myFunc(...)) match {
      case Success(s) =>
        // update the watermark for this partition - of course, the expectation
        // is that it will work only if there is a 1-1 mapping at this point in time
      case Failure(e) =>
        // tell the driver not to generate a partition for this kafka
        // topic+partition for a while, by updating some shared state (zk)
    }

  }
}

I was looking for the mapping between the Kafka partition and the task it is
bound to, from inside the task execution code, in cases where the intermediate
operations do not change partitions, shuffle, etc.

-neelesh

On Fri, Sep 25, 2015 at 11:14 AM, Cody Koeninger  wrote:

>
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>
> also has an example of how to close over the offset ranges so they are
> available on executors.
>
> On Fri, Sep 25, 2015 at 12:50 PM, Neelesh  wrote:
>
>> Hi,
>>We are using DirectKafkaInputDStream and store completed consumer
>> offsets in Kafka (0.8.2). However, some of our use case require that
>> offsets be not written if processing of a partition fails with certain
>> exceptions. This allows us to build various backoff strategies for that
>> partition, instead of either blindly committing consumer offsets regardless
>> of errors (because KafkaRDD as HasOffsetRanges is available only on the
>> driver)  or relying on Spark's retry logic and continuing without remedial
>> action.
>>
>> I was playing with SparkListener and found that while one can listen on
>> taskCompletedEvent on the driver and even figure out that there was an
>> error, there is no way of mapping this task back to the partition and
>> retrieving offset range, topic & kafka partition # etc.
>>
>> Any pointers appreciated!
>>
>> Thanks!
>> -neelesh
>>
>
>


Convert Vector to RDD[Double]

2015-09-25 Thread Yusuf Can Gürkan
How can i convert a Vector to RDD[Double]. For example:

val vector = Vectors.dense(1.0,2.0)
val rdd // i need sc.parallelize(Array(1.0,2.0))


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Broadcast to executors with multiple cores

2015-09-25 Thread Jeff Palmucci
So I have a large data structure that I want to broadcast to my executors. It 
is so large that it makes sense to share access to the object between multiple 
tasks, so I create my executors with multiple cores. Unfortunately, it looks 
like the object is not shared between threads, but is copied once for each 
thread.

Is my assumption right (1 copy per thread) and is there any way I can force a 
broadcast to be shared between threads?

Using 1.4.1

Thanks!


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Kafka & Spark Streaming

2015-09-25 Thread Cody Koeninger
http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers

also has an example of how to close over the offset ranges so they are
available on executors.
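
Roughly, the pattern from that page looks like the following sketch, where
directKafkaStream stands for the DStream returned by KafkaUtils.createDirectStream
and the per-record processing is elided:

import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

var offsetRanges = Array[OffsetRange]()

directKafkaStream.transform { rdd =>
  // Capture the offset ranges on the driver; the RDD itself is unchanged.
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    // With no shuffle in between, Spark partition i maps to offsetRanges(i).
    val o = offsetRanges(TaskContext.get().partitionId)
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
    // process iter, then record o.untilOffset only if processing succeeded
  }
}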

On Fri, Sep 25, 2015 at 12:50 PM, Neelesh  wrote:

> Hi,
>We are using DirectKafkaInputDStream and store completed consumer
> offsets in Kafka (0.8.2). However, some of our use case require that
> offsets be not written if processing of a partition fails with certain
> exceptions. This allows us to build various backoff strategies for that
> partition, instead of either blindly committing consumer offsets regardless
> of errors (because KafkaRDD as HasOffsetRanges is available only on the
> driver)  or relying on Spark's retry logic and continuing without remedial
> action.
>
> I was playing with SparkListener and found that while one can listen on
> taskCompletedEvent on the driver and even figure out that there was an
> error, there is no way of mapping this task back to the partition and
> retrieving offset range, topic & kafka partition # etc.
>
> Any pointers appreciated!
>
> Thanks!
> -neelesh
>


Re: Kafka & Spark Streaming

2015-09-25 Thread Petr Novak
You can have offsetRanges on workers, for example:

object Something {
  var offsetRanges = Array[OffsetRange]()

  def create[F: ClassTag](stream: InputDStream[Array[Byte]])
                         (implicit codec: Codec[F]): DStream[F] = {
    stream transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      rdd flatMap { message =>
        Try(codec.decode(message)) match {
          case Success(fact) => Some(fact)
          case Failure(e) => None
        }
      }
    }
  }
}

Call create and use the returned stream downstream.

Or something like this:

// See https://issues.apache.org/jira/browse/SPARK-5569 for why I map
// OffsetRanges to a custom class.

case class TopicMetadata(name: String, partition: Int,
                         fromOffset: Long, untilOffset: Long)

object KafkaContext {
  private[this] var state = Array[TopicMetadata]()

  def captureTopicMetadata(offsetRanges: Array[OffsetRange]): Unit = {
    state = offsetRanges.map { o =>
      TopicMetadata(o.topic, o.partition, o.fromOffset, o.untilOffset)
    }
  }

  def topics: Array[TopicMetadata] = state
}

// Then somewhere:

def run(stream: InputDStream[Array[Byte]]): Unit = {
  stream.transform { rdd =>
    KafkaContext.captureTopicMetadata(rdd.asInstanceOf[HasOffsetRanges].offsetRanges)
    rdd
  }.foreachRDD { rdd =>
    val s = KafkaContext.topics.map { x =>
      s"${x.name}_${x.partition}_${x.fromOffset}-${x.untilOffset}"
    }
    // ...
  }
}


So they can be available on the driver. Sorry for the imprecise code - I'm in
a hurry and there are probably mistakes, but you can get the idea.
Petr

On Fri, Sep 25, 2015 at 7:50 PM, Neelesh  wrote:

> Hi,
>We are using DirectKafkaInputDStream and store completed consumer
> offsets in Kafka (0.8.2). However, some of our use case require that
> offsets be not written if processing of a partition fails with certain
> exceptions. This allows us to build various backoff strategies for that
> partition, instead of either blindly committing consumer offsets regardless
> of errors (because KafkaRDD as HasOffsetRanges is available only on the
> driver)  or relying on Spark's retry logic and continuing without remedial
> action.
>
> I was playing with SparkListener and found that while one can listen on
> taskCompletedEvent on the driver and even figure out that there was an
> error, there is no way of mapping this task back to the partition and
> retrieving offset range, topic & kafka partition # etc.
>
> Any pointers appreciated!
>
> Thanks!
> -neelesh
>


Kafka & Spark Streaming

2015-09-25 Thread Neelesh
Hi,
   We are using DirectKafkaInputDStream and store completed consumer
offsets in Kafka (0.8.2). However, some of our use cases require that
offsets not be written if processing of a partition fails with certain
exceptions. This allows us to build various backoff strategies for that
partition, instead of either blindly committing consumer offsets regardless
of errors (because KafkaRDD as HasOffsetRanges is available only on the
driver)  or relying on Spark's retry logic and continuing without remedial
action.

I was playing with SparkListener and found that while one can listen on
taskCompletedEvent on the driver and even figure out that there was an
error, there is no way of mapping this task back to the partition and
retrieving offset range, topic & kafka partition # etc.

Any pointers appreciated!

Thanks!
-neelesh


Re: Weird worker usage

2015-09-25 Thread N B
Hi Akhil,

I do have 25 partitions being created. I have set
the spark.default.parallelism property to 25. Batch size is 30 seconds and
block interval is 1200 ms which also gives us roughly 25 partitions from
the input stream. I can see 25 partitions being created and used in the
Spark UI also. It's just that those tasks are waiting for cores on N1 to get
free before being scheduled while N2 is sitting idle.

The cluster configuration is:

N1: 2 workers, 6 cores and 16gb memory => 12 cores on the first node.
N2: 2 workers, 8 cores and 16 gb memory => 16 cores on the second node.

for a grand total of 28 cores. But it still does most of the processing on
N1 (divided among the 2 workers running there), almost completely disregarding
N2 until the final stage where data is being written out to Elasticsearch. I
am not sure I understand the reason behind it not distributing more partitions
to N2 to begin with and using it effectively. Since there are only 12 cores on
N1 and 25 total partitions, shouldn't it send some of those partitions to N2
as well?

Thanks
Nikunj
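
One hedged guess: with a receiver-based source such as Flume, the received blocks
live on the receiver's node, so tasks prefer N1 for data locality and only move to
N2 once the locality wait expires. Two illustrative things to try (names and
values are examples only):

// 1. Give up on node-local scheduling sooner so idle cores on N2 get used.
sparkConf.set("spark.locality.wait", "1s")        // default is 3s

// 2. Or spread the input itself: run a second Flume receiver on N2, or
//    explicitly repartition the DStream before the heavy transformations.
val evened = flumeStream.repartition(25)          // hypothetical stream name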


On Fri, Sep 25, 2015 at 5:28 AM, Akhil Das 
wrote:

> Parallel tasks totally depends on the # of partitions that you are having,
> if you are not receiving sufficient partitions (partitions > total # cores)
> then try to do a .repartition.
>
> Thanks
> Best Regards
>
> On Fri, Sep 25, 2015 at 1:44 PM, N B  wrote:
>
>> Hello all,
>>
>> I have a Spark streaming application that reads from a Flume Stream, does
>> quite a few maps/filters in addition to a few reduceByKeyAndWindow and join
>> operations before writing the analyzed output to ElasticSearch inside a
>> foreachRDD()...
>>
>> I recently started to run this on a 2 node cluster (Standalone) with the
>> driver program directly submitting to Spark master on the same host. The
>> way I have divided the resources is as follows:
>>
>> N1: spark Master + driver + flume + 2 spark workers (16gb + 6 cores each
>> worker)
>> N2: 2 spark workers (16 gb + 8 cores each worker).
>>
>> The application works just fine but it is underusing N2 completely. It
>> seems to use N1 (note that both executors on N1 get used) for all the
>> analytics but when it comes to writing to Elasticsearch, it does divide the
>> data around into all 4 executors which then write to ES on a separate host.
>>
>> I am puzzled as to why the data is not being distributed evenly from the
>> get go into all 4 executors and why would it only do so in the final step
>> of the pipeline which seems counterproductive as well?
>>
>> CPU usage on N1 is near the peak while on N2 is < 10% of overall capacity.
>>
>> Any help in getting the resources more evenly utilized on N1 and N2 is
>> welcome.
>>
>> Thanks in advance,
>> Nikunj
>>
>>
>


Re: Using Map and Basic Operators yield java.lang.ClassCastException (Parquet + Hive + Spark SQL 1.5.0 + Thrift)

2015-09-25 Thread Cheng Lian
Thanks for the clarification. Could you please provide the full schema 
of your table and query plans of your query? You may obtain them via:


hiveContext.table("your_table").printSchema()

and

hiveContext.sql("your query").explain(extended = true)

You also mentioned "Thrift" in the subject, did you mean the Thrift 
server? Or maybe the Parquet files were written by parquet-thrift? Could 
you please also provide the full Parquet schema of the Parquet files you 
were reading? You may get the schema using the parquet-schema CLI tool:


$ parquet-schema 

Here you can find instructions on how to build parquet-tools, just in 
case you don't have it at hand: 
https://github.com/Parquet/parquet-mr/issues/321


If you don't want to bother building parquet-tools (which can be 
sometimes troublesome), you may also try this in spark-shell:


hiveContext.table("your_table").head(1)

Then you should be able to find the Parquet schema from Spark driver log 
(please make sure you enable INFO log).



Cheng

On 9/24/15 7:59 PM, Dominic Ricard wrote:

No, those were just examples of how maps can look. In my case, the 
key-value is either there or not, in the form of the latter:

{"key1":{"key2":"value"}}

If key1 is present, then it will contain a tuple of key2:value, the value 
being an 'int'.

I guess, after some testing, that my problem is in how casting a Map value to 
the primitive Float and Double types is handled. Handling INT is all good, but 
float and double are causing the exception.

Thanks.

Dominic Ricard
Triton Digital

-Original Message-
From: Cheng Lian [mailto:lian.cs@gmail.com]
Sent: Thursday, September 24, 2015 5:47 PM
To: Dominic Ricard; user@spark.apache.org
Subject: Re: Using Map and Basic Operators yield java.lang.ClassCastException 
(Parquet + Hive + Spark SQL 1.5.0 + Thrift)



On 9/24/15 11:34 AM, Dominic Ricard wrote:

Hi,
 I stumbled on the following today. We have Parquet files that
expose a column in a Map format. This is very convenient as we have
data parts that can vary in time. Not knowing what the data will be,
we simply split it in tuples and insert it as a map inside 1 column.

Retrieving the data is very easy. Syntax looks like this:

select column.key1.key2 from table;

Column value look like this:
{}
{"key1":"value"}
{"key1":{"key2":"value"}}

Do you mean that the value type of the map may also vary? The 2nd record has a 
string value, while the 3rd one has another nested map as its value. This isn't 
supported in Spark SQL.

But when trying to do basic operators on that column, I get the
following
error:

query: select (column.key1.key2 / 30 < 1) from table

ERROR processing query/statement. Error Code: 0, SQL state:
TStatus(statusCode:ERROR_STATUS,
infoMessages:[*org.apache.hive.service.cli.HiveSQLException:java.lang.ClassCastException:
org.apache.spark.sql.types.NullType$ cannot be cast to
org.apache.spark.sql.types.MapType:26:25,
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation:
runInternal:SparkExecuteStatementOperation.scala:259,
org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation:
run:SparkExecuteStatementOperation.scala:144,
org.apache.hive.service.cli.session.HiveSessionImpl:executeStatementIn
ternal:HiveSessionImpl.java:388,
org.apache.hive.service.cli.session.HiveSessionImpl:executeStatement:H
iveSessionImpl.java:369,
sun.reflect.GeneratedMethodAccessor115:invoke::-1,
sun.reflect.DelegatingMethodAccessorImpl:invoke:DelegatingMethodAccess
orImpl.java:43, java.lang.reflect.Method:invoke:Method.java:497,
org.apache.hive.service.cli.session.HiveSessionProxy:invoke:HiveSessio
nProxy.java:78,
org.apache.hive.service.cli.session.HiveSessionProxy:access$000:HiveSe
ssionProxy.java:36,
org.apache.hive.service.cli.session.HiveSessionProxy$1:run:HiveSession
Proxy.java:63,
java.security.AccessController:doPrivileged:AccessController.java:-2,
javax.security.auth.Subject:doAs:Subject.java:422,
org.apache.hadoop.security.UserGroupInformation:doAs:UserGroupInformat
ion.java:1628,
org.apache.hive.service.cli.session.HiveSessionProxy:invoke:HiveSessio
nProxy.java:59, com.sun.proxy.$Proxy39:executeStatement::-1,
org.apache.hive.service.cli.CLIService:executeStatement:CLIService.jav
a:261,
org.apache.hive.service.cli.thrift.ThriftCLIService:ExecuteStatement:T
hriftCLIService.java:486,
org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatem
ent:getResult:TCLIService.java:1313,
org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatem
ent:getResult:TCLIService.java:1298,
org.apache.thrift.ProcessFunction:process:ProcessFunction.java:39,
org.apache.thrift.TBaseProcessor:process:TBaseProcessor.java:39,
org.apache.hive.service.auth.TSetIpAddressProcessor:process:TSetIpAddr
essProcessor.java:56,
org.apache.thrift.server.TThreadPoolServer$WorkerProcess:run:TThreadPo
olServer.java:285,
java.util.concurrent.ThreadPoolExecutor:runWorker:ThreadPoolExecutor.j
ava:1142,
java.util.concurrent.ThreadPoolExecutor$Worker:run:ThreadP

RE: hive on spark query error

2015-09-25 Thread Garry Chen
Yes, you are right.  I made the change and also linked hive-site.xml into the 
Spark conf directory.  Rerunning the SQL, I get the following error in hive.log:

2015-09-25 13:31:14,750 INFO  [HiveServer2-Handler-Pool: Thread-125]: 
client.SparkClientImpl (SparkClientImpl.java:startDriver(375)) - Attempting 
impersonation of HIVEAPP
2015-09-25 13:31:14,750 INFO  [HiveServer2-Handler-Pool: Thread-125]: 
client.SparkClientImpl (SparkClientImpl.java:startDriver(409)) - Running client 
driver with argv: /u01/app/spark-1.4.1-bin-hadoop2.6/bin/spark-submit 
--executor-memory 512m --proxy-user HIVEAPP --properties-file 
/tmp/spark-submit.4348738410387344124.properties --class 
org.apache.hive.spark.client.RemoteDriver 
/u01/app/apache-hive-1.2.1-bin/lib/hive-exec-1.2.1.jar --remote-host 
ip-10-92-82-229.ec2.internal --remote-port 48481 --conf 
hive.spark.client.connect.timeout=1000 --conf 
hive.spark.client.server.connect.timeout=9 --conf 
hive.spark.client.channel.log.level=null --conf 
hive.spark.client.rpc.max.size=52428800 --conf hive.spark.client.rpc.threads=8 
--conf hive.spark.client.secret.bits=256
2015-09-25 13:31:15,473 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: 
hive.spark.client.server.connect.timeout=9
2015-09-25 13:31:15,473 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: 
hive.spark.client.rpc.threads=8
2015-09-25 13:31:15,474 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: 
hive.spark.client.connect.timeout=1000
2015-09-25 13:31:15,474 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: 
hive.spark.client.secret.bits=256
2015-09-25 13:31:15,474 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: 
hive.spark.client.rpc.max.size=52428800
2015-09-25 13:31:15,718 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - 15/09/25 13:31:15 WARN util.NativeCodeLoader: 
Unable to load native-hadoop library for your platform... using builtin-java 
classes where applicable
2015-09-25 13:31:16,063 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - 15/09/25 13:31:16 INFO client.RMProxy: 
Connecting to ResourceManager at /0.0.0.0:8032
2015-09-25 13:31:16,245 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - ERROR: 
org.apache.hadoop.security.authorize.AuthorizationException: User: hadoop is 
not allowed to impersonate HIVEAPP
2015-09-25 13:31:16,248 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - 15/09/25 13:31:16 INFO util.Utils: Shutdown 
hook called
2015-09-25 13:31:16,265 WARN  [Driver]: client.SparkClientImpl 
(SparkClientImpl.java:run(427)) - Child process exited with code 1.
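
In case it helps, the "User: hadoop is not allowed to impersonate HIVEAPP" error is
typically resolved by letting the submitting user (here apparently "hadoop") proxy other
users via Hadoop's core-site.xml. A hedged sketch, assuming "hadoop" really is the user
running spark-submit and that wildcards are acceptable in your environment:

<property>
  <name>hadoop.proxyuser.hadoop.hosts</name>
  <value>*</value>
</property>
<property>
  <name>hadoop.proxyuser.hadoop.groups</name>
  <value>*</value>
</property>

The NameNode and ResourceManager need to pick up the change (restart or refresh) before
the impersonation is allowed.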

-Original Message-
From: Marcelo Vanzin [mailto:van...@cloudera.com] 
Sent: Friday, September 25, 2015 1:12 PM
To: Garry Chen 
Cc: Jimmy Xiang ; user@spark.apache.org
Subject: Re: hive on spark query error

On Fri, Sep 25, 2015 at 10:05 AM, Garry Chen  wrote:
> In spark-defaults.conf the spark.master  is  spark://hostname:7077.  
> From hive-site.xml  
> spark.master
> hostname
>   

That's not a valid value for spark.master (as the error indicates).
You should set it to "spark://hostname:7077", as you have it in 
spark-defaults.conf (or perhaps remove the setting from hive-site.xml, I think 
hive will honor your spark-defaults.conf).

--
Marcelo


Re: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema

2015-09-25 Thread Daniel Haviv
I tried but I'm getting the same error (task not serializable)

> On 25 בספט׳ 2015, at 20:10, Ted Yu  wrote:
> 
> Is the Schema.parse() call expensive ?
> 
> Can you call it in the closure ?
> 
>> On Fri, Sep 25, 2015 at 10:06 AM, Daniel Haviv 
>>  wrote:
>> Hi,
>> I'm getting a NotSerializableException even though I'm creating all my
>> objects from within the closure:
>>  import org.apache.avro.generic.GenericDatumReader
>>  import   java.io.File
>> import org.apache.avro._
>> 
>>  val orig_schema = Schema.parse(new File("/home/wasabi/schema"))
>>   val READER = new GenericDatumReader[GenericRecord](schema)
>> 
>> val bd = sc.broadcast(orig_schema.toString)
>> 
>> 
>>  val rdd=sc.binaryFiles("/daniel").map(zibi => {  
>>  val schema_obj =  new Schema.Parser
>>  val schema2 = schema_obj.parse(bd.value)
>> })
>> 
>> I think that the problem is that Schema itself is an abstract class with 
>> static methods (such as Parser).
>> 
>> Am I correct? 
>> How can I overcome it ?
>> 
>> Thank you.
>> Daniel
> 
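
A minimal sketch of the pattern Ted suggests, assuming the driver-side parse is only
needed to obtain the schema JSON: both the Schema and the reader are rebuilt inside the
task, so no Avro objects (which are not serializable) are captured by the closure. The
actual decoding is omitted.

import java.io.File
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

// Parse once on the driver only to get the schema JSON, then broadcast the string.
val schemaJson = new Schema.Parser().parse(new File("/home/wasabi/schema")).toString
val bd = sc.broadcast(schemaJson)

val rdd = sc.binaryFiles("/daniel").mapPartitions { iter =>
  // Recreate the non-serializable objects once per partition, inside the task.
  val schema = new Schema.Parser().parse(bd.value)
  val reader = new GenericDatumReader[GenericRecord](schema)
  iter.map { case (path, stream) =>
    // ... decode stream with reader here ...
    path
  }
}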


Re: hive on spark query error

2015-09-25 Thread Marcelo Vanzin
On Fri, Sep 25, 2015 at 10:05 AM, Garry Chen  wrote:
> In spark-defaults.conf the spark.master  is  spark://hostname:7077.  From
> hive-site.xml  
> spark.master
> hostname
>   

That's not a valid value for spark.master (as the error indicates).
You should set it to "spark://hostname:7077", as you have it in
spark-defaults.conf (or perhaps remove the setting from hive-site.xml,
I think hive will honor your spark-defaults.conf).
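
For reference, a minimal sketch of the two equivalent fixes ("hostname" is a placeholder
for your actual master host):

# spark-defaults.conf
spark.master    spark://hostname:7077

<!-- or, in hive-site.xml -->
<property>
  <name>spark.master</name>
  <value>spark://hostname:7077</value>
</property>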

-- 
Marcelo




Re: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema

2015-09-25 Thread Ted Yu
Is the Schema.parse() call expensive ?

Can you call it in the closure ?

On Fri, Sep 25, 2015 at 10:06 AM, Daniel Haviv <
daniel.ha...@veracity-group.com> wrote:

> Hi,
> I'm getting a NotSerializableException even though I'm creating all my
> objects from within the closure:
>  import org.apache.avro.generic.GenericDatumReader
>  import   java.io.File
> import org.apache.avro._
>
>  val orig_schema = Schema.parse(new File("/home/wasabi/schema"))
>   val READER = new GenericDatumReader[GenericRecord](schema)
>
> val bd = sc.broadcast(orig_schema.toString)
>
>
>  val rdd=sc.binaryFiles("/daniel").map(zibi => {
>  val schema_obj =  new Schema.Parser
>  val schema2 = schema_obj.parse(bd.value)
> })
>
> I think that the problem is that Schema itself is an abstract class with
> static methods (such as Parser).
>
> Am I correct?
> How can I overcome it ?
>
> Thank you.
> Daniel
>


java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema

2015-09-25 Thread Daniel Haviv
Hi,
I'm getting a NotSerializableException even though I'm creating all my
objects from within the closure:
import org.apache.avro._
import org.apache.avro.generic.GenericDatumReader
import java.io.File

val orig_schema = Schema.parse(new File("/home/wasabi/schema"))
val READER = new GenericDatumReader[GenericRecord](schema)

val bd = sc.broadcast(orig_schema.toString)

val rdd = sc.binaryFiles("/daniel").map(zibi => {
  val schema_obj = new Schema.Parser
  val schema2 = schema_obj.parse(bd.value)
})

I think that the problem is that Schema itself is an abstract class with
static methods (such as Parser).

Am I correct?
How can I overcome it ?

Thank you.
Daniel


RE: hive on spark query error

2015-09-25 Thread Garry Chen
In spark-defaults.conf the spark.master is spark://hostname:7077.  From
hive-site.xml:
  <property>
    <name>spark.master</name>
    <value>hostname</value>
  </property>



From: Jimmy Xiang [mailto:jxi...@cloudera.com]
Sent: Friday, September 25, 2015 1:00 PM
To: Garry Chen 
Cc: user@spark.apache.org
Subject: Re: hive on spark query error

> Error: Master must start with yarn, spark, mesos, or local
What's your setting for spark.master?

On Fri, Sep 25, 2015 at 9:56 AM, Garry Chen <g...@cornell.edu> wrote:
Hi All,
I am following 
https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started?
 to setup hive on spark.  After setup/configuration everything startup I am 
able to show tables but when executing sql statement within beeline I got 
error.  Please help and thank you very much.

Cluster Environment (3 nodes) as following
hadoop-2.7.1
spark-1.4.1-bin-hadoop2.6
zookeeper-3.4.6
apache-hive-1.2.1-bin

Error from hive log:
2015-09-25 11:51:03,123 INFO  [HiveServer2-Handler-Pool: Thread-50]: 
client.SparkClientImpl (SparkClientImpl.java:startDriver(375)) - Attempting 
impersonation of oracle
2015-09-25 11:51:03,133 INFO  [HiveServer2-Handler-Pool: Thread-50]: 
client.SparkClientImpl (SparkClientImpl.java:startDriver(409)) - Running client 
driver with argv: /u01/app/spark-1.4.1-bin-hadoop2.6/bin/spark-submit 
--proxy-user oracle --properties-file 
/tmp/spark-submit.840692098393819749.properties --class 
org.apache.hive.spark.client.RemoteDriver 
/u01/app/apache-hive-1.2.1-bin/lib/hive-exec-1.2.1.jar --remote-host 
ip-10-92-82-229.ec2.internal --remote-port 40476 --conf 
hive.spark.client.connect.timeout=1000 --conf 
hive.spark.client.server.connect.timeout=9 --conf 
hive.spark.client.channel.log.level=null --conf 
hive.spark.client.rpc.max.size=52428800 --conf hive.spark.client.rpc.threads=8 
--conf hive.spark.client.secret.bits=256
2015-09-25 11:51:03,867 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: 
hive.spark.client.server.connect.timeout=9
2015-09-25 11:51:03,868 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: 
hive.spark.client.rpc.threads=8
2015-09-25 11:51:03,868 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: 
hive.spark.client.connect.timeout=1000
2015-09-25 11:51:03,868 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: 
hive.spark.client.secret.bits=256
2015-09-25 11:51:03,868 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: 
hive.spark.client.rpc.max.size=52428800
2015-09-25 11:51:03,876 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Error: Master must start with yarn, spark, 
mesos, or local
2015-09-25 11:51:03,876 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Run with --help for usage help or --verbose 
for debug output
2015-09-25 11:51:03,885 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - 15/09/25 11:51:03 INFO util.Utils: Shutdown 
hook called
2015-09-25 11:51:03,889 WARN  [Driver]: client.SparkClientImpl 
(SparkClientImpl.java:run(427)) - Child process exited with code 1.





Re: --class has to be always specified in spark-submit either it is defined in jar manifest?

2015-09-25 Thread Petr Novak
I'm sorry. Both approaches actually work. It was something else wrong with
my cluster. Petr
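
For reference, a rough sketch of the two approaches that both turned out to work (class,
jar, and master names are placeholders):

# 1) Explicit main class on the command line
spark-submit --class com.example.Main --master spark://host:7077 app.jar

# 2) Rely on the jar's manifest instead (the assembly must contain a
#    "Main-Class: com.example.Main" entry in META-INF/MANIFEST.MF)
spark-submit --master spark://host:7077 app.jar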

On Fri, Sep 25, 2015 at 4:53 PM, Petr Novak  wrote:

> Setting it programmatically doesn't work either:
> sparkConf.setIfMissing("class", "...Main")
>
> In my current setup, moving main to another package requires propagating the
> change to deploy scripts. Doesn't matter, I will find some other way. Petr
>
> On Fri, Sep 25, 2015 at 4:40 PM, Petr Novak  wrote:
>
>> Otherwise it seems it tries to load from a checkpoint which I have
>> deleted and cannot be found. Or it should work and I have something
>> else wrong. The documentation doesn't mention the jar manifest option, so I assume
>> it doesn't work this way.
>>
>> Many thanks,
>> Petr
>>
>
>


Re: hive on spark query error

2015-09-25 Thread Jimmy Xiang
> Error: Master must start with yarn, spark, mesos, or local

What's your setting for spark.master?

On Fri, Sep 25, 2015 at 9:56 AM, Garry Chen  wrote:

> Hi All,
>
> I am following
> https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started?
> to setup hive on spark.  After setup/configuration everything startup I am
> able to show tables but when executing sql statement within beeline I got
> error.  Please help and thank you very much.
>
>
>
> Cluster Environment (3 nodes) as following
>
> hadoop-2.7.1
>
> spark-1.4.1-bin-hadoop2.6
>
> zookeeper-3.4.6
>
> apache-hive-1.2.1-bin
>
>
>
> Error from hive log:
>
> 2015-09-25 11:51:03,123 INFO  [HiveServer2-Handler-Pool: Thread-50]:
> client.SparkClientImpl (SparkClientImpl.java:startDriver(375)) - Attempting
> impersonation of oracle
>
> 2015-09-25 11:51:03,133 INFO  [HiveServer2-Handler-Pool: Thread-50]:
> client.SparkClientImpl (SparkClientImpl.java:startDriver(409)) - Running
> client driver with argv:
> /u01/app/spark-1.4.1-bin-hadoop2.6/bin/spark-submit --proxy-user oracle
> --properties-file /tmp/spark-submit.840692098393819749.properties --class
> org.apache.hive.spark.client.RemoteDriver
> /u01/app/apache-hive-1.2.1-bin/lib/hive-exec-1.2.1.jar --remote-host
> ip-10-92-82-229.ec2.internal --remote-port 40476 --conf
> hive.spark.client.connect.timeout=1000 --conf
> hive.spark.client.server.connect.timeout=9 --conf
> hive.spark.client.channel.log.level=null --conf
> hive.spark.client.rpc.max.size=52428800 --conf
> hive.spark.client.rpc.threads=8 --conf hive.spark.client.secret.bits=256
>
> 2015-09-25 11:51:03,867 INFO  [stderr-redir-1]: client.SparkClientImpl
> (SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config
> property: hive.spark.client.server.connect.timeout=9
>
> 2015-09-25 11:51:03,868 INFO  [stderr-redir-1]: client.SparkClientImpl
> (SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config
> property: hive.spark.client.rpc.threads=8
>
> 2015-09-25 11:51:03,868 INFO  [stderr-redir-1]: client.SparkClientImpl
> (SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config
> property: hive.spark.client.connect.timeout=1000
>
> 2015-09-25 11:51:03,868 INFO  [stderr-redir-1]: client.SparkClientImpl
> (SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config
> property: hive.spark.client.secret.bits=256
>
> 2015-09-25 11:51:03,868 INFO  [stderr-redir-1]: client.SparkClientImpl
> (SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config
> property: hive.spark.client.rpc.max.size=52428800
>
> 2015-09-25 11:51:03,876 INFO  [stderr-redir-1]: client.SparkClientImpl
> (SparkClientImpl.java:run(569)) - Error: Master must start with yarn,
> spark, mesos, or local
>
> 2015-09-25 11:51:03,876 INFO  [stderr-redir-1]: client.SparkClientImpl
> (SparkClientImpl.java:run(569)) - Run with --help for usage help or
> --verbose for debug output
>
> 2015-09-25 11:51:03,885 INFO  [stderr-redir-1]: client.SparkClientImpl
> (SparkClientImpl.java:run(569)) - 15/09/25 11:51:03 INFO util.Utils:
> Shutdown hook called
>
> 2015-09-25 11:51:03,889 WARN  [Driver]: client.SparkClientImpl
> (SparkClientImpl.java:run(427)) - Child process exited with code 1.
>
>
>
>
>


hive on spark query error

2015-09-25 Thread Garry Chen
Hi All,
I am following
https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started?
to set up Hive on Spark.  After setup/configuration everything starts up and I am
able to show tables, but when executing a SQL statement within beeline I get an
error.  Please help, and thank you very much.

Cluster Environment (3 nodes) as following
hadoop-2.7.1
spark-1.4.1-bin-hadoop2.6
zookeeper-3.4.6
apache-hive-1.2.1-bin

Error from hive log:
2015-09-25 11:51:03,123 INFO  [HiveServer2-Handler-Pool: Thread-50]: 
client.SparkClientImpl (SparkClientImpl.java:startDriver(375)) - Attempting 
impersonation of oracle
2015-09-25 11:51:03,133 INFO  [HiveServer2-Handler-Pool: Thread-50]: 
client.SparkClientImpl (SparkClientImpl.java:startDriver(409)) - Running client 
driver with argv: /u01/app/spark-1.4.1-bin-hadoop2.6/bin/spark-submit 
--proxy-user oracle --properties-file 
/tmp/spark-submit.840692098393819749.properties --class 
org.apache.hive.spark.client.RemoteDriver 
/u01/app/apache-hive-1.2.1-bin/lib/hive-exec-1.2.1.jar --remote-host 
ip-10-92-82-229.ec2.internal --remote-port 40476 --conf 
hive.spark.client.connect.timeout=1000 --conf 
hive.spark.client.server.connect.timeout=9 --conf 
hive.spark.client.channel.log.level=null --conf 
hive.spark.client.rpc.max.size=52428800 --conf hive.spark.client.rpc.threads=8 
--conf hive.spark.client.secret.bits=256
2015-09-25 11:51:03,867 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: 
hive.spark.client.server.connect.timeout=9
2015-09-25 11:51:03,868 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: 
hive.spark.client.rpc.threads=8
2015-09-25 11:51:03,868 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: 
hive.spark.client.connect.timeout=1000
2015-09-25 11:51:03,868 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: 
hive.spark.client.secret.bits=256
2015-09-25 11:51:03,868 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Warning: Ignoring non-spark config property: 
hive.spark.client.rpc.max.size=52428800
2015-09-25 11:51:03,876 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Error: Master must start with yarn, spark, 
mesos, or local
2015-09-25 11:51:03,876 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - Run with --help for usage help or --verbose 
for debug output
2015-09-25 11:51:03,885 INFO  [stderr-redir-1]: client.SparkClientImpl 
(SparkClientImpl.java:run(569)) - 15/09/25 11:51:03 INFO util.Utils: Shutdown 
hook called
2015-09-25 11:51:03,889 WARN  [Driver]: client.SparkClientImpl 
(SparkClientImpl.java:run(427)) - Child process exited with code 1.




Re: Weird worker usage

2015-09-25 Thread Bryan Jeffrey
Looking at this further, it appears that my Spark Context is not correctly
setting the Master name.  I see the following in logs:

15/09/25 16:45:42 INFO DriverRunner: Launch Command:
"/usr/lib/jvm/java-7-openjdk-amd64/jre/bin/java" "-cp"
"/spark/spark-1.4.1/sbin/../conf/:/spark/spark-1.4.1/assembly/target/scala-2.10/spark-assembly-1.4.1-hadoop2.2.0.jar:/spark/spark-1.4.1/lib_managed/jars/datanucleus-api-jdo-3.2.6.jar:/spark/spark-1.4.1/lib_managed/jars/datanucleus-rdbms-3.2.9.jar:/spark/spark-1.4.1/lib_managed/jars/datanucleus-core-3.2.10.jar"
"-Xms512M" "-Xmx512M" "-Dakka.loglevel=WARNING"
"-Dspark.default.parallelism=6" "-Dspark.rpc.askTimeout=10" "-
Dspark.app.name=MainClass" "-Dspark.master=spark://sparkserver:7077"
"-Dspark.driver.supervise=true" "-Dspark.logConf=true"
"-Dspark.jars=file:/tmp/MainClass-1.0-SNAPSHOT-jar-with-dependencies.jar"
"-Dspark.streaming.receiver.maxRate=500" "-XX:MaxPermSize=256m"
"org.apache.spark.deploy.worker.DriverWrapper" "akka.tcp://
sparkWorker@10.0.0.6:48077/user/Worker"
"/spark/spark-1.4.1/work/driver-20150925164617-/MainClass-1.0-SNAPSHOT-jar-with-dependencies.jar"
"MainClass" "--checkpoint" "/tmp/sparkcheckpoint" "--broker"
"kafkaBroker:9092" "--topic" "test" "--numStreams" "9"
"--threadParallelism" "9"
15/09/25 16:45:43 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
15/09/25 16:45:43 INFO SecurityManager: Changing view acls to: root
15/09/25 16:45:43 INFO SecurityManager: Changing modify acls to: root
15/09/25 16:45:43 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(root); users
with modify permissions: Set(root)
15/09/25 16:45:44 INFO Slf4jLogger: Slf4jLogger started
15/09/25 16:45:45 INFO Utils: Successfully started service 'Driver' on port
59670.
15/09/25 16:45:45 INFO WorkerWatcher: Connecting to worker akka.tcp://
sparkWorker@10.0.0.6:48077/user/Worker
15/09/25 16:45:45 INFO MainClass: MainClass - Setup Logger
15/09/25 16:45:45 INFO WorkerWatcher: Successfully connected to akka.tcp://
sparkWorker@10.0.0.6:48077/user/Worker
15/09/25 16:45:45 INFO Checkpoint: Checkpoint directory
/tmp/sparkcheckpoint does not exist
15/09/25 16:45:45 INFO MainClass: Setting up streaming context with
configuration: org.apache.spark.SparkConf@56057cbf and time window 2000 ms
15/09/25 16:45:45 INFO SparkContext: Running Spark version 1.4.1
15/09/25 16:45:45 INFO SparkContext: Spark configuration:
spark.app.name=MainClass
spark.default.parallelism=6
spark.driver.supervise=true
spark.jars=file:/tmp/OinkSpark-1.0-SNAPSHOT-jar-with-dependencies.jar
spark.logConf=true
spark.master=local[*]
spark.rpc.askTimeout=10
spark.streaming.receiver.maxRate=500

As you can see, despite -Dspark.master=spark://sparkserver:7077 in the launch command,
the streaming context still registers the master as local[*].  Any idea why?
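
One possible explanation, offered as an assumption rather than a diagnosis: values set
directly on a SparkConf in application code (or a SparkConf restored from a streaming
checkpoint) take precedence over -Dspark.master / spark-submit flags. A minimal sketch of
letting the launcher win, with the hard-coded master only as a fallback:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("MainClass")
  // Only used if nothing was supplied via spark-submit or system properties;
  // a plain .setMaster(...) here would silently override the launch command.
  .setIfMissing("spark.master", "spark://sparkserver:7077")

val ssc = new StreamingContext(conf, Milliseconds(2000))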

Thank you,

Bryan Jeffrey


Handle null/NaN values in mllib classifier

2015-09-25 Thread matd
Hi folks,

I have a set of categorical columns (strings), that I'm parsing and
converting into Vectors of features to pass to a mllib classifier (random
forest). 

In my input data, some columns have null values. Say, in one of those
columns, I have p values + a null value :
How should I build my feature Vectors, and the categoricalFeaturesInfo map
of the classifier ?
* option 1 : I tell p values in categoricalFeaturesInfo, and I use
Double.NaN in my input Vectors ?  [ How NaNs are handled by classifiers ? ]
* option 2 : I consider nulls as a value, so I tell (p+1) values in
categoricalFeaturesInfo, and I map nulls to some int ?


Thanks for your help.

Mathieu

(PS: I know about the new dataframe + pipeline + vectorindexer API, but for
various reasons it doesn't fit my need well, so I need to do this by myself.)
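
A minimal sketch of option 2 under stated assumptions (the column layout, category values,
and the training call are made up for illustration): map null to an extra category index p
and declare p + 1 categories for that feature in categoricalFeaturesInfo.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.RandomForest

// Hypothetical: feature 0 is a categorical string column with p known values.
val categories: Map[String, Int] = Map("red" -> 0, "green" -> 1, "blue" -> 2) // p = 3
val nullIndex = categories.size // null becomes its own category, index p

def encode(value: Option[String], label: Double): LabeledPoint = {
  val idx = value.flatMap(categories.get).getOrElse(nullIndex).toDouble
  LabeledPoint(label, Vectors.dense(idx))
}

// Tell the classifier that feature 0 has p + 1 categories (the extra one is "null").
val categoricalFeaturesInfo = Map(0 -> (categories.size + 1))
// val model = RandomForest.trainClassifier(trainingData, 2, categoricalFeaturesInfo,
//   50, "auto", "gini", 5, 32)   // trainingData is a hypothetical RDD[LabeledPoint]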








Re: Generic DataType in UDAF

2015-09-25 Thread Ritesh Agrawal
hi Yin,

I have written a simple UDAF to generate N samples for each group, using the
reservoir sampling algorithm. In this case the input data type doesn't matter,
since I am not doing any processing on the input data; I just select elements at
random, build an array, and return that array. Later I use explode to convert
them back into rows. Below are my UDAF and test suite.

= UDAF 
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer,
UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

import scala.util.Random



/**
 * Created by ragrawal on 9/23/15.
 */
class ReservoirSampling(k: Int, seed: Long = Random.nextLong() ) extends
UserDefinedAggregateFunction{

  // Schema you get as an input
  def inputSchema = StructType(StructField("id", StringType) :: Nil )



  // Schema of the row which is used for aggregation
  def bufferSchema = StructType(
StructField("ids", ArrayType(StringType,
containsNull = true)) ::
StructField("count", LongType) :: Nil
)


  // Returned type
  def dataType: DataType = ArrayType(StringType, containsNull = false)

  // Self-explaining
  def deterministic = true

  // zero value
  def initialize(buffer: MutableAggregationBuffer) = {
buffer(0) = Seq[String]()
buffer(1) = 0
  }


  // Similar to seqOp in aggregate
  def update(buffer: MutableAggregationBuffer, input: Row) = {
update(buffer, input.getAs[String](0))
  }

  def update(buffer: MutableAggregationBuffer, item: String) = {
if(item != null){
  val ids: Seq[String] = buffer.getSeq(0)

  if(ids.length < k){ // fill reservoir
buffer(0) = ids :+ item
  }else{
// TODO: validate whether this should be buffer.getInt(1) or buffer.getInt(1) + 1
val idx = new Random(seed).nextInt(buffer.getInt(1))
if(idx < k){ // maintain reservoir
  ids.updated(idx, item)
}
  }
  buffer(1) = buffer.getInt(1) + 1
}else{
  throw new RuntimeException("Cannot handle null strings")
}

  }
  // Similar to combOp in aggregate
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
buffer2.getSeq[String](0).foreach(update(buffer1, _))
  }


  // Called on exit to get return value
  def evaluate(buffer: Row) = buffer.getSeq(0)

}

== TEST SUITE 
  test("basic test of reservoir sample") {

val rsampling = new ReservoirSampling(10, 10L)
sqlContext.udf.register("reservoir ", rsampling)


val data = sc.parallelize(1 to 1000, 1)
val schema = new StructType(Array(
  StructField("key", StringType, nullable = false)
))

val df = sqlContext
  .createDataFrame(data.map{x:Int => Row(x.toString)}, schema)
  .select("key")
  .withColumn("g", org.apache.spark.sql.functions.lit(10))
  .distinct
  .groupBy("g")
  .agg(rsampling(col("key")).as("keys"))
  .explode("skey", "key"){keys: Seq[String] => keys}
  .select("g", "key")
  .show()

  }




On Fri, Sep 25, 2015 at 9:35 AM, Yin Huai  wrote:

> Hi Ritesh,
>
> Right now, we only allow the specific data types defined in the inputSchema.
> Supporting abstract types (e.g. NumericType) may make the logic of a UDAF
> more complex. It will be great to understand the use cases first. What
> kinds of input data types do you want to support, and do you need
> to know the actual argument types to determine how to process the input data?
>
> btw, for now, one possible workaround is to define multiple UDAFs for
> different input types. Then, based on arguments that you have, you invoke
> the corresponding UDAF.
>
> Thanks,
>
> Yin
>
> On Fri, Sep 25, 2015 at 8:07 AM, Ritesh Agrawal <
> ragra...@netflix.com.invalid> wrote:
>
>> Hi all,
>>
>> I am trying to learn about UDAF and implemented a simple reservoir sample
>> UDAF. It's working fine. However, I am not able to figure out what DataType
>> I should use so that it can deal with all DataTypes (simple and complex).
>> For instance currently I have defined my input schema as
>>
>>  def inputSchema = StructType(StructField("id", StringType) :: Nil )
>>
>>
>> Instead of StringType can I use some other data type that is superclass
>> of all the DataTypes ?
>>
>> Ritesh
>>
>
>


Re: Generic DataType in UDAF

2015-09-25 Thread Yin Huai
Hi Ritesh,

Right now, we only allow the specific data types defined in the inputSchema.
Supporting abstract types (e.g. NumericType) may make the logic of a UDAF
more complex. It will be great to understand the use cases first. What
kinds of input data types do you want to support, and do you need
to know the actual argument types to determine how to process the input data?

btw, for now, one possible workaround is to define multiple UDAFs for
different input types. Then, based on arguments that you have, you invoke
the corresponding UDAF.

Thanks,

Yin

On Fri, Sep 25, 2015 at 8:07 AM, Ritesh Agrawal <
ragra...@netflix.com.invalid> wrote:

> Hi all,
>
> I am trying to learn about UDAF and implemented a simple reservoir sample
> UDAF. It's working fine. However, I am not able to figure out what DataType
> I should use so that it can deal with all DataTypes (simple and complex).
> For instance currently I have defined my input schema as
>
>  def inputSchema = StructType(StructField("id", StringType) :: Nil )
>
>
> Instead of StringType can I use some other data type that is superclass of
> all the DataTypes ?
>
> Ritesh
>
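
A rough sketch of the workaround Yin describes, reusing the ReservoirSampling UDAF posted
earlier in this thread for strings; the commented-out variants for other types are
hypothetical copies with a different inputSchema.

import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

def reservoirFor(dt: DataType, k: Int): UserDefinedAggregateFunction = dt match {
  case StringType => new ReservoirSampling(k)          // the UDAF from this thread
  // case LongType   => new LongReservoirSampling(k)   // hypothetical LongType variant
  // case DoubleType => new DoubleReservoirSampling(k) // hypothetical DoubleType variant
  case other => throw new IllegalArgumentException(s"No reservoir UDAF for type: $other")
}

// Usage: pick the UDAF based on the column's actual type from the DataFrame schema.
// val dt = df.schema("key").dataType
// df.groupBy("g").agg(reservoirFor(dt, 10)(col("key")).as("keys"))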


Re: kafka direct streaming with checkpointing

2015-09-25 Thread Cody Koeninger
Storing offsets transactionally together with results in your own data store, with
a schema that makes sense for you, is the optimal solution.
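
A hedged sketch of that pattern with the direct stream (stream, process, and
saveBatchWithOffsets are assumed to exist in your application; the latter would write the
partition's results and its offset range in a single transaction):

import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

stream.foreachRDD { rdd =>
  // The RDD produced by the direct stream knows exactly which Kafka offsets it covers.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    val range: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    // Write this partition's results AND its offset range atomically to your own store.
    saveBatchWithOffsets(iter.map { case (key, value) => process(value) }, range)
  }
}
// On (re)start, read the stored offsets per TopicAndPartition and pass them to
// KafkaUtils.createDirectStream instead of relying on the Spark checkpoint.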

On Fri, Sep 25, 2015 at 11:05 AM, Radu Brumariu  wrote:

> Right, I understand why the exceptions happen.
> However, it seems less useful to have checkpointing that only works in
> the case of an application restart. IMO, code changes happen quite often,
> and not being able to pick up where the previous job left off is quite a
> bit of a hindrance.
>
> The solutions you mention would partially solve the problem, while
> bringing new problems along ( increased resource utilization, difficulty in
> managing multiple jobs consuming the same data ,etc ).
>
> The solution that we currently employ is committing the offsets to a
> durable storage and making sure that the job reads the offsets from there
> upon restart, while forsaking checkpointing.
>
> The scenario seems not to be an edge case, which is why I was asking that
> perhaps it could be handled by the Spark Kafka API instead of having everyone
> come up with their own, sub-optimal solutions.
>
> Radu
>
> On Fri, Sep 25, 2015 at 5:06 AM, Adrian Tanase  wrote:
>
>> Hi Radu,
>>
>> The problem itself is not checkpointing the data – if your operations are
>> stateless then you are only checkpointing the kafka offsets, you are right.
>> The problem is that you are also checkpointing metadata – including the
>> actual Code and serialized java classes – that’s why you’ll see
>> ser/deser exceptions on restart with upgrade.
>>
>> If you’re not using stateful operations, you might get away with using the
>> old Kafka receiver w/o WAL – but you accept “at least once semantics”. As
>> soon as you add in the WAL you are forced to checkpoint and you’re better
>> off with the DirectReceiver approach.
>>
>> I believe the simplest way to get around this is to support running 2
>> versions in parallel – with some app-level control of a barrier (e.g. v1
>> reads events up to 3:00am, v2 after that). Manual state management is also
>> supported by the framework but it’s harder to control because:
>>
>>- you’re not guaranteed to shut down gracefully
>>- You may have a bug that prevents the state to be saved and you
>>can’t restart the app w/o upgrade
>>
>> Less than ideal, yes :)
>>
>> -adrian
>>
>> From: Radu Brumariu
>> Date: Friday, September 25, 2015 at 1:31 AM
>> To: Cody Koeninger
>> Cc: "user@spark.apache.org"
>> Subject: Re: kafka direct streaming with checkpointing
>>
>> Would changing the direct stream api to support committing the offsets to
>> kafka's ZK( like a regular consumer) as a fallback mechanism, in case
>> recovering from checkpoint fails , be an accepted solution?
>>
>> On Thursday, September 24, 2015, Cody Koeninger 
>> wrote:
>>
>>> This has been discussed numerous times, TD's response has consistently
>>> been that it's unlikely to be possible
>>>
>>> On Thu, Sep 24, 2015 at 12:26 PM, Radu Brumariu 
>>> wrote:
>>>
 It seems to me that this scenario that I'm facing, is quite common for
 spark jobs using Kafka.
 Is there a ticket to add this sort of semantics to checkpointing ? Does
 it even make sense to add it there ?

 Thanks,
 Radu


 On Thursday, September 24, 2015, Cody Koeninger 
 wrote:

> No, you cant use checkpointing across code changes.  Either store
> offsets yourself, or start up your new app code and let it catch up before
> killing the old one.
>
> On Thu, Sep 24, 2015 at 8:40 AM, Radu Brumariu 
> wrote:
>
>> Hi,
>> in my application I use Kafka direct streaming and I have also
>> enabled checkpointing.
>> This seems to work fine if the application is restarted. However if I
>> change the code and resubmit the application, it cannot start because of
>> the checkpointed data being of different class versions.
>> Is there any way I can use checkpointing that can survive across
>> application version changes?
>>
>> Thanks,
>> Radu
>>
>>
>
>>>
>


Re: spark.mesos.coarse impacts memory performance on mesos

2015-09-25 Thread Tim Chen
Hi Utkarsh,

What is your job placement like when you run fine-grained mode? You said
coarse-grained mode only ran with one node, right?

And when the job is running could you open the Spark webui and get stats
about the heap size and other java settings?

Tim

On Thu, Sep 24, 2015 at 10:56 PM, Utkarsh Sengar 
wrote:

> Bumping this one up, any suggestions on the stacktrace?
> spark.mesos.coarse=true is not working and the driver crashed with the
> error.
>
> On Wed, Sep 23, 2015 at 3:29 PM, Utkarsh Sengar 
> wrote:
>
>> Missed to do a reply-all.
>>
>> Tim,
>>
>> spark.mesos.coarse = true doesn't work and spark.mesos.coarse = false
>> works (sorry there was a typo in my last email, I meant "when I do
>> "spark.mesos.coarse=false", the job works like a charm. ").
>>
>> I get this exception with spark.mesos.coarse = true:
>>
>> 15/09/22 20:18:05 INFO MongoCollectionSplitter: Created split: min={ "_id" :
>> "55af4bf26750ad38a444d7cf"}, max= { "_id" : "55af5a61e8a42806f47546c1"}
>>
>> 15/09/22 20:18:05 INFO MongoCollectionSplitter: Created split: min={ "_id" :
>> "55af5a61e8a42806f47546c1"}, max= null
>>
>> Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
>> at org.apache.spark.rdd.CartesianRDD.getPartitions(CartesianRDD.scala:60)
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>> at scala.Option.getOrElse(Option.scala:120)
>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>> at org.apache.spark.rdd.CartesianRDD.getPartitions(CartesianRDD.scala:60)
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>> at scala.Option.getOrElse(Option.scala:120)
>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>> at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)

Re: Weird worker usage

2015-09-25 Thread Bryan Jeffrey
I am seeing a similar issue when reading from Kafka.  I have a single Kafka
broker with 1 topic and 10 partitions on a separate machine.  I have a
three-node spark cluster, and verified that all workers are registered with
the master.  I'm initializing Kafka using a similar method to this article:
http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/.
I create 3 InputDStreams and union them together to provide a unified
context. I then repartition this to 6 partitions:

val streams = Range(0, configuration.numStreams)
  .map(x => {
 logger.info("Starting Setup of Kafka Stream #" + x + ": \n\tZookeepers: "
+ zookeepersToUse.mkString(",") + "\n\tBrokers: " +
brokersToUse.mkString(",") + "\n\tTopics: " + topicsToUse.mkString(","))
 KafkaStreamFactory.createKafkaStream(ssc, brokersToUse, zookeepersToUse,
topicsToUse)
}).toArray
val unionStream = ssc.union(streams)
if(configuration.threadParallelism > 0) {
  unionStream.repartition(configuration.threadParallelism)
}
unionStream


I am submitting the job to Spark using the following options:

/spark/spark-1.4.1/bin/spark-submit --deploy-mode client --supervise
--master "spark://sparkserver:7077" --conf spark.logConf=true --conf
spark.default.parallelism=6 --conf spark.streaming.receiver.maxRate=500
--class MainClass "/tmp/MainClass-1.0-SNAPSHOT-jar-with-dependencies.jar"
--checkpoint /tmp/sparkcheckpoint --broker kafkaBroker:9092 --topic test
--numStreams 9 --threadParallelism 9

Even when I put a long-running job in the queue, none of the other nodes
are anything but idle.

Am I missing something obvious?

Regards,

Bryan Jeffrey





On Fri, Sep 25, 2015 at 8:28 AM, Akhil Das 
wrote:

> Parallel tasks totally depend on the # of partitions that you have;
> if you are not receiving sufficient partitions (partitions > total # cores)
> then try to do a .repartition.
>
> Thanks
> Best Regards
>
> On Fri, Sep 25, 2015 at 1:44 PM, N B  wrote:
>
>> Hello all,
>>
>> I have a Spark streaming application that reads from a Flume Stream, does
>> quite a few maps/filters in addition to a few reduceByKeyAndWindow and join
>> operations before writing the analyzed output to ElasticSearch inside a
>> foreachRDD()...
>>
>> I recently started to run this on a 2 node cluster (Standalone) with the
>> driver program directly submitting to Spark master on the same host. The
>> way I have divided the resources is as follows:
>>
>> N1: spark Master + driver + flume + 2 spark workers (16gb + 6 cores each
>> worker)
>> N2: 2 spark workers (16 gb + 8 cores each worker).
>>
>> The application works just fine but it is underusing N2 completely. It
>> seems to use N1 (note that both executors on N1 get used) for all the
>> analytics but when it comes to writing to Elasticsearch, it does divide the
>> data around into all 4 executors which then write to ES on a separate host.
>>
>> I am puzzled as to why the data is not being distributed evenly from the
>> get go into all 4 executors and why would it only do so in the final step
>> of the pipeline which seems counterproductive as well?
>>
>> CPU usage on N1 is near the peak while on N2 is < 10% of overall capacity.
>>
>> Any help in getting the resources more evenly utilized on N1 and N2 is
>> welcome.
>>
>> Thanks in advance,
>> Nikunj
>>
>>
>


Re: Receiver and Parallelization

2015-09-25 Thread Adrian Tanase
Good catch, I was not aware of this setting.

I’m wondering though if it also generates a shuffle or if the data is still 
processed by the node on which it’s ingested - so that you’re not gated by the 
number of cores on one machine.

-adrian



On 9/25/15, 5:27 PM, "Silvio Fiorito"  wrote:

>One thing you should look at is your batch duration and 
>spark.streaming.blockInterval
>
>Those 2 things control how many partitions are generated for each RDD (batch) 
>of the DStream when using a receiver (vs direct approach).
>
>So if you have a 2 second batch duration and the default blockInterval of 
>200ms this will create 10 partitions. This means you can have a max of 10 
>parallel tasks (as long as you have the cores available) running at a time for 
>a map-like operation.
>
>
>
>
>On 9/25/15, 9:08 AM, "nib...@free.fr"  wrote:
>
>>Hello,
>>I used a custom receiver in order to receive JMS messages from MQ Servers.
>>I want to benefit from the Yarn cluster; my questions are:
>>
>>- Is it possible to have only one node receiving JMS messages and parallelize
>>the RDD over all the cluster nodes ?
>>- Is it possible to also parallelize the message receiver over cluster nodes ?
>>
>>If you have any code example for both items it would be fine, because the
>>parallelization mechanism in the code is not crystal clear to me ...
>>
>>Tks
>>Nicolas
>>
>>-
>>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>For additional commands, e-mail: user-h...@spark.apache.org
>>


Re: Java Heap Space Error

2015-09-25 Thread Yusuf Can Gürkan
Hello,

It worked like a charm. Thank you very much.

Some userids were null, which is why so many records went to userid ’null’. When I put in
a where clause, userid != ‘null’, it solved the problem.

> On 24 Sep 2015, at 22:43, java8964  wrote:
> 
> I can understand why your first query will finish without OOM, but the new 
> one will fail with OOM.
> 
> In the new query, you are asking for a groupByKey/cogroup operation, which will
> force all the productName + productCategory values per user id to be sent to the same
> reducer. This could easily blow out the reducer's memory if you have one user id
> with a lot of productName and productCategory values.
> 
> Keep in mind that Spark on the reducer side still uses a hash to merge all the
> data from different mappers, so the memory on the reduce side has to be able
> to hold all the productName + productCategory values for the most frequently
> occurring user id (at least), and I don't know why you want all the
> productName and productCategory values per user id (maybe a distinct could be
> enough?).
> 
> Imagine you have one user id that shows up 1M times in your dataset, with 0.5M
> productnames of 'A' and 0.5M of 'B'; your query will push 1M
> 'A's and 'B's into the same reducer and ask Spark to merge them in the
> HashMap for that user id. This will cause OOM.
> 
> Above all, you need to find out what is the max count per user id in your 
> data: select max(count(*)) from land where . group by userid
> 
> Your memory has to support that amount of productName and productCategory data,
> and if your partition number is not high enough (even as high as your unique count
> of user ids), if that is really what you want: to consolidate all the
> productName and productCategory values together, without even considering removing
> duplicates.
> 
> But both query still should push similar records count per partition, but 
> with much of different volume size of data.
> 
> Yong
> 
> Subject: Re: Java Heap Space Error
> From: yu...@useinsider.com
> Date: Thu, 24 Sep 2015 18:56:51 +0300
> CC: jingyu.zh...@news.com.au; user@spark.apache.org
> To: java8...@hotmail.com
> 
> Yes right, the query you wrote worked in same cluster. In this case, 
> partitions were equally distributed but when i used regex and concetanations 
> it’s not as i said before. Query with concetanation is below:
> 
> val usersInputDF = sqlContext.sql(
>   s"""
>  |  select userid,concat_ws(' ',collect_list(concat_ws(' ',if(productname 
> is not 
> NULL,lower(productname),''),lower(regexp_replace(regexp_replace(substr(productcategory,2,length(productcategory)-2),'\"',''),\",\",'
>  ') inputlist from landing where 
> dt='${dateUtil.getYear}-${dateUtil.getMonth}' and day >= '${day}' and userid 
> != '' and userid is not null and userid is not NULL and pagetype = 
> 'productDetail' group by userid
> 
>""".stripMargin)
> 
> 
> On 24 Sep 2015, at 16:52, java8964  > wrote:
> 
> This is interesting.
> 
> So you mean that query as 
> 
> "select userid from landing where dt='2015-9' and userid != '' and userid is 
> not null and userid is not NULL and pagetype = 'productDetail' group by 
> userid"
> 
> works in your cluster?
> 
> In this case, do you also see this one task with way more data than the rest, 
> as it happened when you use regex and concatenation?
> 
> It is hard to believe that just add "regex" and "concatenation" will make the 
> distribution more equally across partitions. In your query, the distribution 
> in the partitions simply depends on the Hash partitioner of "userid".
> 
> Can you show us the query after you add "regex" and "concatenation"?
> 
> Yong
> 
> Subject: Re: Java Heap Space Error
> From: yu...@useinsider.com 
> Date: Thu, 24 Sep 2015 15:34:48 +0300
> CC: user@spark.apache.org 
> To: jingyu.zh...@news.com.au ; 
> java8...@hotmail.com 
> 
> @Jingyu
> Yes, it works without regex and concatenation as the query below:
> 
> So, what we can understand from this? Because when i do like that, shuffle 
> read sizes are equally distributed between partitions.
> 
> val usersInputDF = sqlContext.sql(
> s"""
>  |  select userid from landing where dt='2015-9' and userid != '' and 
> userid is not null and userid is not NULL and pagetype = 'productDetail' 
> group by userid
> 
>""".stripMargin)
> 
> @java8964
> 
> I tried with sql.shuffle.partitions = 1 but no luck. It’s again one of 
> the partitions shuffle size is huge and the others are very small.
> 
> 
> ——
> So how can i balance this shuffle read size between partitions?
> 
> 
> On 24 Sep 2015, at 03:35, Zhang, Jingyu  > wrote:
> 
> Is you sql works if do not runs a regex on strings and concatenates them, I 
> mean just Select the stuff without String operations?
> 
> On 24 September 2015 at 10:11, java8964  > wrote:
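
A hedged sketch of the skew check suggested above (table, partition, and column names
follow this thread; the nested query form is used because SQL cannot nest aggregates
directly):

val maxPerUser = sqlContext.sql(
  """
    |select max(cnt) as max_records_per_user
    |from (
    |  select userid, count(*) as cnt
    |  from landing
    |  where dt = '2015-9' and userid is not null and userid != '' and userid != 'null'
    |  group by userid
    |) t
  """.stripMargin)
maxPerUser.show()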

Re: kafka direct streaming with checkpointing

2015-09-25 Thread Radu Brumariu
Right, I understand why the exceptions happen.
However, it seems less useful to have checkpointing that only works in
the case of an application restart. IMO, code changes happen quite often,
and not being able to pick up where the previous job left off is quite a
bit of a hindrance.

The solutions you mention would partially solve the problem, while bringing
new problems along ( increased resource utilization, difficulty in managing
multiple jobs consuming the same data ,etc ).

The solution that we currently employ is committing the offsets to a
durable storage and making sure that the job reads the offsets from there
upon restart, while forsaking checkpointing.

The scenario seems not to be an edge case, which is why I was asking that
perhaps it could be handled by the Spark Kafka API instead of having everyone
come up with their own, sub-optimal solutions.

Radu

On Fri, Sep 25, 2015 at 5:06 AM, Adrian Tanase  wrote:

> Hi Radu,
>
> The problem itself is not checkpointing the data – if your operations are
> stateless then you are only checkpointing the kafka offsets, you are right.
> The problem is that you are also checkpointing metadata – including the
> actual Code and serialized java classes – that’s why you’ll see ser/deser
> exceptions on restart with upgrade.
>
> If you’re not using stateful operations, you might get away with using the
> old Kafka receiver w/o WAL – but you accept “at least once semantics”. As
> soon as you add in the WAL you are forced to checkpoint and you’re better
> off with the DirectReceiver approach.
>
> I believe the simplest way to get around this is to support running 2 versions
> in parallel – with some app-level control of a barrier (e.g. v1 reads
> events up to 3:00am, v2 after that). Manual state management is also
> supported by the framework but it’s harder to control because:
>
>- you’re not guaranteed to shut down gracefully
>- You may have a bug that prevents the state to be saved and you can’t
>restart the app w/o upgrade
>
> Less than ideal, yes :)
>
> -adrian
>
> From: Radu Brumariu
> Date: Friday, September 25, 2015 at 1:31 AM
> To: Cody Koeninger
> Cc: "user@spark.apache.org"
> Subject: Re: kafka direct streaming with checkpointing
>
> Would changing the direct stream api to support committing the offsets to
> kafka's ZK( like a regular consumer) as a fallback mechanism, in case
> recovering from checkpoint fails , be an accepted solution?
>
> On Thursday, September 24, 2015, Cody Koeninger 
> wrote:
>
>> This has been discussed numerous times, TD's response has consistently
>> been that it's unlikely to be possible
>>
>> On Thu, Sep 24, 2015 at 12:26 PM, Radu Brumariu  wrote:
>>
>>> It seems to me that this scenario that I'm facing, is quite common for
>>> spark jobs using Kafka.
>>> Is there a ticket to add this sort of semantics to checkpointing ? Does
>>> it even make sense to add it there ?
>>>
>>> Thanks,
>>> Radu
>>>
>>>
>>> On Thursday, September 24, 2015, Cody Koeninger 
>>> wrote:
>>>
 No, you cant use checkpointing across code changes.  Either store
 offsets yourself, or start up your new app code and let it catch up before
 killing the old one.

 On Thu, Sep 24, 2015 at 8:40 AM, Radu Brumariu 
 wrote:

> Hi,
> in my application I use Kafka direct streaming and I have also enabled
> checkpointing.
> This seems to work fine if the application is restarted. However if I
> change the code and resubmit the application, it cannot start because of
> the checkpointed data being of different class versions.
> Is there any way I can use checkpointing that can survive across
> application version changes?
>
> Thanks,
> Radu
>
>

>>


Re: ERROR BoundedByteBufferReceive: OOME with size 352518400

2015-09-25 Thread Sourabh Chandak
Thanks Cody, I was able to find the issue yesterday after sending the
last email.

On Friday, September 25, 2015, Cody Koeninger  wrote:

> So you're still having a problem getting partitions or offsets from kafka
> when creating the stream.  You can try each of those kafka operations
> individually (getPartitions / getLatestLeaderOffsets)
>
> checkErrors should be dealing with an arraybuffer of throwables, not just
> a single one.  Is that the only error you're seeing, or are there more?
>
> You can also modify it to call printStackTrace or whatever on each
> individual error, instead of only printing the message.
>
>
>
>
> On Thu, Sep 24, 2015 at 5:00 PM, Sourabh Chandak  > wrote:
>
>> I was able to get past this issue. I was pointing at the SSL port whereas
>> SimpleConsumer should point to the PLAINTEXT port. But after fixing that I
>> am getting the following error:
>>
>> Exception in thread "main" org.apache.spark.SparkException:
>> java.nio.BufferUnderflowException
>> at
>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>> at
>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
>> at scala.util.Either.fold(Either.scala:97)
>> at
>> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
>> at
>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:309)
>> at
>> org.ofe.weve.test.KafkaTest$.setupProcessingContext(KafkaTest.scala:36)
>> at org.ofe.weve.test.KafkaTest$.main(KafkaTest.scala:59)
>> at org.ofe.weve.test.KafkaTest.main(KafkaTest.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:497)
>> at
>> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>> Thanks,
>> Sourabh
>>
>> On Thu, Sep 24, 2015 at 2:04 PM, Cody Koeninger > > wrote:
>>
>>> That looks like the OOM is in the driver, when getting partition
>>> metadata to create the direct stream.  In that case, executor memory
>>> allocation doesn't matter.
>>>
>>> Allocate more driver memory, or put a profiler on it to see what's
>>> taking up heap.
>>>
>>>
>>>
>>> On Thu, Sep 24, 2015 at 3:51 PM, Sourabh Chandak >> > wrote:
>>>
 Adding Cody and Sriharsha

 On Thu, Sep 24, 2015 at 1:25 PM, Sourabh Chandak >>> > wrote:

> Hi,
>
> I have ported receiver-less Spark Streaming for Kafka to Spark 1.2 and
> am trying to run a Spark Streaming job to consume data from my broker, but
> I am getting the following error:
>
> 15/09/24 20:17:45 ERROR BoundedByteBufferReceive: OOME with size
> 352518400
> java.lang.OutOfMemoryError: Java heap space
> at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
> at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
> at
> kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
> at
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
> at
> kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> at
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> at
> kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
> at
> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:83)
> at
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:80)
> at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:103)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:126)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:125)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:346)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at
> scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
> at org.apache.spark.streaming.kafka.KafkaCluster.org
> $apache$spark$stream

Generic DataType in UDAF

2015-09-25 Thread Ritesh Agrawal
Hi all,

I am trying to learn about UDAF and implemented a simple reservoir sample
UDAF. It's working fine. However, I am not able to figure out what DataType
I should use so that it can deal with all DataTypes (simple and complex).
For instance, currently I have defined my input schema as

 def inputSchema = StructType(StructField("id", StringType) :: Nil )


Instead of StringType, can I use some other data type that is a superclass of
all the DataTypes?

Ritesh


Re: --class has to be always specified in spark-submit either it is defined in jar manifest?

2015-09-25 Thread Petr Novak
Setting it programmatically doesn't work either:
sparkConf.setIfMissing("class", "...Main")

In my current setup, moving main to another package requires propagating the
change to deploy scripts. Doesn't matter, I will find some other way. Petr

On Fri, Sep 25, 2015 at 4:40 PM, Petr Novak  wrote:

> Otherwise it seems it tries to load from a checkpoint which I have
> deleted and cannot be found. Or it should work and I have something
> else wrong. The documentation doesn't mention the jar manifest option, so I assume
> it doesn't work this way.
>
> Many thanks,
> Petr
>


--class has to be always specified in spark-submit either it is defined in jar manifest?

2015-09-25 Thread Petr Novak
Otherwise it seems it tries to load from a checkpoint which I have deleted
and cannot be found. Or it should work and I have something else wrong.
The documentation doesn't mention the jar manifest option, so I assume it
doesn't work this way.

Many thanks,
Petr


Re: Receiver and Parallelization

2015-09-25 Thread Silvio Fiorito
One thing you should look at is your batch duration and 
spark.streaming.blockInterval

Those 2 things control how many partitions are generated for each RDD (batch) 
of the DStream when using a receiver (vs direct approach).

So if you have a 2 second batch duration and the default blockInterval of 200ms 
this will create 10 partitions. This means you can have a max of 10 parallel 
tasks (as long as you have the cores available) running at a time for a 
map-like operation.
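
A small sketch of how the two settings interact, using the same numbers as the example
above (the app name is a placeholder):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("ReceiverParallelism")
  // Partitions per batch for a receiver-based stream = batchDuration / blockInterval.
  // With a 2 s batch and 200 ms blocks: 2000 / 200 = 10 partitions, i.e. 10 parallel tasks.
  .set("spark.streaming.blockInterval", "200ms")

val ssc = new StreamingContext(conf, Seconds(2))
// Lowering blockInterval (e.g. "100ms") raises the partition count; alternatively,
// call .repartition(n) on the DStream to shuffle the data to n partitions.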




On 9/25/15, 9:08 AM, "nib...@free.fr"  wrote:

>Hello,
>I used a custom receiver in order to receive JMS messages from MQ Servers.
>I want to benefit from the Yarn cluster; my questions are:
>
>- Is it possible to have only one node receiving JMS messages and parallelize
>the RDD over all the cluster nodes ?
>- Is it possible to also parallelize the message receiver over cluster nodes ?
>
>If you have any code example for both items it would be fine, because the
>parallelization mechanism in the code is not crystal clear to me ...
>
>Tks
>Nicolas
>
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>


Re: ERROR BoundedByteBufferReceive: OOME with size 352518400

2015-09-25 Thread Cody Koeninger
So you're still having a problem getting partitions or offsets from kafka
when creating the stream.  You can try each of those kafka operations
individually (getPartitions / getLatestLeaderOffsets)

checkErrors should be dealing with an arraybuffer of throwables, not just a
single one.  Is that the only error you're seeing, or are there more?

You can also modify it to call printStackTrace or whatever on each
individual error, instead of only printing the message.




On Thu, Sep 24, 2015 at 5:00 PM, Sourabh Chandak 
wrote:

> I was able to get past this issue. I was pointing at the SSL port whereas
> SimpleConsumer should point to the PLAINTEXT port. But after fixing that I
> am getting the following error:
>
> Exception in thread "main" org.apache.spark.SparkException:
> java.nio.BufferUnderflowException
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
> at scala.util.Either.fold(Either.scala:97)
> at
> org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
> at
> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:309)
> at
> org.ofe.weve.test.KafkaTest$.setupProcessingContext(KafkaTest.scala:36)
> at org.ofe.weve.test.KafkaTest$.main(KafkaTest.scala:59)
> at org.ofe.weve.test.KafkaTest.main(KafkaTest.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at
> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> Thanks,
> Sourabh
>
> On Thu, Sep 24, 2015 at 2:04 PM, Cody Koeninger 
> wrote:
>
>> That looks like the OOM is in the driver, when getting partition metadata
>> to create the direct stream.  In that case, executor memory allocation
>> doesn't matter.
>>
>> Allocate more driver memory, or put a profiler on it to see what's taking
>> up heap.
>>
>>
>>
>> On Thu, Sep 24, 2015 at 3:51 PM, Sourabh Chandak 
>> wrote:
>>
>>> Adding Cody and Sriharsha
>>>
>>> On Thu, Sep 24, 2015 at 1:25 PM, Sourabh Chandak 
>>> wrote:
>>>
 Hi,

 I have ported the receiver-less Spark Streaming for Kafka to Spark 1.2 and
 am trying to run a Spark Streaming job to consume data from my broker, but
 I am getting the following error:

 15/09/24 20:17:45 ERROR BoundedByteBufferReceive: OOME with size
 352518400
 java.lang.OutOfMemoryError: Java heap space
 at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
 at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
 at
 kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
 at
 kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
 at
 kafka.network.Receive$class.readCompletely(Transmission.scala:56)
 at
 kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
 at
 kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
 at
 kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:83)
 at
 kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:80)
 at kafka.consumer.SimpleConsumer.send(SimpleConsumer.scala:103)
 at
 org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:126)
 at
 org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$getPartitionMetadata$1.apply(KafkaCluster.scala:125)
 at
 org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:346)
 at
 org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at
 scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
 at org.apache.spark.streaming.kafka.KafkaCluster.org
 $apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
 at
 org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
 at
 org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaC

Re: Receiver and Parallelization

2015-09-25 Thread Adrian Tanase
1) yes, just use .repartition on the inbound stream, this will shuffle data 
across your whole cluster and process in parallel as specified.
2) yes, although I’m not sure how to do it for a totally custom receiver. Does 
this help as a starting point? 
http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
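A minimal sketch of both options, assuming `ssc` is an existing StreamingContext and MyJmsReceiver stands in for the custom JMS receiver:

import org.apache.spark.storage.StorageLevel

// 1) One receiving node, then shuffle the records across the whole cluster:
val jmsStream = ssc.receiverStream(new MyJmsReceiver(StorageLevel.MEMORY_AND_DISK_SER))
val spread = jmsStream.repartition(ssc.sparkContext.defaultParallelism)

// 2) Several receivers (each occupies one core on some executor), unioned into one stream:
val receivers = (1 to 4).map(_ => ssc.receiverStream(new MyJmsReceiver(StorageLevel.MEMORY_AND_DISK_SER)))
val unioned = ssc.union(receivers)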





On 9/25/15, 4:08 PM, "nib...@free.fr"  wrote:

>Hello,
>I use a custom receiver in order to receive JMS messages from MQ servers.
>I want to benefit from the YARN cluster; my questions are:
>
>- Is it possible to have only one node receiving JMS messages and parallelize
>the RDD over all the cluster nodes?
>- Is it possible to also parallelize the message receiver over the cluster nodes?
>
>If you have any code example for both items it would be great, because the
>parallelization mechanism in the code is not crystal clear to me...
>
>Tks
>Nicolas
>
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



HDFS is undefined

2015-09-25 Thread Angel Angel
Hello,
I am running a Spark application.

I have installed Cloudera Manager; it includes Spark version 1.2.0.

Now I want to use Spark version 1.4.0, and it is also working fine.

But when I try to access HDFS from Spark 1.4.0 in Eclipse, I get
the following error:

"Exception in thread "main" java.nio.file.FileSystemNotFoundException:
Provider "hdfs" not installed "


My Spark 1.4.0 spark-env.sh file is:

export HADOOP_CONF_DIR=/etc/hadoop/conf
export SPARK_HOME=/root/spark-1.4.0


export
DEFAULT_HADOOP_HOME=/opt/cloudera/parcels/CDH-5.3.5-1.cdh5.3.5.p0.4/lib/hadoop

I am still getting the error.

Please give me suggestions.

Thanking You,
Sagar Jadhav.


Re: Unreachable dead objects permanently retained on heap

2015-09-25 Thread Saurav Sinha
Hi Spark Users,

I am running some Spark jobs every hour. After running for
12 hours, the master gets killed with the exception:

*java.lang.OutOfMemoryError: GC overhead limit exceeded*

It looks like there is a memory issue in the Spark master.

I noticed the same kind of issue with the Spark history server.

In my job I have to monitor whether a job completed successfully; for that I
hit the history server with curl to get the status, but once the number of jobs
grows past about 80 apps, the history server starts responding with a delay,
taking more than 5 minutes to return a job's status.

I am running Spark 1.4.1 in standalone mode on a 5-machine cluster.

Kindly suggest a solution for this memory issue; it is a blocker.

Thanks,
Saurav Sinha

On Fri, Sep 25, 2015 at 5:01 PM, James Aley  wrote:

> Hi,
>
> We have an application that submits several thousands jobs within the same
> SparkContext, using a thread pool to run about 50 in parallel. We're
> running on YARN using Spark 1.4.1 and seeing a problem where our driver is
> killed by YARN due to running beyond physical memory limits (no Java OOM
> stack trace though).
>
> Plugging in YourKit, I can see that in fact the application is running low
> on heap. The suspicious thing we're seeing is that the old generation is
> filling up with dead objects, which don't seem to be fully removed during
> the stop-the-world sweeps we see happening later in the running of the
> application.
>
> With allocation tracking enabled, I can see that maybe 80%+ of that dead
> heap space consists of byte arrays, which appear to contain some
> snappy-compressed Hadoop configuration data. Many of them are 4MB each,
> other hundreds of KBs. The allocation tracking reveals that they were
> originally allocated in calls to sparkContext.hadoopFile() (from
> AvroRelation in spark-avro). It seems that this data was broadcast to the
> executors as a result of that call? I'm not clear on the implementation
> details, but I can imagine that might be necessary?
>
> This application is essentially a batch job to take many Avro files and
> merging them into larger Parquet files. What it does is builds a DataFrame
> of Avro files, then for each DataFrame, starts a job using
> .coalesce(N).write().parquet() on a fixed size thread pool.
>
> It seems that for each of those calls, another chunk of heap space
> disappears to one of these byte arrays and is never reclaimed. I understand
> that broadcast variables remain in memory on the driver application in
> their serialized form, and that at least appears to be consistent with what
> I'm seeing here. Question is, what can we do about this? Is there a way to
> reclaim this memory? Should those arrays be GC'ed when jobs finish?
>
> Any guidance greatly appreciated.
>
>
> Many thanks,
>
> James.
>



-- 
Thanks and Regards,

Saurav Sinha

Contact: 9742879062


Best practices for scheduling Spark jobs on "shared" YARN cluster using Autosys

2015-09-25 Thread unk1102
Hi, I have 5 Spark jobs which need to be run in parallel to speed up the process;
together they take around 6-8 hours. I have 93 container nodes with 8 cores each
and a total memory capacity of around 2.8 TB. I run each job with around 30
executors with 2 cores and 20 GB each. Each of my jobs processes around 1 TB of
data. Since my cluster is a shared cluster, many other teams spawn their jobs
alongside mine. So YARN kills my executors and does not add them back, because the
cluster is running at max capacity. I just want to know the best practices in
such a resource-crunched environment. These jobs run every day, so I am
looking for innovative approaches to solve this problem. Before anyone suggests
we get our own dedicated cluster: I am looking for alternative solutions.
Please guide. Thanks in advance.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Best-practices-for-scheduling-Spark-jobs-on-shared-YARN-cluster-using-Autosys-tp24820.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Receiver and Parallelization

2015-09-25 Thread nibiau
Hello,
I use a custom receiver in order to receive JMS messages from MQ servers.
I want to benefit from the YARN cluster; my questions are:

- Is it possible to have only one node receiving JMS messages and parallelize
the RDD over all the cluster nodes?
- Is it possible to also parallelize the message receiver over the cluster nodes?

If you have any code example for both items it would be great, because the
parallelization mechanism in the code is not crystal clear to me...

Tks
Nicolas

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: LogisticRegression models consumes all driver memory

2015-09-25 Thread Eugene Zhulenev
The problem turned out to be a too-high 'spark.default.parallelism':
BinaryClassificationMetrics does a combineByKey which internally shuffles the
training dataset. Lower parallelism + cutting the training set's RDD history with a
save/read round trip through Parquet solved the problem. Thanks for the hint!
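For anyone hitting the same issue, a sketch of that fix, assuming a `trainDF` DataFrame, an existing `sqlContext`, the path shown here, and the 100 x 8 executor/core layout mentioned later in the thread:

// 1) Cut the long join lineage by round-tripping through Parquet.
trainDF.write.parquet("/tmp/train-set.parquet")
val freshTrainDF = sqlContext.read.parquet("/tmp/train-set.parquet")

// 2) Bring parallelism down to roughly #executors * #cores before fitting.
val compacted = freshTrainDF.repartition(100 * 8)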

On Wed, Sep 23, 2015 at 11:10 PM, DB Tsai  wrote:

> You want to reduce the # of partitions to around the # of executors *
> cores, since having so many tasks/partitions puts a lot of
> pressure on treeReduce in LoR. Let me know if this helps.
>
>
> Sincerely,
>
> DB Tsai
> --
> Blog: https://www.dbtsai.com
> PGP Key ID: 0xAF08DF8D
> 
>
> On Wed, Sep 23, 2015 at 5:39 PM, Eugene Zhulenev <
> eugene.zhule...@gmail.com> wrote:
>
>> ~3000 features, pretty sparse, I think about 200-300 non-zero features in
>> each row. We have 100 executors x 8 cores. The number of tasks is pretty big,
>> 30k-70k, I can't remember the exact number. The training set is the result of a pretty
>> big join from multiple data frames, but it's cached. However, as I
>> understand it, Spark still keeps the DAG history of an RDD to be able to recover it in
>> case of failure of one of the nodes.
>>
>> I'll try tomorrow to save train set as parquet, load it back as DataFrame
>> and run modeling this way.
>>
>> On Wed, Sep 23, 2015 at 7:56 PM, DB Tsai  wrote:
>>
>>> Your code looks correct for me. How many # of features do you have in
>>> this training? How many tasks are running in the job?
>>>
>>>
>>> Sincerely,
>>>
>>> DB Tsai
>>> --
>>> Blog: https://www.dbtsai.com
>>> PGP Key ID: 0xAF08DF8D
>>> 
>>>
>>> On Wed, Sep 23, 2015 at 4:38 PM, Eugene Zhulenev <
>>> eugene.zhule...@gmail.com> wrote:
>>>
 It's really simple:
 https://gist.github.com/ezhulenev/886517723ca4a353

 We've seen the same strange heap behavior even for a single model: it
 takes ~20 GB of heap on the driver to build a single model with less than 1
 million rows in the input data frame.

 On Wed, Sep 23, 2015 at 6:31 PM, DB Tsai  wrote:

> Could you paste some of your code for diagnosis?
>
>
> Sincerely,
>
> DB Tsai
> --
> Blog: https://www.dbtsai.com
> PGP Key ID: 0xAF08DF8D
> 
>
> On Wed, Sep 23, 2015 at 3:19 PM, Eugene Zhulenev <
> eugene.zhule...@gmail.com> wrote:
>
>> We are running Apache Spark 1.5.0 (latest code from 1.5 branch)
>>
>> We are running 2-3 LogisticRegression models in parallel (we'd love
>> to run 10-20 actually), they are not really big at all, maybe 1-2 million
>> rows in each model.
>>
>> Cluster itself, and all executors look good. Enough free memory and
>> no exceptions or errors.
>>
>> However, I see very strange behavior inside the Spark driver: the allocated
>> heap is constantly growing. It grows up to 30 GB in 1.5 hours and then
>> everything becomes super slow.
>>
>> We don't do any collect, and I really don't understand who is
>> consuming all this memory. Looks like it's something inside
>> LogisticRegression itself, however I only see treeAggregate which should
>> not require so much memory to run.
>>
>> Any ideas?
>>
>> Plus I don't see any GC pause, looks like memory is still used by
>> someone inside driver.
>>
>> [image: Inline image 2]
>> [image: Inline image 1]
>>
>
>

>>>
>>
>


Transformation pipeling and parallelism in Spark

2015-09-25 Thread Zhongmiao Li
Hello all,

I have a question regarding the pipelining and parallelism of transformations
in Spark. I couldn't find any documentation about it and would really
appreciate your help. I just started using and reading about Spark, so my
description may not be very clear to you; please tell me if anything is unclear.

Let me use a figure of the Spark paper to help me illustrate the problem.


Firstly, while applying transformations to a single partition of any RDD, is
there any parallelism? I guess the answer is no, but I would be more assured if
anyone could confirm it.

Secondly, many transformations in Spark can be pipelined. For example,
transformations from C to D and from D to F should be pipelined at element
granularity, as these partitions are on the same machine. As the Spark
paper says, 'narrow dependencies allow for pipelined execution on one cluster
node'. However, does pipelining always work for transformations with narrow
dependencies, or do the involved partitions all have to reside on the same node? And
is there any limit on the length of the pipeline?

Moreover, consider transformations with wide dependencies, like from A to B
and from F and B to G: they require shuffling. In the figure, a partition of B
requires input from all three partitions of A. So do partitions of B only
start processing after they have received all data from all partitions of A? And
do partitions of B only output data after the transformation has finished for
all their keys, or can they output individual results by key for pipelining (into the
join that produces G)?
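To make the question concrete, a sketch of the kind of chain being described (the variable and function names here are made up, not from the original message):

// A made-up chain mirroring the figure: c and b are pair RDDs, parse and
// isValid are placeholder functions.
val d = c.map(parse)        // C -> D: narrow, fused into the same task
val f = d.filter(isValid)   // D -> F: narrow, still the same task, applied element by element
val g = f.join(b)           // (F, B) -> G: wide, requires a shuffle; the join tasks
                            // start only after fetching their slice of every map output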

Thank you in advance,
Zhongmiao

Re: sometimes No event logs found for application using same JavaSparkSQL example

2015-09-25 Thread our...@cnsuning.com
https://issues.apache.org/jira/browse/SPARK-10832






 
From: our...@cnsuning.com
Sent: 2015-09-25 20:36
To: user
Cc: 494165115
Subject: sometimes No event logs found for application using same JavaSparkSQL
example
Hi all,
   When using the JavaSparkSQL example, the code was submitted many times as follows:
/home/spark/software/spark/bin/spark-submit --deploy-mode cluster --class 
org.apache.spark.examples.sql.JavaSparkSQL 
hdfs://SuningHadoop2/user/spark/lib/spark-examples-1.4.0-hadoop2.4.0.jar

Unfortunately, sometimes the completed-applications web page shows "No event
logs found for application", but the majority of runs of the same application are
normal. The details are shown in JIRA SPARK-10832.



sometimes No event logs found for application using same JavaSparkSQL example

2015-09-25 Thread our...@cnsuning.com
Hi all,
   When using the JavaSparkSQL example, the code was submitted many times as follows:
/home/spark/software/spark/bin/spark-submit --deploy-mode cluster --class 
org.apache.spark.examples.sql.JavaSparkSQL 
hdfs://SuningHadoop2/user/spark/lib/spark-examples-1.4.0-hadoop2.4.0.jar

Unfortunately, sometimes the completed-applications web page shows "No event
logs found for application", but the majority of runs of the same application are
normal. The details are shown in JIRA SPARK-10832.



Re: Setting Spark TMP Directory in Cluster Mode

2015-09-25 Thread Akhil Das
Try spark.local.dir in the spark-defaults.conf or SPARK_LOCAL_DIRS in
the spark-env.sh file.
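For example, the first option can also be set programmatically (the path is a placeholder):

import org.apache.spark.SparkConf

// Equivalent to a spark.local.dir entry in spark-defaults.conf.
val conf = new SparkConf()
  .setAppName("LocalDirExample")
  .set("spark.local.dir", "/mnt/bigdisk/spark-tmp")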

Thanks
Best Regards

On Fri, Sep 25, 2015 at 2:14 PM, mufy  wrote:

> Faced with an issue where Spark temp files get filled under
> /opt/spark-1.2.1/tmp on the local filesystem on the worker nodes. Which
> parameter/configuration sets that location?
>
> The spark-env.sh file has the folders set as,
>
> export SPARK_HOME=/opt/spark-1.2.1
> export SPARK_WORKER_DIR=$SPARK_HOME/tmp
>
> I could not find any parameter SPARK_TMP_DIR in the stock Spark
> documentation; DataStax did mention something about it. Once I know where this
> can be set, I'm thinking of pointing that location to an NFS-mounted
> location so that more space can be used without the fear of jobs
> failing due to running out of space.
>
>
> I could also see 'java.io.tmpdir' getting set to the below in the
> spark-env.sh.
>
> -Djava.io.tmpdir=/opt/spark-1.2.1/tmp/spark-tmp
>
> Tried setting it as,
>
> export _JAVA_OPTIONS=-Djava.io.tmpdir=/new/tmp/dir
>
>
> Did a grep again to see if it has picked up. It should have shown
> something like,
>
> Picked up _JAVA_OPTIONS: -Djava.io.tmpdir=/new/tmp/dir
>
>
> in the
> /opt/spark-1.2.1/logs/spark-hdplab-org.apache.spark.deploy.master.Master-1-node-01.out
> log. But is still seen pointing to /opt/spark-1.2.1/tmp only.
>
> $ export _JAVA_OPTIONS=-Djava.io.tmpdir=/new/tmp/dir
>
> $ grep -iR "/opt/spark-1.2.1/tmp" /opt/spark-1.2.1/*
> /opt/spark-1.2.1/logs/spark-hdplab-org.apache.spark.deploy.master.Master-1-node-01.out:Picked
> up _JAVA_OPTIONS: -Djava.io.tmpdir=/opt/spark-1.2.1/tmp/spark-tmp
>
>
> Also, I was wondering if setting SPARK_DAEMON_JAVA_OPTS, which is
> used by Spark to set JVM options, will help here even though
> http://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts
> talks about this in a Spark standalone context.
>
> ---
> Mufeed Usman
> My LinkedIn  | My
> Social Cause  | My Blogs : LiveJournal
> 
>
>
>
>


Re: Error: Asked to remove non-existent executor

2015-09-25 Thread Akhil Das
What do you mean by being behind a NAT? Does it mean you are submitting your
jobs to a remote Spark cluster from your local machine? If that's the case,
then you need to take care of a few ports (in the NAT); see
http://spark.apache.org/docs/latest/configuration.html#networking. Some of these,
like spark.driver.port, default to a random port, and spark.driver.host defaults
to the local hostname. You'd also have to set those in the SparkConf properly.
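A sketch of the kind of settings involved (the hostname and port numbers are placeholders):

import org.apache.spark.SparkConf

// Pin the address/ports the driver advertises so they can be forwarded through the NAT.
val conf = new SparkConf()
  .set("spark.driver.host", "my.publicly.reachable.hostname")
  .set("spark.driver.port", "7001")
  .set("spark.blockManager.port", "7003")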

Thanks
Best Regards

On Fri, Sep 25, 2015 at 2:05 PM, Tracewski, Lukasz <
lukasz.tracew...@credit-suisse.com> wrote:

> Hi,
>
>
>
> I am trying to submit a job on Spark 1.4 (with Spark Master):
>
>
>
> bin/spark-submit --master spark://:7077 --driver-memory 4g
> --executor-memory 4G  --executor-cores 4 --num-executors 1
> spark/examples/src/main/python/pi.py 6
>
>
>
> which returns:
>
>
>
> ERROR SparkDeploySchedulerBackend: Asked to remove non-existent executor
> 
>
>
>
> On Spark Master I see that the job is indeed submitted. Running locally
> goes fine.
>
>
>
> I am behind NAT. From what I read it might be that the Master cannot find
> Worker nodes. Could it be the case? Any ideas how to resolve it?
>
>
>
> Cheers,
>
> Lucas
>
>
>
> ==
> Please access the attached hyperlink for an important electronic
> communications disclaimer:
> http://www.credit-suisse.com/legal/en/disclaimer_email_ib.html
>
> ==
>


Re: Weird worker usage

2015-09-25 Thread Akhil Das
The number of parallel tasks totally depends on the # of partitions that you have;
if you are not receiving sufficient partitions (you want partitions > total # of cores),
then try to do a .repartition.
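A sketch of how to check and fix that (the `stream` variable and the core count are assumptions based on the setup described below):

// Log how many partitions each batch actually has, then repartition to at
// least the total number of cores.
stream.foreachRDD(rdd => println(s"partitions in this batch = ${rdd.partitions.length}"))
val balanced = stream.repartition(28) // e.g. 6 + 6 + 8 + 8 cores across the four workers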

Thanks
Best Regards

On Fri, Sep 25, 2015 at 1:44 PM, N B  wrote:

> Hello all,
>
> I have a Spark streaming application that reads from a Flume Stream, does
> quite a few maps/filters in addition to a few reduceByKeyAndWindow and join
> operations before writing the analyzed output to ElasticSearch inside a
> foreachRDD()...
>
> I recently started to run this on a 2 node cluster (Standalone) with the
> driver program directly submitting to Spark master on the same host. The
> way I have divided the resources is as follows:
>
> N1: spark Master + driver + flume + 2 spark workers (16gb + 6 cores each
> worker)
> N2: 2 spark workers (16 gb + 8 cores each worker).
>
> The application works just fine but it is underusing N2 completely. It
> seems to use N1 (note that both executors on N1 get used) for all the
> analytics but when it comes to writing to Elasticsearch, it does divide the
> data around into all 4 executors which then write to ES on a separate host.
>
> I am puzzled as to why the data is not being distributed evenly from the
> get go into all 4 executors and why would it only do so in the final step
> of the pipeline which seems counterproductive as well?
>
> CPU usage on N1 is near the peak while on N2 is < 10% of overall capacity.
>
> Any help in getting the resources more evenly utilized on N1 and N2 is
> welcome.
>
> Thanks in advance,
> Nikunj
>
>


Re: How to set spark envoirnment variable SPARK_LOCAL_IP in conf/spark-env.sh

2015-09-25 Thread Zhiliang Zhu
 


 On Friday, September 25, 2015 7:46 PM, Zhiliang Zhu  
wrote:
   

Hi all,
The Spark job will run on YARN. I either do not set SPARK_LOCAL_IP at all, or set it as
export  SPARK_LOCAL_IP=localhost    #or set it to the specific node IP in the
specific Spark install directory

It works well to submit the Spark job on the master node of the cluster; however, it
fails when submitted remotely via a gateway machine.
The gateway machine is already configured; it works well to submit Hadoop jobs. It
is set up as:
export SCALA_HOME=/usr/lib/scala
export JAVA_HOME=/usr/java/jdk1.7.0_45
export R_HOME=/usr/lib/r
export HADOOP_HOME=/usr/lib/hadoop
export YARN_CONF_DIR=/usr/lib/hadoop/etc/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

export SPARK_MASTER_IP=master01
#export SPARK_LOCAL_IP=master01  #if no SPARK_LOCAL_IP is set, SparkContext 
will not start
export SPARK_LOCAL_IP=localhost #if localhost is set, SparkContext is 
started, but failed later
export SPARK_LOCAL_DIRS=/data/spark_local_dir
...

The error messages (these are for SPARK_LOCAL_IP=localhost):
15/09/25 19:07:12 INFO util.Utils: Successfully started service 'sparkYarnAM' 
on port 48133.
15/09/25 19:07:12 INFO yarn.ApplicationMaster: Waiting for Spark driver to be 
reachable.
15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to driver at 
127.0.0.1:35706, retrying ...
15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to driver at 
127.0.0.1:35706, retrying ...
15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to driver at 
127.0.0.1:35706, retrying ...

I shall sincerely appreciate your kind help very much!
Zhiliang



  

Re: Using Spark for portfolio manager app

2015-09-25 Thread Adrian Tanase
Just use the official connector from DataStax 
https://github.com/datastax/spark-cassandra-connector

Your solution is very similar. Let’s assume the state is

case class UserState(amount: Int, updates: Seq[Int])

And your user has an amount of 100. If the user does not see an update, you can emit

Some(UserState(100, Seq.empty))

Otherwise maybe you can emit

Some(UserState(130, List(50, -20)))

You can then process the updates like this

usersState.filter(_.updates.length > 0).foreachRDD { ... }

Regarding optimizations, I would not worry too much about it. Going through 
users with no updates is most likely a no-op. Spark HAS to iterate through all 
the state objects since it does not operate with deltas from one batch to the 
next – the StateDStream is really the whole app state packed as an RDD.
You could look at one of the other updateStateByKey methods – maybe you can 
write more efficient code there:

def updateStateByKey[S: ClassTag](
updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
partitioner: Partitioner,
rememberPartitioner: Boolean
  ): DStream[(K, S)] = …

What you can do though (and here you'll be glad that Spark also executes the 
code for state objects w/o updates) is clean up users if they haven't received 
updates for a long time, then load their state from the DB the next time you see 
them. I would consider this a must-have optimization to keep some bounds on the 
memory needs.
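A sketch of that cleanup idea, layering an idle timestamp onto the UserState example above (the extra field, the TTL value and the key type are assumptions):

// Evict keys that have been idle longer than a TTL by returning None from the update function.
case class UserStateWithTs(amount: Int, updates: Seq[Int], lastUpdated: Long)

val ttlMs = 6 * 60 * 60 * 1000L // e.g. 6 hours
def update(newUpdates: Seq[Int], state: Option[UserStateWithTs]): Option[UserStateWithTs] = {
  val now = System.currentTimeMillis()
  val prev = state.getOrElse(UserStateWithTs(0, Seq.empty, now))
  if (newUpdates.nonEmpty)
    Some(UserStateWithTs(prev.amount + newUpdates.sum, newUpdates, now))
  else if (now - prev.lastUpdated > ttlMs)
    None // drop the key; reload it from the DB the next time it shows up
  else
    Some(prev.copy(updates = Seq.empty))
}

// usage: events.updateStateByKey(update _), where events is a DStream[(UserId, Int)]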

-adrian

From: Thúy Hằng Lê
Date: Friday, September 25, 2015 at 2:05 PM
To: Adrian Tanase
Subject: Re: Using Spark for portfolio manager app


Hi Adrian,

Thanks, Cassandra seems to be a good candidate too. I will give it a try.
Do you know of any stable connector that helps Spark work with Cassandra, or
should I write one myself?

Regarding my second question, I think I have figured out another solution: I will
append another flag (like isNew) to the tuple in the updateStateByKey function,
then use a filter to know which records I should update in the database.
But it would be great if you could share your solution too (I don't quite get
the idea of emitting a new tuple).

In addition to this, with Spark's design it seems it has to iterate over all keys
(including ones that did not change) to do the aggregation for each batch. For my use
case I have 3M keys, but only 2-3K change in each batch (every 1 second); is
there any way to optimize this process?

On Sep 25, 2015 4:12 PM, "Adrian Tanase" 
mailto:atan...@adobe.com>> wrote:
Re: DB I strongly encourage you to look at Cassandra – it’s almost as powerful 
as Hbase, a lot easier to setup and manage. Well suited for this type of 
usecase, with a combination of K/V store and time series data.

For the second question, I’ve used this pattern all the time for “flash 
messages” - passing info as a 1 time message downstream:

  *   In your updateStateByKey function, emit a tuple of (actualNewState, 
changedData)
  *   Then filter this on !changedData.isEmpty or something
  *   And only do foreachRDD on the filtered stream.

Makes sense?

-adrian

From: Thúy Hằng Lê
Date: Friday, September 25, 2015 at 10:31 AM
To: ALEX K
Cc: "user@spark.apache.org"
Subject: Re: Using Spark for portfolio manager app


Thanks all for the feedback so far.
I haven't decided which external storage will be used yet.
HBase is cool but it requires Hadoop in production. I only have 3-4 servers for
the whole thing (I am thinking of a relational database for this; it could be
MariaDB, MemSQL or MySQL), but they are hard to scale.
I will try various approaches before making any decision.

In addition, using Spark Streaming, is there any way to update only new data to
external storage after using updateStateByKey?
The foreachRDD function seems to loop over all RDDs (including ones that haven't
changed). I believe Spark Streaming must have a way to do this, but I still
couldn't find an example doing a similar job.


Spark task error

2015-09-25 Thread madhvi.gupta

Hi,

My configurations are as follows:

SPARK_EXECUTOR_INSTANCES=4
SPARK_EXECUTOR_MEMORY=1G

But my Spark UI shows:

 * Alive Workers: 1
 * Cores in use: 4 Total, 0 Used
 * Memory in use: 6.7 GB Total, 0.0 B Used


Also, while running a Java program for Spark I am getting the
following error:
15/09/25 10:35:02 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 
0, 192.168.0.105): java.lang.IllegalStateException: unread block data
at 
java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2421)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
at 
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)

I am not sure what is happening. Can anyone help?

--
Thanks and Regards
Madhvi Gupta



Re: executor-cores setting does not work under Yarn

2015-09-25 Thread Akhil Das
Which version of Spark are you using? Can you also check what's set in your
conf/spark-defaults.conf file?

Thanks
Best Regards

On Fri, Sep 25, 2015 at 1:58 AM, Gavin Yue  wrote:

> Running Spark app over Yarn 2.7
>
> Here is my sparksubmit setting:
> --master yarn-cluster \
>  --num-executors 100 \
>  --executor-cores 3 \
>  --executor-memory 20g \
>  --driver-memory 20g \
>  --driver-cores 2 \
>
> But the executor cores setting is not working. It always assigns only one
> vcore  to one container based on the cluster metrics from yarn resource
> manager website.
>
> And yarn setting for container is
> min:   max: 
>
> I have tried to change num-executors and executor memory. It even ignores
> the min vCores setting and always assigns one core per container.
>
> Any advice?
>
> Thank you!
>
>
>
>
>


How to set spark envoirnment variable SPARK_LOCAL_IP in conf/spark-env.sh

2015-09-25 Thread Zhiliang Zhu
Hi all,
The Spark job will run on YARN. I either do not set SPARK_LOCAL_IP at all, or set it as
export  SPARK_LOCAL_IP=localhost    #or set it to the specific node IP in the
specific Spark install directory

It works well to submit the Spark job on the master node of the cluster; however, it
fails when submitted remotely via a gateway machine.
The gateway machine is already configured; it works well to submit Hadoop jobs. It
is set up as:
export SCALA_HOME=/usr/lib/scala
export JAVA_HOME=/usr/java/jdk1.7.0_45
export R_HOME=/usr/lib/r
export HADOOP_HOME=/usr/lib/hadoop
export YARN_CONF_DIR=/usr/lib/hadoop/etc/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

export SPARK_MASTER_IP=master01
#export SPARK_LOCAL_IP=master01  #if no SPARK_LOCAL_IP is set, SparkContext 
will not start
export SPARK_LOCAL_IP=localhost #if localhost is set, SparkContext is 
started, but failed later
export SPARK_LOCAL_DIRS=/data/spark_local_dir
...

The error messages:
15/09/25 19:07:12 INFO util.Utils: Successfully started service 'sparkYarnAM' 
on port 48133.
15/09/25 19:07:12 INFO yarn.ApplicationMaster: Waiting for Spark driver to be 
reachable.
15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to driver at 
127.0.0.1:35706, retrying ...
15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to driver at 
127.0.0.1:35706, retrying ...
15/09/25 19:07:12 ERROR yarn.ApplicationMaster: Failed to connect to driver at 
127.0.0.1:35706, retrying ...

I shall sincerely appreciate your kind help very much!
Zhiliang



Unreachable dead objects permanently retained on heap

2015-09-25 Thread James Aley
Hi,

We have an application that submits several thousands jobs within the same
SparkContext, using a thread pool to run about 50 in parallel. We're
running on YARN using Spark 1.4.1 and seeing a problem where our driver is
killed by YARN due to running beyond physical memory limits (no Java OOM
stack trace though).

Plugging in YourKit, I can see that in fact the application is running low
on heap. The suspicious thing we're seeing is that the old generation is
filling up with dead objects, which don't seem to be fully removed during
the stop-the-world sweeps we see happening later in the running of the
application.

With allocation tracking enabled, I can see that maybe 80%+ of that dead
heap space consists of byte arrays, which appear to contain some
snappy-compressed Hadoop configuration data. Many of them are 4MB each,
other hundreds of KBs. The allocation tracking reveals that they were
originally allocated in calls to sparkContext.hadoopFile() (from
AvroRelation in spark-avro). It seems that this data was broadcast to the
executors as a result of that call? I'm not clear on the implementation
details, but I can imagine that might be necessary?

This application is essentially a batch job to take many Avro files and
merging them into larger Parquet files. What it does is builds a DataFrame
of Avro files, then for each DataFrame, starts a job using
.coalesce(N).write().parquet() on a fixed size thread pool.

It seems that for each of those calls, another chunk of heap space
disappears to one of these byte arrays and is never reclaimed. I understand
that broadcast variables remain in memory on the driver application in
their serialized form, and that at least appears to be consistent with what
I'm seeing here. Question is, what can we do about this? Is there a way to
reclaim this memory? Should those arrays be GC'ed when jobs finish?

Any guidance greatly appreciated.


Many thanks,

James.


Troubles interacting with different version of Hive metastore

2015-09-25 Thread Ferran Galí
Hello,

I'm trying to start the SparkSQL thriftserver over YARN, connecting it to
the 1.1.0-cdh5.4.3 hive metastore that we already have in production.

I downloaded the latest version of Spark (1.5.0), I just followed the
instructions from the documentation
,
and I configured the spark-default.conf file with the following parameters:

*spark.sql.hive.metastore.version* 1.1.0
*spark.sql.hive.metastore.jars*
 
{path-to-hive-install}/lib/*:{path-to-hadoop-install}/common/*:{path-to-hadoop-install}{path-to-hadoop-install}/hdfs/*:{path-to-hadoop-install}/hdfs/lib/*:{path-to-hadoop-install}/yarn/*:{path-to-hadoop-install}/yarn/lib/*:{path-to-hadoop-install}/mapreduce/*:{path-to-hadoop-install}/mapreduce/lib/*


Moreover, I've also placed the *hive-site.xml* into the *conf* folder.
When I startup the thriftserver, it eventually crashes with the following
message:

15/09/25 12:50:57 INFO hive.HiveContext: default warehouse location is
/user/hive/warehouse
15/09/25 12:50:57 INFO hive.HiveContext: Initializing
HiveMetastoreConnection version 1.1.0 using [Ljava.net.URL;@51fd1e7f
15/09/25 12:50:58 INFO client.ClientWrapper: Inspected Hadoop version:
2.6.0-cdh5.4.3
Exception in thread "main" java.lang.ClassNotFoundException:
java.lang.NoClassDefFoundError: com/google/common/base/Predicate when
creating Hive client using classpath: *HERE ALL THE JARS INSIDE THE
**spark.sql.hive.metastore.jars
(including guava-14.0.1.jar)*


If I put "maven" inside the *spark.sql.hive.metastore.jars* parameter I get
the same error.

Do you know what I could be doing wrong? I don't understand, because the
class it's complaining about (com.google.common.base.Predicate) is already
on the classpath.

Yours,
Ferran Galí i Reniu
-- 

*Ferran Galí i Reniu*
Pipeline
+34 93 209 2556
Avda. Diagonal 601, 9ª
08028, Barcelona


Re: how to submit the spark job outside the cluster

2015-09-25 Thread Zhiliang Zhu
It seems that this is due to the Spark SPARK_LOCAL_IP setting.
export SPARK_LOCAL_IP=localhost
will not work.
Then how should it be set?
Thank you all~~
 


 On Friday, September 25, 2015 5:57 PM, Zhiliang Zhu 
 wrote:
   

Hi Steve,
Thanks a lot for your reply.
That is, some commands work on the remote server with the gateway installed, but
some other commands will not work. As expected, the remote machine is not in the
same local area network as the cluster, and the cluster's port is forbidden.
When I make the remote machine a gateway for another local area cluster, it
works fine, and Hadoop jobs can be submitted from that machine remotely.
However, I want to submit Spark jobs remotely just as Hadoop jobs are. On the
gateway machine, I also copied the Spark install directory from the cluster to
it, and conf/spark-env.sh is also there. But I fail to submit the Spark job
remotely... The error messages:
15/09/25 17:47:47 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/09/25 17:47:47 INFO Remoting: Starting remoting
15/09/25 17:47:48 ERROR netty.NettyTransport: failed to bind to 
/220.250.64.225:0, shutting down Netty transport
15/09/25 17:47:48 WARN util.Utils: Service 'sparkDriver' could not bind on port 
0. Attempting port 1.
15/09/25 17:47:48 INFO remote.RemoteActorRefProvider$RemotingTerminator: 
Shutting down remote daemon.
15/09/25 17:47:48 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote 
daemon shut down; proceeding with flushing remote transports.
15/09/25 17:47:48 INFO remote.RemoteActorRefProvider$RemotingTerminator: 
Remoting shut down.

...
Would you help with this?
Thank you very much!
Zhiliang

 


 On Friday, September 25, 2015 5:21 PM, Steve Loughran 
 wrote:
   

 

On 25 Sep 2015, at 05:25, Zhiliang Zhu  wrote:

However, I just could use "hadoop fs -ls/-mkdir/-rm XXX" commands to operate at 
the remote machine with gateway, 



which means the namenode is reachable; all those commands only need to interact 
with it.

but commands "hadoop fs -cat/-put XXX    YYY" would not work with error message 
as below:
put: File /user/zhuzl/wordcount/input/1._COPYING_ could only be replicated to 0 
nodes instead of minReplication (=1).  There are 2 datanode(s) running and 2 
node(s) are excluded in this operation.
15/09/25 10:44:00 INFO hdfs.DFSClient: Exception in createBlockOutputStream
org.apache.hadoop.net.ConnectTimeoutException: 6 millis timeout while 
waiting for channel to be ready for connect. ch : 
java.nio.channels.SocketChannel[connection-pending remote=/10.6.28.96:50010]


the client can't reach the datanodes

   

  

Re: Why Checkpoint is throwing "actor.OneForOneStrategy: NullPointerException"

2015-09-25 Thread Uthayan Suthakar
Thank you Tathagata and Terry for your responses. You were absolutely
right: I had created a dummy DStream (to prevent the Flume channel from filling
up) and counted the messages, but I didn't output (print) them, which is why it
reported that error. Since I called print(), the error is no longer
being thrown.
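For anyone hitting the same NullPointerException, a sketch of the fix described above (the host/port are placeholders and `ssc` is the existing StreamingContext):

import org.apache.spark.streaming.flume.FlumeUtils

// Give the "dummy" stream an output operation so it gets initialized;
// without one, the DStreamGraph NPE shown below appears.
val drainStream = FlumeUtils.createStream(ssc, "localhost", 41414)
drainStream.count().print() // any output operation works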

Cheers,

Uthay

On 25 September 2015 at 03:40, Terry Hoo  wrote:

> I met this before: in my program, some DStreams were not initialized since
> they were not in the path of any output.
>
> You can check if you are in the same situation.
>
>
> Thanks!
> - Terry
>
> On Fri, Sep 25, 2015 at 10:22 AM, Tathagata Das 
> wrote:
>
>> Are you by any chance setting DStream.remember() with null?
>>
>> On Thu, Sep 24, 2015 at 5:02 PM, Uthayan Suthakar <
>> uthayan.sutha...@gmail.com> wrote:
>>
>>> Hello all,
>>>
>>> My Stream job is throwing below exception at every interval. It is first
>>> deleting the the checkpoint file and then it's trying to checkpoint, is
>>> this normal behaviour? I'm using Spark 1.3.0. Do you know what may cause
>>> this issue?
>>>
>>> 15/09/24 16:35:55 INFO scheduler.TaskSetManager: Finished task 1.0 in
>>> stage 84.0 (TID 799) in 12 ms on itrac1511.cern.ch (1/8)
>>> *15/09/24 16:35:55 INFO streaming.CheckpointWriter:
>>> Deleting 
>>> hdfs://p01001532067275/user/wdtmon/wdt-dstream-6/checkpoint-144310422*
>>> *15/09/24 16:35:55 INFO streaming.CheckpointWriter: Checkpoint for time
>>> 144310422 ms saved to file
>>> 'hdfs://p01001532067275/user/wdtmon/wdt-dstream-6/*
>>> checkpoint-144310422', took 10696 bytes and 108 ms
>>> 15/09/24 16:35:55 INFO streaming.DStreamGraph: Clearing checkpoint data
>>> for time 144310422 ms
>>> 15/09/24 16:35:55 INFO streaming.DStreamGraph: Cleared checkpoint data
>>> for time 144310422 ms
>>> 15/09/24 16:35:55 ERROR actor.OneForOneStrategy:
>>> java.lang.NullPointerException
>>> at
>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
>>> at
>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
>>> at
>>> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>> at
>>> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>> at
>>> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>> at
>>> scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>> at
>>> scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>> at
>>> scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>> at
>>> org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:168)
>>> at
>>> org.apache.spark.streaming.scheduler.JobGenerator.clearCheckpointData(JobGenerator.scala:279)
>>> at org.apache.spark.streaming.scheduler.JobGenerator.org
>>> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
>>> at
>>> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:86)
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>> at
>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>>
>>> Cheers,
>>>
>>> Uthay
>>>
>>>
>>
>


Re: Checkpoint files are saved before stream is saved to file (rdd.toDF().write ...)?

2015-09-25 Thread Petr Novak
Many thanks Cody, that explains quite a bit.

I had a couple of problems with checkpointing and graceful shutdown when moving
working code from Spark 1.3.0 to 1.5.0: InterruptedExceptions,
KafkaDirectStream not being able to initialize, and some exceptions regarding the
WAL even though I'm using the direct stream. Meanwhile I did some major code
refactorings and suddenly it seems to work the same as in 1.3.0, without my knowing
what I actually did to solve it. But I'm going to put it aside for now, as long
as it works as it is, because I plan to write my own recovery at some
point.

Petr

On Fri, Sep 25, 2015 at 12:14 PM, Petr Novak  wrote:

> Many thanks Cody, that explains quite a bit.
>
> I had a couple of problems with checkpointing and graceful shutdown when moving
> working code from Spark 1.3.0 to 1.5.0: InterruptedExceptions,
> KafkaDirectStream not being able to initialize, and some exceptions regarding the
> WAL even though I'm using the direct stream. Meanwhile I did some major code
> refactorings and suddenly it seems to work the same as in 1.3.0, without my knowing
> what I actually did to solve it. But I'm going to put it aside for now, as long
> as it works as it is, because I plan to write my own recovery at some
> point.
>
> Petr
>
> On Wed, Sep 23, 2015 at 4:26 PM, Cody Koeninger 
> wrote:
>
>> TD can correct me on this, but I believe checkpointing is done after a
>> set of jobs is submitted, not after they are completed.  If you fail while
>> processing the jobs, starting over from that checkpoint should put you in
>> the correct state.
>>
>> In any case, are you actually observing a loss of messages when killing /
>> restarting a job?
>>
>> On Wed, Sep 23, 2015 at 3:49 AM, Petr Novak  wrote:
>>
>>> Hi,
>>> I have 2 streams and checkpointing with code based on documentation. One
>>> stream is transforming data from Kafka and saves them to Parquet file. The
>>> other stream uses the same stream and does updateStateByKey to compute some
>>> aggregations. There is no gracefulShutdown.
>>>
>>> Both use about this code to save files:
>>>
>>> stream.foreachRDD { (rdd, time) =>
>>>   ...
>>>   rdd.toDF().write.save(...use time for the directory name...)
>>> }
>>>
>>> It is not idempotent at the moment but let's put this aside for now.
>>>
>>> The strange thing is that when I Ctrl+C the job I can see checkpoint
>>> file with timestamp for the last batch but there are no stream
>>> files/directories for this timestamp or only one of streams have data saved
>>> with time aligned with the last checkpoint file. I would expect that
>>> checkpoint file is created after both streams successfully finishes its
>>> saves and that it is created at the end of the batch. Otherwise I don't
>>> know for what checkpointing is good for except maybe cutting lineage. Is
>>> file saving asynchronous and Spark checkpointing does not care about it?
>>>
>>> I actually need to checkpoint both streams atomically at the end of the
>>> batch. It seems to me that the Spark checkpointing facility is quite unusable in
>>> practice except for some simple scenarios and everybody has to actually
>>> roll its own.
>>>
>>> Am I wrong? How can I use Spark checkpointing to checkpoint both streams
>>> after they successfully save its results to files. It is actually the
>>> reason while I think that micro-batch streaming is nice because it has
>>> clearly defined synchronization barrier. But it doesn't seems like
>>> checkpointing takes an advantage of it.
>>>
>>> I can't ensure atomicity when saving more files for more streams and it
>>> would require some further cleanup code on job restart. But at least I
>>> would like to have a guarantee that the existence of a checkpoint file signals
>>> that batch with that timestamp finished successfully with all its RDD
>>> actions.
>>>
>>> Or it is expected to behave like this and I have something wrong with my
>>> code?
>>>
>>> Many thanks for any insights,
>>> Petr
>>>
>>>
>>
>


Re: how to submit the spark job outside the cluster

2015-09-25 Thread Zhiliang Zhu
Hi Steve,
Thanks a lot for your reply.
That is, some commands work on the remote server with the gateway installed, but
some other commands will not work. As expected, the remote machine is not in the
same local area network as the cluster, and the cluster's port is forbidden.
When I make the remote machine a gateway for another local area cluster, it
works fine, and Hadoop jobs can be submitted from that machine remotely.
However, I want to submit Spark jobs remotely just as Hadoop jobs are. On the
gateway machine, I also copied the Spark install directory from the cluster to
it, and conf/spark-env.sh is also there. But I fail to submit the Spark job
remotely... The error messages:
15/09/25 17:47:47 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/09/25 17:47:47 INFO Remoting: Starting remoting
15/09/25 17:47:48 ERROR netty.NettyTransport: failed to bind to 
/220.250.64.225:0, shutting down Netty transport
15/09/25 17:47:48 WARN util.Utils: Service 'sparkDriver' could not bind on port 
0. Attempting port 1.
15/09/25 17:47:48 INFO remote.RemoteActorRefProvider$RemotingTerminator: 
Shutting down remote daemon.
15/09/25 17:47:48 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote 
daemon shut down; proceeding with flushing remote transports.
15/09/25 17:47:48 INFO remote.RemoteActorRefProvider$RemotingTerminator: 
Remoting shut down.

...
Would you help with this?
Thank you very much!
Zhiliang

 


 On Friday, September 25, 2015 5:21 PM, Steve Loughran 
 wrote:
   

 

On 25 Sep 2015, at 05:25, Zhiliang Zhu  wrote:

However, I just could use "hadoop fs -ls/-mkdir/-rm XXX" commands to operate at 
the remote machine with gateway, 



which means the namenode is reachable; all those commands only need to interact 
with it.

but commands "hadoop fs -cat/-put XXX    YYY" would not work with error message 
as below:
put: File /user/zhuzl/wordcount/input/1._COPYING_ could only be replicated to 0 
nodes instead of minReplication (=1).  There are 2 datanode(s) running and 2 
node(s) are excluded in this operation.
15/09/25 10:44:00 INFO hdfs.DFSClient: Exception in createBlockOutputStream
org.apache.hadoop.net.ConnectTimeoutException: 6 millis timeout while 
waiting for channel to be ready for connect. ch : 
java.nio.channels.SocketChannel[connection-pending remote=/10.6.28.96:50010]


the client can't reach the datanodes

  

Re: how to submit the spark job outside the cluster

2015-09-25 Thread Steve Loughran

On 25 Sep 2015, at 05:25, Zhiliang Zhu 
mailto:zchl.j...@yahoo.com.INVALID>> wrote:


However, I just could use "hadoop fs -ls/-mkdir/-rm XXX" commands to operate at 
the remote machine with gateway,


which means the namenode is reachable; all those commands only need to interact 
with it.

but commands "hadoop fs -cat/-put XXXYYY" would not work with error message 
as below:

put: File /user/zhuzl/wordcount/input/1._COPYING_ could only be replicated to 0 
nodes instead of minReplication (=1).  There are 2 datanode(s) running and 2 
node(s) are excluded in this operation.
15/09/25 10:44:00 INFO hdfs.DFSClient: Exception in createBlockOutputStream
org.apache.hadoop.net.ConnectTimeoutException: 6 millis timeout while 
waiting for channel to be ready for connect. ch : 
java.nio.channels.SocketChannel[connection-pending remote=/10.6.28.96:50010]


the client can't reach the datanodes


Re: Exception on save s3n file (1.4.1, hadoop 2.6)

2015-09-25 Thread Steve Loughran

On 25 Sep 2015, at 03:35, Zhang, Jingyu 
mailto:jingyu.zh...@news.com.au>> wrote:


I got following exception when I run  
JavPairRDD.values().saveAsTextFile("s3n://bucket); Can anyone help me out? 
thanks


15/09/25 12:24:32 INFO SparkContext: Successfully stopped SparkContext

Exception in thread "main" java.lang.NoClassDefFoundError: 
org/jets3t/service/ServiceException


you need the same jets3t JAR which ships with Hadoop 2.6 (jets3t 0.9.0) on your
classpath; use --jars


at 
org.apache.hadoop.fs.s3native.NativeS3FileSystem.createDefaultStore(NativeS3FileSystem.java:338)

at 
org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:328)

at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2596)

at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)

at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)

at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)

at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)

at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)

at 
org.apache.spark.SparkHadoopWriter$.createPathFromString(SparkHadoopWriter.scala:170)




Re: Using Spark for portfolio manager app

2015-09-25 Thread Adrian Tanase
Re: DB I strongly encourage you to look at Cassandra – it’s almost as powerful 
as Hbase, a lot easier to setup and manage. Well suited for this type of 
usecase, with a combination of K/V store and time series data.

For the second question, I’ve used this pattern all the time for “flash 
messages” - passing info as a 1 time message downstream:

  *   In your updateStateByKey function, emit a tuple of (actualNewState, 
changedData)
  *   Then filter this on !changedData.isEmpty or something
  *   And only do foreachRDD on the filtered stream.

Makes sense?

-adrian

From: Thúy Hằng Lê
Date: Friday, September 25, 2015 at 10:31 AM
To: ALEX K
Cc: "user@spark.apache.org"
Subject: Re: Using Spark for portfolio manager app


Thanks all for the feedback so far.
I haven't decided which external storage will be used yet.
HBase is cool but it requires Hadoop in production. I only have 3-4 servers for
the whole thing (I am thinking of a relational database for this; it could be
MariaDB, MemSQL or MySQL), but they are hard to scale.
I will try various approaches before making any decision.

In addition, using Spark Streaming, is there any way to update only new data to
external storage after using updateStateByKey?
The foreachRDD function seems to loop over all RDDs (including ones that haven't
changed). I believe Spark Streaming must have a way to do this, but I still
couldn't find an example doing a similar job.

