Re: Running Spark in Yarn-client mode

2015-10-08 Thread Sushrut Ikhar
Hey Jean,
Thanks for the quick response. I am using spark 1.4.1 pre-built with hadoop
2.6.
Yes the Yarn cluster has multiple running worker nodes.
It would be a great help if you could tell me how to find the executor logs.

Regards,

Sushrut Ikhar
https://about.me/sushrutikhar



On Thu, Oct 8, 2015 at 11:22 AM, Jean-Baptiste Onofré 
wrote:

> Hi Sushrut,
>
> which packaging of Spark do you use ?
> Do you have a working Yarn cluster (with at least one worker) ?
>
> spark-hadoop-x ?
>
> Regards
> JB
>
> On 10/08/2015 07:23 AM, Sushrut Ikhar wrote:
>
>> Hi,
>> I am new to Spark and I have been trying to run Spark in yarn-client mode.
>>
>> I get this error in yarn logs :
>> Error: Could not find or load main class
>> org.apache.spark.executor.CoarseGrainedExecutorBackend
>>
>> Also, I keep getting these warnings:
>>
>> WARN YarnScheduler: Initial job has not accepted any resources; check
>> your cluster UI to ensure that workers are registered and have
>> sufficient resources
>>
>> WARN YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster has
>> disassociated
>>
>> WARN ReliableDeliverySupervisor: Association with remote system
>> [akka.tcp://sparkYarnAM@] has failed, address is now
>> gated for [5000] ms. Reason is: [Disassociated].
>>
>> I believe that executors are starting but are unable to connect back to
>> the driver.
>> How do I resolve this?
>> Also, I need help in locating the driver and executor node logs.
>>
>> Thanks.
>>
>> Regards,
>>
>> Sushrut Ikhar
>> https://about.me/sushrutikhar
>>
>> 
>>
>>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Build Failure

2015-10-08 Thread shahid qadri
Hi,

I tried to build the latest master branch of Spark:
build/mvn -DskipTests clean package


Reactor Summary:
[INFO]
[INFO] Spark Project Parent POM ... SUCCESS [03:46 min]
[INFO] Spark Project Test Tags  SUCCESS [01:02 min]
[INFO] Spark Project Launcher . SUCCESS [01:03 min]
[INFO] Spark Project Networking ... SUCCESS [ 30.794 s]
[INFO] Spark Project Shuffle Streaming Service  SUCCESS [ 29.496 s]
[INFO] Spark Project Unsafe ... SUCCESS [ 18.478 s]
[INFO] Spark Project Core . SUCCESS [05:42 min]
[INFO] Spark Project Bagel  SUCCESS [  6.082 s]
[INFO] Spark Project GraphX ... SUCCESS [ 23.478 s]
[INFO] Spark Project Streaming  SUCCESS [ 53.969 s]
[INFO] Spark Project Catalyst . SUCCESS [02:12 min]
[INFO] Spark Project SQL .. SUCCESS [03:02 min]
[INFO] Spark Project ML Library ... SUCCESS [02:57 min]
[INFO] Spark Project Tools  SUCCESS [  3.139 s]
[INFO] Spark Project Hive . SUCCESS [03:25 min]
[INFO] Spark Project REPL . SUCCESS [ 18.303 s]
[INFO] Spark Project Assembly . SUCCESS [01:40 min]
[INFO] Spark Project External Twitter . SUCCESS [ 16.707 s]
[INFO] Spark Project External Flume Sink .. SUCCESS [ 52.234 s]
[INFO] Spark Project External Flume ... SUCCESS [ 13.069 s]
[INFO] Spark Project External Flume Assembly .. SUCCESS [  4.653 s]
[INFO] Spark Project External MQTT  SUCCESS [01:56 min]
[INFO] Spark Project External MQTT Assembly ... SUCCESS [ 15.233 s]
[INFO] Spark Project External ZeroMQ .. SUCCESS [ 13.267 s]
[INFO] Spark Project External Kafka ... SUCCESS [ 41.663 s]
[INFO] Spark Project Examples . FAILURE [07:36 min]
[INFO] Spark Project External Kafka Assembly .. SKIPPED
[INFO] 
[INFO] BUILD FAILURE
[INFO] 
[INFO] Total time: 40:07 min
[INFO] Finished at: 2015-10-08T13:14:31+05:30
[INFO] Final Memory: 373M/1205M
[INFO] 
[ERROR] Failed to execute goal on project spark-examples_2.10: Could not 
resolve dependencies for project 
org.apache.spark:spark-examples_2.10:jar:1.6.0-SNAPSHOT: The following 
artifacts could not be resolved: com.twitter:algebird-core_2.10:jar:0.9.0, 
com.github.stephenc:jamm:jar:0.2.5: Could not transfer artifact 
com.twitter:algebird-core_2.10:jar:0.9.0 from/to central 
(https://repo1.maven.org/maven2): GET request of: 
com/twitter/algebird-core_2.10/0.9.0/algebird-core_2.10-0.9.0.jar from central 
failed: Connection reset -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e 
switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please 
read the following articles:
[ERROR] [Help 1] 
http://cwiki.apache.org/confluence/display/MAVEN/DependencyResolutionException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR]   mvn <goals> -rf :spark-examples_2.10



Re: Build Failure

2015-10-08 Thread Jean-Baptiste Onofré

Hi,

I just tried and it works for me (I don't have any Maven mirror on my
subnet).

Can you try again? It may have been a temporary problem accessing Maven
Central.


The artifact is present on central:

http://repo1.maven.org/maven2/com/twitter/algebird-core_2.10/0.9.0/

Regards
JB

--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com




Spark ganglia jClassNotFoundException: org.apache.spark.metrics.sink.GangliaSink

2015-10-08 Thread gtanguy
I built Spark with Ganglia support:

$SPARK_HOME/build/sbt -Pspark-ganglia-lgpl -Phadoop-1 -Phive
-Phive-thriftserver assembly
...
[info] Including from cache: metrics-ganglia-3.1.0.jar
...

In the master log :

ERROR actor.OneForOneStrategy: org.apache.spark.metrics.sink.GangliaSink 
akka.actor.ActorInitializationException: exception during creation
   at akka.actor.ActorInitializationException$.apply(Actor.scala:164)
   at akka.actor.ActorCell.create(ActorCell.scala:596)
   at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456)
   at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
   at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
   at akka.dispatch.Mailbox.run(Mailbox.scala:219)
   at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
   at 
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ClassNotFoundException:
org.apache.spark.metrics.sink.GangliaSink 


Did I forget something?

I am on spark 1.3.1.

My metrics.properties :

*.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink 
*.sink.ganglia.host=ip-10-137-120-185.ec2.internal
*.sink.ganglia.port=5080
*.sink.ganglia.period=10 
*.sink.ganglia.unit=seconds 
*.sink.ganglia.ttl=1 
*.sink.ganglia.mode=multicast 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-ganglia-jClassNotFoundException-org-apache-spark-metrics-sink-GangliaSink-tp24977.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Spark 1.5.1 standalone cluster - wrong Akka remoting config?

2015-10-08 Thread Barak Yaish
Taking my first steps with Spark, I'm having trouble submitting jobs to the
cluster from application code. Digging through the logs, I noticed some
periodic WARN messages in the master log:

15/10/08 13:00:00 WARN remote.ReliableDeliverySupervisor: Association with
remote system [akka.tcp://sparkDriver@192.168.254.167:64014] has failed,
address is now gated for [5000] ms. Reason: [Disassociated]

The problem is that this IP address does not exist on our network and wasn't
configured anywhere. The same wrong IP is shown in the worker log when it
tries to execute the task (wrong IP passed to --driver-url):

15/10/08 12:58:21 INFO worker.ExecutorRunner: Launch command:
"/usr/java/latest//bin/java" "-cp"
"/path/spark/spark-1.5.1-bin-hadoop2.6/sbin/../conf/:/path/spark/spark-1.5.1-bin-hadoop2.6/lib/spark-assembly-1.5.1-hadoop2.6.0.jar:/path/spark/spark-1.5.1-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/path/spark/spark-1.5.1-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/path/spark/spark-1.5.1-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/path/hadoop/2.6.0//etc/hadoop/"
"-Xms1024M" "-Xmx1024M" "-Dspark.driver.port=64014" "-Dspark.driver.port=53411"
"org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url"
"akka.tcp://sparkDriver@192.168.254.167:64014/user/CoarseGrainedScheduler"
"--executor-id" "39" "--hostname" "192.168.10.214" "--cores" "16"
"--app-id" "app-20151008123702-0003" "--worker-url"
"akka.tcp://sparkWorker@192.168.10.214:37625/user/Worker"
15/10/08 12:59:28 INFO worker.Worker: Executor app-20151008123702-0003/39
finished with state EXITED message Command exited with code 1 exitStatus 1
Any idea what I did wrong and how this can be fixed?

The java version is 1.8.0_20, and I'm using pre-built Spark binaries.

Thanks!


Re: Example of updateStateByKey with initial RDD?

2015-10-08 Thread Aniket Bhatnagar
Oh, I see. You may have to add an underscore
after ProbabilityCalculator.updateCountsOfProcessGivenRole. Try:

dstream.map(x => (x.keyWithTime, x))
  .updateStateByKey(ProbabilityCalculator.updateCountsOfProcessGivenRole _, new HashPartitioner(3), initialProcessGivenRoleRdd)

Here is an example:
def counter(events: Seq[Event], prevStateOpt: Option[Long]): Option[Long] = {
  val prevCount = prevStateOpt.getOrElse(0L)
  val newCount = prevCount + events.size
  Some(newCount)
}
val interval = 60 * 1000
val initialRDD = sparkContext.makeRDD(Array(1L, 2L, 3L, 4L, 5L))
  .map(_ * interval)
  .map(n => (n % interval, n / interval))
val counts = eventsStream.map(event => {
  (event.timestamp - event.timestamp % interval, event)
}).updateStateByKey[Long](PrintEventCountsByInterval.counter _, new HashPartitioner(3), initialRDD = initialRDD)
counts.print()
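
A note on why the trailing underscore can matter, as a minimal sketch and not the actual Spark API. When a method is overloaded with differently shaped function parameters, the Scala 2 compiler may not know which function type to expect, so it does not turn a bare method name into a function value; writing `counter _` requests that conversion explicitly. The two `update` overloads below are hypothetical stand-ins loosely modelled on the updateStateByKey overloads; everything else is plain Scala.

```scala
object EtaExpansionSketch {
  // Hypothetical overload 1: per-key update function plus an initial state.
  def update(f: (Seq[Int], Option[Long]) => Option[Long],
             partitions: Int,
             initial: Map[String, Long]): String = "per-key"

  // Hypothetical overload 2: iterator-based update function plus a flag.
  def update(f: Iterator[Int] => Iterator[Long],
             partitions: Int,
             remember: Boolean): String = "per-iterator"

  // An ordinary method, like the counter in the message above.
  def counter(events: Seq[Int], prev: Option[Long]): Option[Long] =
    Some(prev.getOrElse(0L) + events.size)

  def main(args: Array[String]): Unit = {
    // `counter _` turns the method into a function value before overload
    // resolution, so the compiler can pick the first overload.
    println(update(counter _, 3, Map("a" -> 1L)))
  }
}
```

Whether the bare name `counter` would also compile here depends on the Scala version, which is why the explicit underscore is the safer spelling.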

Thanks,
Aniket


On Thu, Oct 8, 2015 at 5:48 PM Bryan Jeffrey 
wrote:

> Aniket,
>
> Thank you for the example - but that's not quite what I'm looking for.
> I've got a call to updateStateByKey that looks like the following:
>
> dstream.map(x => (x.keyWithTime, x))
> .updateStateByKey(ProbabilityCalculator.updateCountsOfProcessGivenRole)
>
> def updateCountsOfProcessGivenRole(a : Seq[CountsOfProcessGivenRole], b:
> Option[CountsOfProcessGivenRole]) : Option[CountsOfProcessGivenRole] = {
> val currentTime = DateTime.now(DateTimeZone.UTC)
> if(a.isEmpty) {
>   if (b.get.eventhourbin.plusDays(3).getMillis <
> currentTime.getMillis) {
> None
>   } else {
> b
>   }
> } else { // a is not empty - b may or may not be defined
>   val population = if(b.isDefined) b.get.Population else 0 + a.map(x
> => x.Population).sum
>   val subpopulation = if(b.isDefined) b.get.Subpopulation else 0 +
> a.map(x => x.Subpopulation).sum
>   Some(CountsOfProcessGivenRole(a(0), population, subpopulation))
> }
>   }
>
> This works fine, however when I go to add an initial RDD, modifying the
> 'updateStateByKey' call to look like the following:
>
> val initialProcessGivenRoleRdd: RDD[((String, String, DateTime),
> CountsOfProcessGivenRole)] = iPrProcessGivenRole.map(x => (x.keyWithTime,
> x))
> dstream.map(x => (x.keyWithTime, x))
> .updateStateByKey(ProbabilityCalculator.updateCountsOfProcessGivenRole,
> new HashPartitioner(3), initialProcessGivenRoleRdd)
>
> I am getting an error -- 'missing arguments for method
> updateCountsOfProcessGivenRole'. Looking at the method calls, the function
> that is called for appears to be the same.  I was hoping an example might
> shed some light on the issue.
>
> Regards,
>
> Bryan Jeffrey
>
>
>
>
>
>
>
> On Thu, Oct 8, 2015 at 7:04 AM, Aniket Bhatnagar <
> aniket.bhatna...@gmail.com> wrote:
>
>> Here is an example:
>>
>> val interval = 60 * 1000
>> val counts = eventsStream.map(event => {
>>   (event.timestamp - event.timestamp % interval, event)
>> }).updateStateByKey[Long](updateFunc = (events: Seq[Event], prevStateOpt:
>> Option[Long]) => {
>>   val prevCount = prevStateOpt.getOrElse(0L)
>>   val newCount = prevCount + events.size
>>   Some(newCount)
>> })
>> counts.print()
>>
>> Hope it helps!
>>
>> Thanks,
>> Aniket
>>
>> On Thu, Oct 8, 2015 at 4:29 PM Bryan  wrote:
>>
>>> Hello,
>>>
>>> Can anyone point me to a good example of updateStateByKey with an
>>> initial RDD? I am seeing a compile time error when following the API.
>>>
>>> Regards,
>>>
>>> Bryan Jeffrey
>>>
>>
>


How can I read file from HDFS i sparkR from RStudio

2015-10-08 Thread Amit Behera
Hi All,

I am very new to SparkR.

I am able to run the sample code from the example at this link:
http://www.r-bloggers.com/installing-and-starting-sparkr-locally-on-windows-os-and-rstudio/

Then I tried to read a file from HDFS in RStudio, but I am unable to read it.
Below is my code.

Sys.setenv(SPARK_HOME="/home/affine/spark")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"),"R","lib"),.libPaths()))
library(SparkR)
library(rJava)
Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.10:1.2.0" "sparkr-shell"')
sc <- sparkR.init(master = "spark://master:7077", sparkPackages = "com.databricks:spark-csv_2.1:1.2.0")
sqlContext <- sparkRSQL.init(sc)

peopleDF <- read.df(sqlContext, "hdfs://master:9000/sears/example.csv")

Error:

Error in callJMethod(sqlContext, "getConf", "spark.sql.sources.default",  :
  Invalid jobj 1. If SparkR was restarted, Spark operations need to be
re-executed.

Please tell me where I am going wrong.

Thanks,
Amit.


Re: Is coalesce smart while merging partitions?

2015-10-08 Thread Iulian Dragoș
It's smart. Have a look at
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala#L123

On Thu, Oct 8, 2015 at 4:00 AM, Cesar Flores  wrote:

> It is my understanding that the default behavior of the coalesce function,
> when the user reduces the number of partitions, is to only merge them
> without executing a shuffle.
>
> My question is: is this merging smart? For example, does Spark try to merge
> the small partitions first, or is the choice of partitions to merge random?
>
>
> Thanks
> --
> Cesar Flores
>



-- 

--
Iulian Dragos

--
Reactive Apps on the JVM
www.typesafe.com


Re: Example of updateStateByKey with initial RDD?

2015-10-08 Thread Bryan Jeffrey
Aniket,

Thank you for the example - but that's not quite what I'm looking for.
I've got a call to updateStateByKey that looks like the following:

dstream.map(x => (x.keyWithTime, x))
  .updateStateByKey(ProbabilityCalculator.updateCountsOfProcessGivenRole)

def updateCountsOfProcessGivenRole(a: Seq[CountsOfProcessGivenRole], b: Option[CountsOfProcessGivenRole]): Option[CountsOfProcessGivenRole] = {
  val currentTime = DateTime.now(DateTimeZone.UTC)
  if (a.isEmpty) {
    if (b.get.eventhourbin.plusDays(3).getMillis < currentTime.getMillis) {
      None
    } else {
      b
    }
  } else { // a is not empty - b may or may not be defined
    val population = if (b.isDefined) b.get.Population else 0 + a.map(x => x.Population).sum
    val subpopulation = if (b.isDefined) b.get.Subpopulation else 0 + a.map(x => x.Subpopulation).sum
    Some(CountsOfProcessGivenRole(a(0), population, subpopulation))
  }
}

This works fine, however when I go to add an initial RDD, modifying the
'updateStateByKey' call to look like the following:

val initialProcessGivenRoleRdd: RDD[((String, String, DateTime), CountsOfProcessGivenRole)] =
  iPrProcessGivenRole.map(x => (x.keyWithTime, x))
dstream.map(x => (x.keyWithTime, x))
  .updateStateByKey(ProbabilityCalculator.updateCountsOfProcessGivenRole, new HashPartitioner(3), initialProcessGivenRoleRdd)

I am getting an error -- 'missing arguments for method
updateCountsOfProcessGivenRole'. Looking at the method calls, the function
that is called for appears to be the same.  I was hoping an example might
shed some light on the issue.
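
Separately from the compile error, one detail of the update function above may be worth double-checking. In Scala, `if (c) x else 0 + y` parses as `if (c) x else (0 + y)`, so when the previous state is defined the incoming values would not be added to it. A minimal sketch with plain Long values follows; the names asWritten and intended are illustrative and not from the original code, and the "intended" version is only a guess at what was meant.

```scala
object PrecedenceSketch {
  // Same shape as the expression in the post: the `else` branch owns
  // the whole `0 + incoming.sum`, so incoming values are ignored
  // whenever a previous value exists.
  def asWritten(prev: Option[Long], incoming: Seq[Long]): Long =
    if (prev.isDefined) prev.get else 0 + incoming.sum

  // Probable intent: previous total (or zero) plus the incoming values.
  def intended(prev: Option[Long], incoming: Seq[Long]): Long =
    prev.getOrElse(0L) + incoming.sum

  def main(args: Array[String]): Unit = {
    println(asWritten(Some(10L), Seq(1L, 2L, 3L)))
    println(intended(Some(10L), Seq(1L, 2L, 3L)))
  }
}
```

The two versions agree when there is no previous state and differ only when one exists.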

Regards,

Bryan Jeffrey







On Thu, Oct 8, 2015 at 7:04 AM, Aniket Bhatnagar  wrote:

> Here is an example:
>
> val interval = 60 * 1000
> val counts = eventsStream.map(event => {
>   (event.timestamp - event.timestamp % interval, event)
> }).updateStateByKey[Long](updateFunc = (events: Seq[Event], prevStateOpt:
> Option[Long]) => {
>   val prevCount = prevStateOpt.getOrElse(0L)
>   val newCount = prevCount + events.size
>   Some(newCount)
> })
> counts.print()
>
> Hope it helps!
>
> Thanks,
> Aniket
>
> On Thu, Oct 8, 2015 at 4:29 PM Bryan  wrote:
>
>> Hello,
>>
>> Can anyone point me to a good example of updateStateByKey with an initial
>> RDD? I am seeing a compile time error when following the API.
>>
>> Regards,
>>
>> Bryan Jeffrey
>>
>


Re: Spark Streaming: Doing operation in Receiver vs RDD

2015-10-08 Thread Iulian Dragoș
You can have a look at
http://spark.apache.org/docs/latest/streaming-programming-guide.html#receiver-reliability
for details on Receiver reliability. If you go the receiver way you'll need
to enable Write Ahead Logs to ensure no data loss. In Kafka direct you
don't have this problem.

Regarding where to apply decryption, I'd lean towards doing it as RDD
transformations for the reasons you mentioned. Also, in case only some
fields are encrypted, this way you can delay decryption until it is really
needed (assuming some records would be filtered out, etc.).
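
To illustrate the "delay decryption" point, here is a small sketch on plain Scala collections so it runs without Spark. It assumes a message with a cleartext topic and an encrypted payload; the Message class, the topic field, and the use of Base64 as a stand-in for real decryption are all made up for illustration.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Base64

object LazyDecryptSketch {
  // Hypothetical record: `topic` is cleartext, `payload` is "encrypted".
  final case class Message(topic: String, payload: String)

  // Stand-in for an expensive decryption step (just Base64 here).
  def decrypt(payload: String): String =
    new String(Base64.getDecoder.decode(payload), UTF_8)

  // Helper to build test data with the matching stand-in "encryption".
  def encrypt(plain: String): String =
    Base64.getEncoder.encodeToString(plain.getBytes(UTF_8))

  // Filter on the cleartext field first, then decrypt only what survives.
  def process(messages: Seq[Message], wantedTopic: String): Seq[String] =
    messages.filter(_.topic == wantedTopic).map(m => decrypt(m.payload))

  def main(args: Array[String]): Unit = {
    val in = Seq(Message("a", encrypt("first")), Message("b", encrypt("second")))
    println(process(in, "a"))
  }
}
```

The same filter-then-map ordering should carry over to RDD or DStream transformations, where decrypt would run only on the records that pass the filter.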

iulian

On Wed, Oct 7, 2015 at 9:55 PM, emiretsk  wrote:

> Hi,
>
> I have a Spark Streaming program that is consuming messages from Kafka and
> has to decrypt and deserialize each message. I can implement it either as a
> Kafka deserializer (that will run in a receiver or the new receiver-less
> Kafka consumer) or as RDD operations. What are the pros/cons of each?
>
> As I see it, doing the operations on RDDs has the following implications:
> - Better load balancing and fault tolerance (though I'm not quite sure what
>   happens when a receiver fails). Also, I'm not sure whether this is still
>   true with the new receiver-less Kafka consumer, as it creates an RDD
>   partition for each Kafka partition.
> - All functions that are applied to RDDs need to be either static or part of
>   serializable objects. This makes using standard/3rd-party Java libraries
>   harder.
>
> Cheers,
> Eugene
>


-- 

--
Iulian Dragos

--
Reactive Apps on the JVM
www.typesafe.com


Re: Optimal way to avoid processing null returns in Spark Scala

2015-10-08 Thread Iulian Dragoș
On Wed, Oct 7, 2015 at 6:42 PM, swetha  wrote:

> Hi,
>
> I have the following functions that I am using for my job in Scala. If you
> look at the getSessionId function, I sometimes return null. If I return
> null, the only way I can avoid processing those records is by filtering
> out the null records. I wanted to avoid another pass for filtering, so I
> tried returning "None". But that causes problems, as it requires the
> return type to be an Option. What is the optimal way to avoid processing
> null records while also avoiding Option as the return type? The use of
> Option[] and Some(()) seems to cause type issues in subsequent function
> calls.
>
You should use RDD.flatMap; this way you can map and filter at the same
time. Something like:

rdd.flatMap { case (x, y) =>
  val sessionId = getSessionId(y)
  if (sessionId != null)
    Seq((sessionId, (getTimeStamp(y).toLong, y)))
  else
    Seq()
}

I didn’t try to compile that method, but you’ll figure out the types, if
need be.
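
A variation on the same idea, sketched on plain Scala collections so it runs without Spark: wrapping the possibly-null result in Option(...) lets flatMap drop the missing records without an explicit null check. The getSessionId below is a hypothetical stand-in (it treats a "sid=" prefix as the session id) and is not the function from the original post.

```scala
object FlatMapSketch {
  // Hypothetical stand-in: returns null when the record has no session id.
  def getSessionId(record: String): String =
    if (record.startsWith("sid=")) record.stripPrefix("sid=") else null

  // Option(null) is None, so records without a session id vanish in flatMap.
  def keyed(records: Seq[String]): Seq[(String, String)] =
    records.flatMap { r =>
      Option(getSessionId(r)).map(id => (id, r))
    }

  def main(args: Array[String]): Unit = {
    println(keyed(Seq("sid=a", "junk", "sid=b")))
  }
}
```

The same shape should carry over to RDD.flatMap, since an Option can be used where a collection of zero or one elements is expected.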

iulian


>
> val sessions = filteredStream.transform(rdd=>getBeaconMap(rdd))
>
>   def getBeaconMap(rdd: RDD[(String, String)]): RDD[(String, (Long, String))] = {
>     rdd.map[(String, (Long, String))] { case (x, y) =>
>       (getSessionId(y), (getTimeStamp(y).toLong, y))
>     }
>   }
>
>   def getSessionId(eventRecord: String): String = {
>     val beaconTestImpl: BeaconTestLoader = new BeaconTestImpl // This needs to be changed.
>     val beaconEvent: BeaconEventData = beaconTestImpl.getBeaconEventData(eventRecord)
>
>     if (beaconEvent != null) {
>       beaconEvent.getSessionID // This might be in Set Cookie header
>     } else {
>       null
>     }
>   }
>
>
> val groupedAndSortedSessions =
> sessions.transform(rdd=>ExpoJobCommonNew.getGroupedAndSortedSessions(rdd))
-- 

--
Iulian Dragos

--
Reactive Apps on the JVM
www.typesafe.com


Best practises to clean up RDDs for old applications

2015-10-08 Thread Jens Rantil
Hi,

I have a couple of old application RDDs under /var/lib/spark/rdd that
haven't been properly cleaned up after themselves. Example:

# du -shx /var/lib/spark/rdd/*
44K /var/lib/spark/rdd/liblz4-java1011984124691611873.so
48K /var/lib/spark/rdd/snappy-1.0.5-libsnappyjava.so
2.3G /var/lib/spark/rdd/spark-local-20150903112858-a72d
23M /var/lib/spark/rdd/spark-local-20150929141201-143f

The applications (such as "20150903112858") aren't running anymore. What
are best practises to clean these up? A cron job? Enabling some kind of
cleaner in Spark? I'm currently running Spark 1.1, but will eventually move
to 1.2 and then 1.4.

Thanks,
Jens

-- 
Jens Rantil
Backend engineer
Tink AB

Email: jens.ran...@tink.se
Phone: +46 708 84 18 32
Web: www.tink.se



Re: Spark 1.5.1 standalone cluster - wrong Akka remoting config?

2015-10-08 Thread michal.klo...@gmail.com
Try setting spark.driver.host to the actual IP or hostname of the box
submitting the work. More info in the networking section at this link:

http://spark.apache.org/docs/latest/configuration.html

Also check the Spark config for your application for these driver settings in
the application web UI at http://<driver>:4040 in the "Environment" tab. More
info in the "viewing configuration properties" section at that link.
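
As far as I know, when spark.driver.host is not set the driver advertises an address derived from the local machine, so an unexpected IP often points at hostname resolution or at an extra network interface (a VPN or virtual adapter, for example). As a rough diagnostic sketch using only the JDK, the following lists what the JVM on the submitting box sees. This is not Spark's own lookup code, and the object and method names are made up for illustration.

```scala
import java.net.{InetAddress, NetworkInterface}
import scala.collection.JavaConverters._
import scala.util.Try

object LocalAddressSketch {
  // All addresses bound to local network interfaces (empty if none are listed).
  def interfaceAddresses(): List[String] =
    Option(NetworkInterface.getNetworkInterfaces)
      .map(_.asScala.toList)
      .getOrElse(Nil)
      .flatMap(_.getInetAddresses.asScala)
      .map(_.getHostAddress)

  // What the local hostname resolves to, if it resolves at all.
  def localHostAddress(): Option[String] =
    Try(InetAddress.getLocalHost.getHostAddress).toOption

  def main(args: Array[String]): Unit = {
    println(s"local hostname resolves to: ${localHostAddress().getOrElse("<unresolved>")}")
    interfaceAddresses().foreach(a => println(s"interface address: $a"))
  }
}
```

If the address from the master log shows up in this output, that would suggest where it comes from, and setting spark.driver.host explicitly as described above should override it.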

M



> On Oct 8, 2015, at 7:04 AM, baraky  wrote:
> 
> Doing my firsts steps with Spark, I'm facing problems submitting jobs to
> cluster from the application code. Digging the logs, I noticed some periodic
> WARN messages on master log:
> 
> 15/10/08 13:00:00 WARN remote.ReliableDeliverySupervisor: Association with
> remote system [akka.tcp://sparkDriver@192.168.254.167:64014] has failed,
> address is now gated for [5000] ms. Reason: [Disassociated]
> 
> The problem is that ip address not exist on our network, and wasn't
> configured anywhere. The same wrong ip is shown on the worker log when it
> tries execute the task (wrong ip passed to --driver-url):
> 
> 15/10/08 12:58:21 INFO worker.ExecutorRunner: Launch command:
> "/usr/java/latest//bin/java" "-cp" "/path/spark/spark-1.5.1-bin-ha
> doop2.6/sbin/../conf/:/path/spark/spark-1.5.1-bin-hadoop2.6/lib/spark-assembly-1.5.1-hadoop2.6.0.jar:/path/spark/
> spark-1.5.1-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/path/spark/spark-1.5.1-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.ja
> r:/path/spark/spark-1.5.1-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/path/hadoop/2.6.0//etc/hadoop/"
> "-Xms102
> 4M" "-Xmx1024M" "-Dspark.driver.port=64014" "-Dspark.driver.port=53411"
> "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url"
> "akka.tcp://sparkDriver@192.168.254.167:64014/user/CoarseGrainedScheduler"
> "--executor-id" "39" "--hostname" "192.168.10.214" "--cores" "16" "--app-id" 
> "app-20151008123702-0003" "--worker-url"
> "akka.tcp://sparkWorker@192.168.10.214:37625/user/Worker"
> 15/10/08 12:59:28 INFO worker.Worker: Executor app-20151008123702-0003/39
> finished with state EXITED message Command exited with code 1 exitStatus 1
> Any idea what I did wrong and how can this be fixed?
> 
> The java version is 1.8.0_20, and I'm using pre-built Spark binaries.
> 
> Thanks!


Launching EC2 instances with Spark compiled for Scala 2.11

2015-10-08 Thread Theodore Vasiloudis
Hello,

I was wondering if there is an easy way to launch EC2 instances that have
Spark built for Scala 2.11.

The only way I can think of is to prepare the sources for 2.11 as shown in
the Spark build instructions (
http://spark.apache.org/docs/latest/building-spark.html#building-for-scala-211),
upload the changed sources as a Github repo, and use the "--spark-git-repo"
option to specify the repo as the one to build from.

Is there a recommended way to launch EC2 instances if you need Scala 2.11
support?

Regards,
Theodore





Re: Example of updateStateByKey with initial RDD?

2015-10-08 Thread Bryan Jeffrey
Honestly, that's what I already did - I am working to test it now.  It
looked like 'add an underscore' was ignoring some implicit argument that I
was failing to provide.

On Thu, Oct 8, 2015 at 8:34 AM, Aniket Bhatnagar  wrote:

> Ohh I see. You could have to add underscore
> after ProbabilityCalculator.updateCountsOfProcessGivenRole. Try:
>
> dstream.map(x => (x.keyWithTime, x))
> .updateStateByKey(ProbabilityCalculator.updateCountsOfProcessGivenRole _,
> new HashPartitioner(3), initialProcessGivenRoleRdd)
>
> Here is an example:
> def counter(events: Seq[Event], prevStateOpt: Option[Long]): Option[Long]
> = {
>   val prevCount = prevStateOpt.getOrElse(0L)
>   val newCount = prevCount + events.size
>   Some(newCount)
> }
> val interval = 60 * 1000
> val initialRDD = sparkContext.makeRDD(Array(1L, 2L, 3L, 4L, 5L)).map(_ *
> interval).map(n => (n % interval, n / interval))
> val counts = eventsStream.map(event => {
>   (event.timestamp - event.timestamp % interval, event)
> }).updateStateByKey[Long](PrintEventCountsByInterval.counter _, new
> HashPartitioner(3), initialRDD = initialRDD)
> counts.print()
>
> Thanks,
> Aniket
>
>
> On Thu, Oct 8, 2015 at 5:48 PM Bryan Jeffrey 
> wrote:
>
>> Aniket,
>>
>> Thank you for the example - but that's not quite what I'm looking for.
>> I've got a call to updateStateByKey that looks like the following:
>>
>> dstream.map(x => (x.keyWithTime, x))
>> .updateStateByKey(ProbabilityCalculator.updateCountsOfProcessGivenRole)
>>
>> def updateCountsOfProcessGivenRole(a : Seq[CountsOfProcessGivenRole], b:
>> Option[CountsOfProcessGivenRole]) : Option[CountsOfProcessGivenRole] = {
>>   val currentTime = DateTime.now(DateTimeZone.UTC)
>>   if (a.isEmpty) {
>>     // No new values in this batch: drop state older than three days.
>>     if (b.get.eventhourbin.plusDays(3).getMillis < currentTime.getMillis) {
>>       None
>>     } else {
>>       b
>>     }
>>   } else { // a is not empty - b may or may not be defined
>>     // Parenthesize the if/else so the new counts are added to the previous
>>     // state; without the parentheses the sum binds to the else branch only.
>>     val population = (if (b.isDefined) b.get.Population else 0) +
>>       a.map(x => x.Population).sum
>>     val subpopulation = (if (b.isDefined) b.get.Subpopulation else 0) +
>>       a.map(x => x.Subpopulation).sum
>>     Some(CountsOfProcessGivenRole(a(0), population, subpopulation))
>>   }
>> }
>>
>> This works fine, however when I go to add an initial RDD, modifying the
>> 'updateStateByKey' call to look like the following:
>>
>> val initialProcessGivenRoleRdd: RDD[((String, String, DateTime),
>> CountsOfProcessGivenRole)] = iPrProcessGivenRole.map(x => (x.keyWithTime,
>> x))
>> dstream.map(x => (x.keyWithTime, x))
>> .updateStateByKey(ProbabilityCalculator.updateCountsOfProcessGivenRole,
>> new HashPartitioner(3), initialProcessGivenRoleRdd)
>>
>> I am getting an error -- 'missing arguments for method
>> updateCountsOfProcessGivenRole'. Looking at the method calls, the function
>> that is called for appears to be the same.  I was hoping an example might
>> shed some light on the issue.
>>
>> Regards,
>>
>> Bryan Jeffrey
>>
>>
>>
>>
>>
>>
>>
>> On Thu, Oct 8, 2015 at 7:04 AM, Aniket Bhatnagar <
>> aniket.bhatna...@gmail.com> wrote:
>>
>>> Here is an example:
>>>
>>> val interval = 60 * 1000
>>> val counts = eventsStream.map(event => {
>>>   (event.timestamp - event.timestamp % interval, event)
>>> }).updateStateByKey[Long](updateFunc = (events: Seq[Event],
>>> prevStateOpt: Option[Long]) => {
>>>   val prevCount = prevStateOpt.getOrElse(0L)
>>>   val newCount = prevCount + events.size
>>>   Some(newCount)
>>> })
>>> counts.print()
>>>
>>> Hope it helps!
>>>
>>> Thanks,
>>> Aniket
>>>
>>> On Thu, Oct 8, 2015 at 4:29 PM Bryan  wrote:
>>>
>>>> Hello,
>>>>
>>>> Can anyone point me to a good example of updateStateByKey with an
>>>> initial RDD? I am seeing a compile time error when following the API.
>>>>
>>>> Regards,
>>>>
>>>> Bryan Jeffrey
>>>>
>>>
>>


RowNumber in HiveContext returns null or negative values

2015-10-08 Thread Saif.A.Ellafi
Hi all, would this be a bug??

val ws = Window.
partitionBy("clrty_id").
orderBy("filemonth_dtt")

val nm = "repeatMe"
df.select(df.col("*"), rowNumber().over(ws).cast("int").as(nm))


stacked_data.filter(stacked_data("repeatMe").isNotNull).orderBy("repeatMe").take(50).foreach(println(_))

--->

Long, DateType, Int
[2003,2006-06-01,-1863462909]
[2003,2006-09-01,-1863462909]
[2003,2007-01-01,-1863462909]
[2003,2007-08-01,-1863462909]
[2003,2007-07-01,-1863462909]
[2138,2007-07-01,-1863462774]
[2138,2007-02-01,-1863462774]
[2138,2006-11-01,-1863462774]
[2138,2006-08-01,-1863462774]
[2138,2007-08-01,-1863462774]
[2138,2006-09-01,-1863462774]
[2138,2007-03-01,-1863462774]
[2138,2006-10-01,-1863462774]
[2138,2007-05-01,-1863462774]
[2138,2006-06-01,-1863462774]
[2138,2006-12-01,-1863462774]


Thanks,
Saif



Using a variable (a column name) in an IF statement in Spark SQL

2015-10-08 Thread Maheshakya Wijewardena
Hi,

Suppose there is a data frame called goods with columns "barcode" and
"items". Some of the values in the column "items" can be null.

I want to select the barcode and the respective items from the table, adhering
to the following rules:

   - If "items" is null -> output 0
   - else -> output "items" ( the actual value in the column)

I would write a query like:

*SELECT barcode, IF(items is null, 0, items) FROM goods*

But this query fails with the error:

*unresolved operator 'Project [if (IS NULL items#1) 0 else items#1 AS
c0#132]; *

It seems I can only use numerical values inside this IF statement, but when
a column name is used, it fails.

Is there any workaround to do this?

Best regards.
-- 
Pruthuvi Maheshakya Wijewardena
Software Engineer
WSO2 : http://wso2.com/
Email: mahesha...@wso2.com
Mobile: +94711228855


Re: Is coalesce smart while merging partitions?

2015-10-08 Thread Daniel Darabos
> For example, does Spark try to merge the small partitions first, or is the
> selection of partitions to merge random?

It is quite smart, as Iulian has pointed out. But it does not try to merge
small partitions first. Spark doesn't know the size of partitions. (The
partitions are represented as Iterators. You cannot know an iterator's size
without consuming it.)
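
The point about iterators can be seen in plain Scala, without Spark. A minimal sketch, assuming a small in-memory iterator stands in for a partition (the object and value names are made up for illustration):

```scala
object IteratorSizeSketch {
  // A stand-in for a partition: an iterator over a handful of records.
  val partition: Iterator[Int] = Iterator(10, 20, 30, 40)

  // Counting the elements walks the iterator to its end.
  val count: Int = partition.size

  // After counting, nothing is left to read.
  val hasMore: Boolean = partition.hasNext
}
```

Once the size has been computed the records are gone, which is why a size-aware merge would have to read every partition twice.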


Re: does KafkaCluster can be public ?

2015-10-08 Thread Cody Koeninger
If anyone is interested in keeping tabs on it, the jira for this is

https://issues.apache.org/jira/browse/SPARK-10963

On Wed, Oct 7, 2015 at 3:16 AM, Erwan ALLAIN 
wrote:

> Thanks guys !
>
> On Wed, Oct 7, 2015 at 1:41 AM, Cody Koeninger  wrote:
>
>> Sure no prob.
>>
>> On Tue, Oct 6, 2015 at 6:35 PM, Tathagata Das 
>> wrote:
>>
>>> Given the interest, I am also inclining towards making it a public
>>> developer API. Maybe even experimental. Cody, mind submitting a patch?
>>>
>>>
>>> On Tue, Oct 6, 2015 at 7:45 AM, Sean Owen  wrote:
>>>
>>>> For what it's worth, I also use this class in an app, but it happens
>>>> to be from Java code where it acts as if it's public. So no problem
>>>> for my use case, but I suppose, another small vote for the usefulness
>>>> of this class to the caller. I end up using getLatestLeaderOffsets to
>>>> figure out how to initialize initial offsets.
>>>>
>>>> On Tue, Oct 6, 2015 at 3:24 PM, Cody Koeninger
>>>> wrote:
>>>> > I personally think KafkaCluster (or the equivalent) should be made
>>>> > public. When I'm deploying spark I just sed out the private[spark] and
>>>> > rebuild.
>>>> >
>>>> > There's a general reluctance to make things public due to backwards
>>>> > compatibility, but if enough people ask for it... ?
>>>> >
>>>> > On Tue, Oct 6, 2015 at 6:51 AM, Jonathan Coveney
>>>> > wrote:
>>>> >>
>>>> >> You can put a class in the org.apache.spark namespace to access
>>>> >> anything that is private[spark]. You can then make enrichments there
>>>> >> to access whatever you need. Just beware upgrade pain :)
>>>> >>
>>>> >>
>>>> >> On Tuesday, October 6, 2015, Erwan ALLAIN <
>>>> >> eallain.po...@gmail.com> wrote:
>>>> >>>
>>>> >>> Hello,
>>>> >>>
>>>> >>> I'm currently testing spark streaming with kafka.
>>>> >>> I'm creating DirectStream with KafkaUtils and everything's fine.
>>>> >>> However I would like to use the signature where I can specify my own
>>>> >>> message handler (to play with partition and offset). In this case, I
>>>> >>> need to manage offset/partition by myself to fill fromOffsets argument.
>>>> >>> I have found a Jira on this usecase
>>>> >>> https://issues.apache.org/jira/browse/SPARK-6714 but it has been
>>>> >>> closed telling that it's too specific.
>>>> >>> I'm aware that it can be done using kafka api (TopicMetaDataRequest
>>>> >>> and OffsetRequest) but what I have to do is almost the same as the
>>>> >>> KafkaCluster which is private.
>>>> >>>
>>>> >>> is it possible to :
>>>> >>>  - add another signature in KafkaUtils ?
>>>> >>>  - make KafkaCluster public ?
>>>> >>>
>>>> >>> or do you have any other smart solution where I don't need to
>>>> >>> copy/paste KafkaCluster ?
>>>> >>>
>>>> >>> Thanks.
>>>> >>>
>>>> >>> Regards,
>>>> >>> Erwan ALLAIN
>>>> >
>>>> >
>>>>
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>

>>>
>>
>


Re: Launching EC2 instances with Spark compiled for Scala 2.11

2015-10-08 Thread Aniket Bhatnagar
Is it possible for you to use EMR instead of EC2? If so, you may be able to
tweak EMR bootstrap scripts to install your custom spark build.

Thanks,
Aniket

On Thu, Oct 8, 2015 at 5:58 PM Theodore Vasiloudis <
theodoros.vasilou...@gmail.com> wrote:

> Hello,
>
> I was wondering if there is an easy way to launch EC2 instances which have a
> Spark built for Scala 2.11.
>
> The only way I can think of is to prepare the sources for 2.11 as shown in
> the Spark build instructions (
> http://spark.apache.org/docs/latest/building-spark.html#building-for-scala-211),
> upload the changed sources as a Github repo, and use the "--spark-git-repo"
> option to specify the repo as the one to build from.
>
> Is there a recommended way to launch EC2 instances if you need Scala 2.11
> support?
>
> Regards,
> Theodore


Dataframes - sole data structure for parallel computations?

2015-10-08 Thread Tracewski, Lukasz
Hi,

Many people interpret this slide from Databricks
https://ogirardot.files.wordpress.com/2015/05/future-of-spark.png
as an indication that the Dataframes API is going to be the main processing unit of
Spark and the sole access point to MLlib, Streaming and such. Is that true? My
impression was that Dataframes are an additional abstraction layer with some
promising optimisations coming from the Tungsten project, but that's all. RDDs are
here to stay. They are a natural choice when it comes to e.g. processing
images.

Here is one article that advertises Dataframes as a "sole data structure for 
parallel computations":
https://ogirardot.wordpress.com/2015/05/29/rdds-are-the-new-bytecode-of-apache-spark/
 (paragraph 4)

Cheers,
Lucas






JDBC thrift server

2015-10-08 Thread Younes Naguib
Hi,

We've been using the JDBC thrift server for a couple of weeks now and running 
queries on it like a regular RDBMS.
We're about to deploy it in a shared production cluster.

Any advice or warnings on such a setup? Yarn or Mesos?
How about dynamic resource allocation in an already running thrift server?

Thanks,
Younes



RE: SparkR Error in sparkR.init(master=“local”) in RStudio

2015-10-08 Thread Sun, Rui
Can you extract the spark-submit command from the console output, and run it on 
the Shell, and see if there is any error message?

From: Khandeshi, Ami [mailto:ami.khande...@fmr.com]
Sent: Wednesday, October 7, 2015 9:57 PM
To: Sun, Rui; Hossein
Cc: akhandeshi; user@spark.apache.org
Subject: RE: SparkR Error in sparkR.init(master=“local”) in RStudio

Tried multiple permutations of setting home… Still the same issue
> Sys.setenv(SPARK_HOME="c:\\DevTools\\spark-1.5.1")
> .libPaths(c(file.path(Sys.getenv("SPARK_HOME"),"R","lib"),.libPaths()))
> library(SparkR)

Attaching package: ‘SparkR’

The following objects are masked from ‘package:stats’:

filter, na.omit

The following objects are masked from ‘package:base’:

intersect, rbind, sample, subset, summary, table, transform

> sc<-sparkR.init(master = "local")
Launching java with spark-submit command 
c:\DevTools\spark-1.5.1/bin/spark-submit.cmd   sparkr-shell 
C:\Users\a554719\AppData\Local\Temp\RtmpkXZVBa\backend_port45ac487f2fbd
Error in sparkR.init(master = "local") :
  JVM is not ready after 10 seconds


From: Sun, Rui [mailto:rui@intel.com]
Sent: Wednesday, October 07, 2015 2:35 AM
To: Hossein; Khandeshi, Ami
Cc: akhandeshi; user@spark.apache.org
Subject: RE: SparkR Error in sparkR.init(master=“local”) in RStudio

Not sure "/C/DevTools/spark-1.5.1/bin/spark-submit.cmd" is valid?

From: Hossein [mailto:fal...@gmail.com]
Sent: Wednesday, October 7, 2015 12:46 AM
To: Khandeshi, Ami
Cc: Sun, Rui; akhandeshi; user@spark.apache.org
Subject: Re: SparkR Error in sparkR.init(master=“local”) in RStudio

Have you built the Spark jars? Can you run the Spark Scala shell?

--Hossein

On Tuesday, October 6, 2015, Khandeshi, Ami wrote:
> Sys.setenv(SPARKR_SUBMIT_ARGS="--verbose sparkr-shell")
> Sys.setenv(SPARK_PRINT_LAUNCH_COMMAND=1)
>
> sc <- sparkR.init(master="local")
Launching java with spark-submit command 
/C/DevTools/spark-1.5.1/bin/spark-submit.cmd   --verbose sparkr-shell 
C:\Users\a554719\AppData\Local\Temp\Rtmpw11KJ1\backend_port31b0afd4391
Error in sparkR.init(master = "local") :
  JVM is not ready after 10 seconds
In addition: Warning message:
running command '"/C/DevTools/spark-1.5.1/bin/spark-submit.cmd"   --verbose 
sparkr-shell 
C:\Users\a554719\AppData\Local\Temp\Rtmpw11KJ1\backend_port31b0afd4391' had 
status 127

-Original Message-
From: Sun, Rui [mailto:rui@intel.com]
Sent: Tuesday, October 06, 2015 9:39 AM
To: akhandeshi; user@spark.apache.org
Subject: RE: SparkR Error in sparkR.init(master=“local”) in RStudio

What you have done is supposed to work.  Need more debugging information to 
find the cause.

Could you add the following lines before calling sparkR.init()?

Sys.setenv(SPARKR_SUBMIT_ARGS="--verbose sparkr-shell")
Sys.setenv(SPARK_PRINT_LAUNCH_COMMAND=1)

Then see if you can find any hint in the console output.

-Original Message-
From: akhandeshi [mailto:ami.khande...@gmail.com]
Sent: Tuesday, October 6, 2015 8:21 PM
To: user@spark.apache.org
Subject: Re: SparkR Error in sparkR.init(master=“local”) in RStudio

I couldn't get this working...

I have JAVA_HOME set.
I have defined SPARK_HOME
Sys.setenv(SPARK_HOME="c:\\DevTools\\spark-1.5.1")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths())) 
library("SparkR", lib.loc="c:\\DevTools\\spark-1.5.1\\lib")
library(SparkR)
sc<-sparkR.init(master="local")

I get
Error in sparkR.init(master = "local") :
  JVM is not ready after 10 seconds

What am I missing??








--
--Hossein


Re: Parquet file size

2015-10-08 Thread Cheng Lian
How many tasks are there in the write job? Since each task may write one 
file for each partition, you may end up with taskNum * 31 files.


Increasing SPLIT_MINSIZE does help reduce the number of tasks. Another way to
address this issue is to use DataFrame.coalesce(n) to shrink the number of
tasks to n explicitly.
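
The file-count arithmetic above can be sketched in plain Scala. The task counts below (1300 and 8) are made-up figures for illustration; only the 31 day-partitions come from this thread:

```scala
object ParquetFileCountSketch {
  // Upper bound on output files when every task may write one file per partition.
  def maxOutputFiles(numTasks: Int, numPartitions: Int): Long =
    numTasks.toLong * numPartitions

  // One partition per day of data, as mentioned in this thread.
  val dayPartitions = 31

  // Hypothetical: about 1300 write tasks.
  val withoutCoalesce: Long = maxOutputFiles(1300, dayPartitions)

  // Hypothetical: the same job after coalesce(8).
  val withCoalesce: Long = maxOutputFiles(8, dayPartitions)
}
```

With these assumed numbers the bound drops from roughly forty thousand files to a few hundred, at the cost of less write parallelism.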


Cheng

On 10/7/15 6:40 PM, Younes Naguib wrote:

Thanks, I'll try that.



*From:* odeach...@gmail.com [odeach...@gmail.com] on behalf of Deng 
Ching-Mallete [och...@apache.org]

*Sent:* Wednesday, October 07, 2015 9:14 PM
*To:* Younes Naguib
*Cc:* Cheng Lian; user@spark.apache.org
*Subject:* Re: Parquet file size

Hi,

In our case, we're using 
the org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MINSIZE to 
increase the size of the RDD partitions when loading text files, so it 
would generate larger parquet files. We just set it in the Hadoop conf 
of the SparkContext. You need to be careful though about setting it to 
a large value, as you might encounter issues related to this:

https://issues.apache.org/jira/browse/SPARK-6235

For our jobs, we're setting the split size to 512MB which generates 
between 110-200MB parquet files using the default compression. We're 
using Spark-1.3.1, btw, and we also have the same partitioning of 
year/month/day for our parquet files.


HTH,
Deng

On Thu, Oct 8, 2015 at 8:25 AM, Younes Naguib wrote:


Well, I only have data for 2015-08, so in the end there are only 31
partitions.
What I'm looking for is some reasonably sized partitions.
In any case, just being able to control the size or number of the output
Parquet files would be nice.


*From:* Cheng Lian [lian.cs@gmail.com]
*Sent:* Wednesday, October 07, 2015 7:01 PM
*To:* Younes Naguib; 'user@spark.apache.org'
*Subject:* Re: Parquet file size

The reason why so many small files are generated is probably
that you are inserting into a partitioned table with
three partition columns.

If you want larger Parquet files, you may try to either avoid
using a partitioned table, or use fewer partition columns (e.g.,
only year, without month and day).

Cheng

So you want to dump all data into a single large Parquet file?

On 10/7/15 1:55 PM, Younes Naguib wrote:


The original TSV file is 600GB and generated 40k files of 15-25MB each.

y

*From:* Cheng Lian [mailto:lian.cs@gmail.com]
*Sent:* October-07-15 3:18 PM
*To:* Younes Naguib; 'user@spark.apache.org'
*Subject:* Re: Parquet file size

Why do you want larger files? Doesn't the result Parquet file
contain all the data in the original TSV file?

Cheng

On 10/7/15 11:07 AM, Younes Naguib wrote:

Hi,

I’m reading a large tsv file, and creating parquet files
using sparksql:

insert overwrite

table tbl partition(year, month, day)

Select  from tbl_tsv;

This works nicely, but generates small parquet files (15MB).

I wanted to generate larger files, any idea how to address this?

*Thanks,*

*Younes Naguib*









How to register udf with Any or generic Type in spark

2015-10-08 Thread dugasani jcreddy
Hi,
  I have a requirement to use a UDF whose return type is not known in a Spark
data frame SQL query. The requirements are as follows:

   - The function takes either String or Boolean data types.
   - The function returns 1 or 0 based on whether the input argument is True
     or False.
   - If the input String is in the form of an Integer or Long, return Int or
     Long types.
   - If the input String is in the form of a Double, return Double.

I could create the function with an Any return type, but I am not able to
register it with the sqlContext.udf.register statement. I tried a generic type
but could not register it either.
Kindly advise me.

Thanks,
Jayachandra


Re: sql query orc slow

2015-10-08 Thread Zhan Zhang
Hi Patcharee,

Did you enable the predicate pushdown in the second method?

Thanks.

Zhan Zhang

On Oct 8, 2015, at 1:43 AM, patcharee  wrote:

> Hi,
> 
> I am using Spark SQL 1.5 to query a Hive table stored as partitioned ORC
> files. There are about 6000 files in total and each file is about
> 245MB.
> 
> What is the difference between these two query methods below:
> 
> 1. Using query on hive table directly
> 
> hiveContext.sql("select col1, col2 from table1")
> 
> 2. Reading from orc file, register temp table and query from the temp table
> 
> val c = hiveContext.read.format("orc").load("/apps/hive/warehouse/table1")
> c.registerTempTable("regTable")
> hiveContext.sql("select col1, col2 from regTable")
> 
> When the number of files is large (querying all of the 6000 files), the
> second case is much slower than the first one. Any ideas why?
> 
> BR,
> 
> 
> 
> 



Error executing using alternating least square

2015-10-08 Thread haridass saisriram
Hi,

   I downloaded spark 1.5.0 on windows 7 and built it using

 build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package


and tried running the Alternating least square example (
http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html
) using spark-shell

import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.mllib.recommendation.Rating

val data = sc.textFile("C://cygwin64//home//test.txt")
val ratings = data.map(_.split(',') match { case Array(user, item, rate) =>
    Rating(user.toInt, item.toInt, rate.toDouble)
  })
val rank = 10
val numIterations = 10
val model = ALS.train(ratings, rank, numIterations, 0.01)

after ALS.train I get the following error


5/10/08 11:01:08 ERROR Executor: Exception in task 1.0 in stage 1.0 (TID 3)
java.lang.AssertionError: assertion failed:
Current position 1413 do not equal to expected position 707
after transferTo, please check your kernel version to see if it is 2.6.32,
this is a kernel bug which will lead to unexpected behavior when using
transferTo.
You can set spark.file.transferTo = false to disable this NIO feature.

at scala.Predef$.assert(Predef.scala:179)
at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:273)
at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:252)
at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:252)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
at org.apache.spark.util.Utils$.copyStream(Utils.scala:292)
at org.apache.spark.util.Utils.copyStream(Utils.scala)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.writePartitionedFile(BypassMergeSortShuffleWriter.java:149)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:80)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1280)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1268)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1267)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1267)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1493)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1839)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1910)
at org.apache.spark.rdd.RDD.count(RDD.scala:1121)
at org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:550)
at org.apache.spark.mllib.recommendation.ALS.run(ALS.scala:239)
at org.apache.spark.mllib.recommendation.ALS$.train(ALS.scala:328)
at org.apache.spark.mllib.recommendation.ALS$.train(ALS.scala:346)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:32)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:37)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:39)
at $iwC$$iwC$$iwC$$iwC$$iwC.(:41)
at $iwC$$iwC$$iwC$$iwC.(:43)
at $iwC$$iwC$$iwC.(:45)
at $iwC$$iwC.(:47)
at $iwC.(:49)
at (:51)
at .(:55)
at .()
at .(:7)
  

Re: JDBC thrift server

2015-10-08 Thread Sathish Kumaran Vairavelu
Which version of Spark are you using? You might encounter SPARK-6882
if Kerberos is enabled.

-Sathish

On Thu, Oct 8, 2015 at 10:46 AM Younes Naguib <
younes.nag...@tritondigital.com> wrote:

> Hi,
>
> We’ve been using the JDBC thrift server for a couple of weeks now and
> running queries on it like a regular RDBMS.
> We’re about to deploy it in a shared production cluster.
>
> Any advice or warnings on such a setup? Yarn or Mesos?
> How about dynamic resource allocation in an already running thrift server?
>
> *Thanks,*
> *Younes*
>


Re: Using Sqark SQL mapping over an RDD

2015-10-08 Thread Michael Armbrust
You can't do nested operations on RDDs or DataFrames (i.e. you can't create
a DataFrame from within a map function).  Perhaps if you explain what you
are trying to accomplish someone can suggest another way.

On Thu, Oct 8, 2015 at 10:10 AM, Afshartous, Nick 
wrote:

>
> Hi,
>
> I am using Spark 1.5 in the latest EMR 4.1.
>
> I have an RDD of String
>
>scala> deviceIds
>   res25: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[18] at
> map at :28
>
> and then when trying to map over the RDD while attempting to run a sql
> query the result is a NullPointerException
>
>   scala> deviceIds.map(id => sqlContext.sql("select * from
> ad_info")).count()
>
> with the stack trace below.  If I run the query as a top level expression
> the count is returned.  There was additional code within
> the anonymous function that's been removed to try and isolate the issue.
>
> Thanks for any insights or advice on how to debug this.
> --
>   Nick
>
>
> scala> deviceIds.map(id => sqlContext.sql("select * from ad_info")).count()
> deviceIds.map(id => sqlContext.sql("select * from ad_info")).count()
> 15/10/08 16:12:56 INFO SparkContext: Starting job: count at :40
> 15/10/08 16:12:56 INFO DAGScheduler: Got job 18 (count at :40)
> with 200 output partitions
> 15/10/08 16:12:56 INFO DAGScheduler: Final stage: ResultStage 37(count at
> :40)
> 15/10/08 16:12:56 INFO DAGScheduler: Parents of final stage:
> List(ShuffleMapStage 36)
> 15/10/08 16:12:56 INFO DAGScheduler: Missing parents: List()
> 15/10/08 16:12:56 INFO DAGScheduler: Submitting ResultStage 37
> (MapPartitionsRDD[37] at map at :40), which has no missing parents
> 15/10/08 16:12:56 INFO MemoryStore: ensureFreeSpace(17904) called with
> curMem=531894, maxMem=560993402
> 15/10/08 16:12:56 INFO MemoryStore: Block broadcast_22 stored as values in
> memory (estimated size 17.5 KB, free 534.5 MB)
> 15/10/08 16:12:56 INFO MemoryStore: ensureFreeSpace(7143) called with
> curMem=549798, maxMem=560993402
> 15/10/08 16:12:56 INFO MemoryStore: Block broadcast_22_piece0 stored as
> bytes in memory (estimated size 7.0 KB, free 534.5 MB)
> 15/10/08 16:12:56 INFO BlockManagerInfo: Added broadcast_22_piece0 in
> memory on 10.247.0.117:33555 (size: 7.0 KB, free: 535.0 MB)
> 15/10/08 16:12:56 INFO SparkContext: Created broadcast 22 from broadcast
> at DAGScheduler.scala:861
> 15/10/08 16:12:56 INFO DAGScheduler: Submitting 200 missing tasks from
> ResultStage 37 (MapPartitionsRDD[37] at map at :40)
> 15/10/08 16:12:56 INFO YarnScheduler: Adding task set 37.0 with 200 tasks
> 15/10/08 16:12:56 INFO TaskSetManager: Starting task 0.0 in stage 37.0
> (TID 649, ip-10-247-0-117.ec2.internal, PROCESS_LOCAL, 1914 bytes)
> 15/10/08 16:12:56 INFO TaskSetManager: Starting task 1.0 in stage 37.0
> (TID 650, ip-10-247-0-117.ec2.internal, PROCESS_LOCAL, 1914 bytes)
> 15/10/08 16:12:56 INFO BlockManagerInfo: Added broadcast_22_piece0 in
> memory on ip-10-247-0-117.ec2.internal:46227 (size: 7.0 KB, free: 535.0 MB)
> 15/10/08 16:12:56 INFO BlockManagerInfo: Added broadcast_22_piece0 in
> memory on ip-10-247-0-117.ec2.internal:32938 (size: 7.0 KB, free: 535.0 MB)
> 15/10/08 16:12:56 INFO TaskSetManager: Starting task 2.0 in stage 37.0
> (TID 651, ip-10-247-0-117.ec2.internal, PROCESS_LOCAL, 1914 bytes)
> 15/10/08 16:12:56 WARN TaskSetManager: Lost task 0.0 in stage 37.0 (TID
> 649, ip-10-247-0-117.ec2.internal): java.lang.NullPointerException
> at
> $line101.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:40)
> at
> $line101.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:40)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1555)
> at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1121)
> at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1121)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> 15/10/08 16:12:56 INFO TaskSetManager: Starting task 0.1 in stage 37.0
> (TID 652, ip-10-247-0-117.ec2.internal, PROCESS_LOCAL, 1914 bytes)
> 15/10/08 16:12:56 INFO TaskSetManager: Lost task 1.0 in stage 37.0 (TID
> 650) on executor ip-10-247-0-117.ec2.internal:
> java.lang.NullPointerException 

Re: Default size of a datatype in SparkSQL

2015-10-08 Thread Michael Armbrust
It's purely for estimation, when guessing whether it's safe to do a broadcast
join.  We picked a random number that we thought was larger than the common
case (it's better to overestimate to avoid OOM).
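
A rough plain-Scala sketch of how such an estimate could feed a broadcast decision. This is not Spark's actual planner code: the rows-times-row-width formula, the helper names, the 4-byte integer width and the 10 MB threshold are assumptions for illustration; the 4096-byte string default is the figure mentioned in this thread.

```scala
object BroadcastEstimateSketch {
  // Assumed per-column default sizes in bytes (4096 for strings, 4 for ints).
  val stringDefaultSize = 4096L
  val intDefaultSize = 4L

  // Assumed threshold of 10 MB below which a table would be broadcast.
  val broadcastThresholdBytes: Long = 10L * 1024 * 1024

  // Estimated size = number of rows * sum of the column default sizes.
  def estimatedBytes(rows: Long, columnSizes: Seq[Long]): Long =
    rows * columnSizes.sum

  // A table is treated as safe to broadcast when the estimate fits the threshold.
  def safeToBroadcast(rows: Long, columnSizes: Seq[Long]): Boolean =
    estimatedBytes(rows, columnSizes) <= broadcastThresholdBytes

  // Hypothetical tables with one int column and one string column.
  val smallTable: Boolean = safeToBroadcast(1000L, Seq(intDefaultSize, stringDefaultSize))
  val largeTable: Boolean = safeToBroadcast(100000L, Seq(intDefaultSize, stringDefaultSize))
}
```

A generous string width makes tables look larger than they probably are, so a sketch like this errs toward not broadcasting.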

On Wed, Oct 7, 2015 at 10:11 PM, vivek bhaskar  wrote:

> I want to understand what the default size for a given datatype is used for.
>
> The following link mentions that it's for internal size estimation.
>
> https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/types/DataType.html
>
> The above behavior is also reflected in the code, where the default value
> seems to be used for stats purposes only.
>
> But then we have a default size of 4096 for the String datatype; why did we
> go for this random number? Or will it also restrict the size of the data? Any
> further elaboration on how the string datatype works will also help.
>
> Regards,
> Vivek
>
>
>


Applicative logs on Yarn

2015-10-08 Thread nibiau
Hello,
I submit Spark Streaming jobs to Yarn, and I have configured Yarn to generate
custom logs.
This works fine and Yarn aggregates the logs into HDFS nicely; nevertheless,
the log files are only usable via the "yarn logs" command.
I would prefer to be able to navigate them via hdfs commands like text files.
Is it possible to aggregate those files as text files?

Thanks
Nicolas 




Re: RowNumber in HiveContext returns null or negative values

2015-10-08 Thread Michael Armbrust
Which version of Spark?

On Thu, Oct 8, 2015 at 7:25 AM,  wrote:

> Hi all, would this be a bug??
>
> val ws = Window.
> partitionBy("clrty_id").
> orderBy("filemonth_dtt")
>
> val nm = "repeatMe"
> df.select(df.col("*"), rowNumber().over(ws).cast("int").as(nm))
>
>
> stacked_data.filter(stacked_data("repeatMe").isNotNull).orderBy("repeatMe").take(50).foreach(println(_))
>
> --->
>
> *Long, DateType, Int*
> [2003,2006-06-01,-1863462909]
> [2003,2006-09-01,-1863462909]
> [2003,2007-01-01,-1863462909]
> [2003,2007-08-01,-1863462909]
> [2003,2007-07-01,-1863462909]
> [2138,2007-07-01,-1863462774]
> [2138,2007-02-01,-1863462774]
> [2138,2006-11-01,-1863462774]
> [2138,2006-08-01,-1863462774]
> [2138,2007-08-01,-1863462774]
> [2138,2006-09-01,-1863462774]
> [2138,2007-03-01,-1863462774]
> [2138,2006-10-01,-1863462774]
> [2138,2007-05-01,-1863462774]
> [2138,2006-06-01,-1863462774]
> [2138,2006-12-01,-1863462774]
>
>
> Thanks,
> Saif
>
>


Re: Dataframes - sole data structure for parallel computations?

2015-10-08 Thread Jerry Lam
I just read the article by ogirardot, but I don’t agree.
It is like saying the pandas DataFrame is the sole data structure for analyzing
data in Python. Can a pandas DataFrame replace a NumPy array? The answer is simply
no, from an efficiency perspective, for some computations.

Unless there is a computer science breakthrough in terms of data structures
(i.e. the data structure of everything), the statement about a sole data structure
can only be treated as a joke. Just in case people get upset: I AM JOKING :)

> On Oct 8, 2015, at 1:56 PM, Michael Armbrust  wrote:
> 
> Don't worry, the ability to work with domain objects and lambda functions is 
> not going to go away.  However, we are looking at ways to leverage Tungsten's 
> improved performance when processing structured data.
> 
> More details can be found here:
> https://issues.apache.org/jira/browse/SPARK- 
> 
> 
> On Thu, Oct 8, 2015 at 7:40 AM, Tracewski, Lukasz wrote:
> Hi,
> 
>  
> 
> Many people interpret this slide from Databricks
> 
> https://ogirardot.files.wordpress.com/2015/05/future-of-spark.png 
> 
> as indication that Dataframes API is going to be the main processing unit of 
> Spark and sole access point to MLlib, Streaming and such. Is it true? My 
> impression was that Dataframes are an additional abstraction layer with some 
> promising optimisation coming from Tungsten project, but that’s all. RDDs are 
> there to stay. They are a natural selection when it comes to e.g. processing 
> images.
> 
>  
> 
> Here is one article that advertises Dataframes as a “sole data structure for 
> parallel computations”:
> 
> https://ogirardot.wordpress.com/2015/05/29/rdds-are-the-new-bytecode-of-apache-spark/
> (paragraph 4)
> 
>  
> 
> Cheers,
> 
> Lucas
> 
>  
> 
>  
> 
> 
> 
> ==
> Please access the attached hyperlink for an important electronic 
> communications disclaimer:
> http://www.credit-suisse.com/legal/en/disclaimer_email_ib.html 
> 
> ==
> 
> 



Re: Using a variable (a column name) in an IF statement in Spark SQL

2015-10-08 Thread Michael Armbrust
Hmm, that looks like it should work to me.  What version of Spark?  What is
the schema of goods?

On Thu, Oct 8, 2015 at 6:13 AM, Maheshakya Wijewardena 
wrote:

> Hi,
>
> Suppose there is data frame called goods with columns "barcode" and
> "items". Some of the values in the column "items" can be null.
>
> I want to select the barcode and the respective items from the table, adhering to the
> following rules:
>
>- If "items" is null -> output 0
>- else -> output "items" ( the actual value in the column)
>
> I would write a query like:
>
> *SELECT barcode, IF(items is null, 0, items) FROM goods*
>
> But this query fails with the error:
>
> *unresolved operator 'Project [if (IS NULL items#1) 0 else items#1 AS
> c0#132]; *
>
> It seems I can only use numerical values inside this IF statement, but
> when a column name is used, it fails.
>
> Is there any workaround to do this?
>
> Best regards.
> --
> Pruthuvi Maheshakya Wijewardena
> Software Engineer
> WSO2 : http://wso2.com/
> Email: mahesha...@wso2.com
> Mobile: +94711228855
>
>
>
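
One possible workaround for the null-to-zero rule in the quoted question is COALESCE, which is standard SQL. The sketch below uses Python's built-in sqlite3 module purely as a stand-in engine, so it says nothing about how Spark resolves types. The table contents and the assumption that "items" is an integer column are made up for illustration; if the real column is a string, the fallback value may need to be of the same type.

```python
import sqlite3

# In-memory stand-in for the "goods" table; the rows are invented for the example.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE goods (barcode TEXT, items INTEGER)")
conn.executemany(
    "INSERT INTO goods VALUES (?, ?)",
    [("A1", 3), ("B2", None), ("C3", 7)],
)

# COALESCE returns its first non-null argument, so a null "items" becomes 0.
rows = conn.execute(
    "SELECT barcode, COALESCE(items, 0) AS items FROM goods ORDER BY barcode"
).fetchall()
print(rows)  # [('A1', 3), ('B2', 0), ('C3', 7)]
```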


RE: Using Sqark SQL mapping over an RDD

2015-10-08 Thread Afshartous, Nick

> You can't do nested operations on RDDs or DataFrames (i.e. you can't create a 
> DataFrame from within a map function).  Perhaps if you explain what you are 
> trying to accomplish someone can suggest another way.

The code below is what I had in mind.  For each Id, I'd like to run a query using
the Id in the where clause, and then, depending on the result, possibly run a
second query.  Either the result of the first or the second query
will be used to construct the output of the map function.

Thanks for any suggestions,
--
  Nick


val result = deviceIds.map(deviceId => {
   val withAnalyticsId = sqlContext.sql(
      ("select * from ad_info where deviceId = '%1s' and analyticsId <> 'null' " +
       "order by messageTime asc limit 1") format (deviceId))

   if (withAnalyticsId.count() > 0) {
      withAnalyticsId.take(1)(0)
   }
   else {
      val withoutAnalyticsId = sqlContext.sql(
         "select * from ad_info where deviceId = '%1s' order by messageTime desc limit 1" format (deviceId))

      withoutAnalyticsId.take(1)(0)
   }
})





From: Michael Armbrust [mich...@databricks.com]
Sent: Thursday, October 08, 2015 1:16 PM
To: Afshartous, Nick
Cc: user@spark.apache.org
Subject: Re: Using Sqark SQL mapping over an RDD

You can't do nested operations on RDDs or DataFrames (i.e. you can't create a 
DataFrame from within a map function).  Perhaps if you explain what you are 
trying to accomplish someone can suggest another way.

On Thu, Oct 8, 2015 at 10:10 AM, Afshartous, Nick wrote:

Hi,

I am using Spark 1.5 in the latest EMR 4.1.

I have an RDD of String

   scala> deviceIds
  res25: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[18] at map at <console>:28

and then when trying to map over the RDD while attempting to run a sql query 
the result is a NullPointerException

  scala> deviceIds.map(id => sqlContext.sql("select * from ad_info")).count()

with the stack trace below.  If I run the query as a top level expression the
count is returned.  There was additional code within
the anonymous function that's been removed to try and isolate the problem.

Thanks for any insights or advice on how to debug this.
--
  Nick


scala> deviceIds.map(id => sqlContext.sql("select * from ad_info")).count()
deviceIds.map(id => sqlContext.sql("select * from ad_info")).count()
15/10/08 16:12:56 INFO SparkContext: Starting job: count at <console>:40
15/10/08 16:12:56 INFO DAGScheduler: Got job 18 (count at <console>:40) with 200 output partitions
15/10/08 16:12:56 INFO DAGScheduler: Final stage: ResultStage 37(count at <console>:40)
15/10/08 16:12:56 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 36)
15/10/08 16:12:56 INFO DAGScheduler: Missing parents: List()
15/10/08 16:12:56 INFO DAGScheduler: Submitting ResultStage 37 (MapPartitionsRDD[37] at map at <console>:40), which has no missing parents
15/10/08 16:12:56 INFO MemoryStore: ensureFreeSpace(17904) called with 
curMem=531894, maxMem=560993402
15/10/08 16:12:56 INFO MemoryStore: Block broadcast_22 stored as values in 
memory (estimated size 17.5 KB, free 534.5 MB)
15/10/08 16:12:56 INFO MemoryStore: ensureFreeSpace(7143) called with 
curMem=549798, maxMem=560993402
15/10/08 16:12:56 INFO MemoryStore: Block broadcast_22_piece0 stored as bytes 
in memory (estimated size 7.0 KB, free 534.5 MB)
15/10/08 16:12:56 INFO BlockManagerInfo: Added broadcast_22_piece0 in memory on 
10.247.0.117:33555 (size: 7.0 KB, free: 535.0 MB)
15/10/08 16:12:56 INFO SparkContext: Created broadcast 22 from broadcast at 
DAGScheduler.scala:861
15/10/08 16:12:56 INFO DAGScheduler: Submitting 200 missing tasks from ResultStage 37 (MapPartitionsRDD[37] at map at <console>:40)
15/10/08 16:12:56 INFO YarnScheduler: Adding task set 37.0 with 200 tasks
15/10/08 16:12:56 INFO TaskSetManager: Starting task 0.0 in stage 37.0 (TID 
649, ip-10-247-0-117.ec2.internal, PROCESS_LOCAL, 1914 bytes)
15/10/08 16:12:56 INFO TaskSetManager: Starting task 1.0 in stage 37.0 (TID 
650, ip-10-247-0-117.ec2.internal, PROCESS_LOCAL, 1914 bytes)
15/10/08 16:12:56 INFO BlockManagerInfo: Added broadcast_22_piece0 in memory on 
ip-10-247-0-117.ec2.internal:46227 (size: 7.0 KB, free: 535.0 MB)
15/10/08 16:12:56 INFO BlockManagerInfo: Added broadcast_22_piece0 in memory on 
ip-10-247-0-117.ec2.internal:32938 (size: 7.0 KB, free: 535.0 MB)
15/10/08 16:12:56 INFO TaskSetManager: Starting task 2.0 in stage 37.0 (TID 
651, ip-10-247-0-117.ec2.internal, PROCESS_LOCAL, 1914 bytes)
15/10/08 16:12:56 WARN TaskSetManager: Lost task 0.0 in stage 37.0 (TID 649, 
ip-10-247-0-117.ec2.internal): java.lang.NullPointerException
at 
$line101.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:40)
at 
$line101.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(:40)
at 

RE: RowNumber in HiveContext returns null or negative values

2015-10-08 Thread Saif.A.Ellafi
Hi, thanks for looking into it. It is v1.5.1. I am really worried.
I don't actually have Hive/Hadoop in the environment.

Saif

From: Michael Armbrust [mailto:mich...@databricks.com]
Sent: Thursday, October 08, 2015 2:57 PM
To: Ellafi, Saif A.
Cc: user
Subject: Re: RowNumber in HiveContext returns null or negative values

Which version of Spark?

On Thu, Oct 8, 2015 at 7:25 AM,  wrote:
Hi all, would this be a bug??

val ws = Window.
partitionBy("clrty_id").
orderBy("filemonth_dtt")

val nm = "repeatMe"
df.select(df.col("*"), rowNumber().over(ws).cast("int").as(nm))


stacked_data.filter(stacked_data("repeatMe").isNotNull).orderBy("repeatMe").take(50).foreach(println(_))

--->

Long, DateType, Int
[2003,2006-06-01,-1863462909]
[2003,2006-09-01,-1863462909]
[2003,2007-01-01,-1863462909]
[2003,2007-08-01,-1863462909]
[2003,2007-07-01,-1863462909]
[2138,2007-07-01,-1863462774]
[2138,2007-02-01,-1863462774]
[2138,2006-11-01,-1863462774]
[2138,2006-08-01,-1863462774]
[2138,2007-08-01,-1863462774]
[2138,2006-09-01,-1863462774]
[2138,2007-03-01,-1863462774]
[2138,2006-10-01,-1863462774]
[2138,2007-05-01,-1863462774]
[2138,2006-06-01,-1863462774]
[2138,2006-12-01,-1863462774]


Thanks,
Saif




Re: Using Sqark SQL mapping over an RDD

2015-10-08 Thread Michael Armbrust
You are probably looking for a groupby instead:

sqlContext.sql("SELECT COUNT(*) FROM ad_info GROUP BY deviceId")
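
As a rough illustration of doing the work once per group rather than issuing one query per Id, the selection rule from the quoted message (earliest row that has an analyticsId, otherwise the latest row) can be written as a single pass over grouped rows. This is plain Python over dictionaries, not the Spark API; the function name and the sample rows are hypothetical, while the field names follow the quoted queries.

```python
from collections import defaultdict


def pick_row_per_device(rows):
    """For each deviceId, keep the earliest row that has an analyticsId,
    falling back to the latest row when no row has one."""
    by_device = defaultdict(list)
    for row in rows:
        by_device[row["deviceId"]].append(row)

    result = {}
    for device_id, group in by_device.items():
        # Mirrors "analyticsId <> 'null'": skip the literal string and real nulls.
        with_id = [r for r in group if r["analyticsId"] not in (None, "null")]
        if with_id:
            result[device_id] = min(with_id, key=lambda r: r["messageTime"])
        else:
            result[device_id] = max(group, key=lambda r: r["messageTime"])
    return result


sample = [
    {"deviceId": "d1", "analyticsId": "null", "messageTime": 1},
    {"deviceId": "d1", "analyticsId": "a9", "messageTime": 5},
    {"deviceId": "d1", "analyticsId": "a7", "messageTime": 3},
    {"deviceId": "d2", "analyticsId": "null", "messageTime": 2},
    {"deviceId": "d2", "analyticsId": "null", "messageTime": 8},
]
picked = pick_row_per_device(sample)
print(picked["d1"]["messageTime"], picked["d2"]["messageTime"])  # 3 8
```

In Spark the same rule would have to be expressed as a grouped or windowed query over ad_info so that it runs as one job on the driver side.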

On Thu, Oct 8, 2015 at 10:27 AM, Afshartous, Nick 
wrote:

>
> > You can't do nested operations on RDDs or DataFrames (i.e. you can't
> create a DataFrame from within a map function).  Perhaps if you explain
> what you are trying to accomplish someone can suggest another way.
>
> The code below what I had in mind.  For each Id, I'd like to run a query
> using the Id in the where clause, and then depending on the result possibly
> run a second query.  Either the result of the first or second query
> will be used to construct the output of the map function.
>
> Thanks for any suggestions,
> --
>   Nick
>
>
> val result = deviceIds.map(deviceId => {
>val withAnalyticsId = sqlContext.sql(
>"select * from ad_info where deviceId = '%1s' and analyticsId <>
> 'null' order by messageTime asc limit 1" format (deviceId))
>
>if (withAnalyticsId.count() > 0) {
>withAnalyticsId.take(1)(0)
>}
>else {
>val withoutAnalyticsId = sqlContext.sql("select * from ad_info
> where deviceId = '%1s' order by messageTime desc limit 1" format (deviceId))
>
>withoutAnalyticsId.take(1)(0)
>}
> })
>
>
>
>
> 
> From: Michael Armbrust [mich...@databricks.com]
> Sent: Thursday, October 08, 2015 1:16 PM
> To: Afshartous, Nick
> Cc: user@spark.apache.org
> Subject: Re: Using Sqark SQL mapping over an RDD
>
> You can't do nested operations on RDDs or DataFrames (i.e. you can't
> create a DataFrame from within a map function).  Perhaps if you explain
> what you are trying to accomplish someone can suggest another way.
>
> On Thu, Oct 8, 2015 at 10:10 AM, Afshartous, Nick wrote:
>
> Hi,
>
> Am using Spark, 1.5 in latest EMR 4.1.
>
> I have an RDD of String
>
>scala> deviceIds
>   res25: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[18] at
> map at :28
>
> and then when trying to map over the RDD while attempting to run a sql
> query the result is a NullPointerException
>
>   scala> deviceIds.map(id => sqlContext.sql("select * from
> ad_info")).count()
>
> with the stack trace below.  If I run the query as a top level expression
> the count is retuned.  There was additional code within
> the anonymous function that's been removed to try and isolate.
>
> Thanks for any insights or advice on how to debug this.
> --
>   Nick
>
>
> scala> deviceIds.map(id => sqlContext.sql("select * from ad_info")).count()
> deviceIds.map(id => sqlContext.sql("select * from ad_info")).count()
> 15/10/08 16:12:56 INFO SparkContext: Starting job: count at :40
> 15/10/08 16:12:56 INFO DAGScheduler: Got job 18 (count at :40)
> with 200 output partitions
> 15/10/08 16:12:56 INFO DAGScheduler: Final stage: ResultStage 37(count at
> :40)
> 15/10/08 16:12:56 INFO DAGScheduler: Parents of final stage:
> List(ShuffleMapStage 36)
> 15/10/08 16:12:56 INFO DAGScheduler: Missing parents: List()
> 15/10/08 16:12:56 INFO DAGScheduler: Submitting ResultStage 37
> (MapPartitionsRDD[37] at map at :40), which has no missing parents
> 15/10/08 16:12:56 INFO MemoryStore: ensureFreeSpace(17904) called with
> curMem=531894, maxMem=560993402
> 15/10/08 16:12:56 INFO MemoryStore: Block broadcast_22 stored as values in
> memory (estimated size 17.5 KB, free 534.5 MB)
> 15/10/08 16:12:56 INFO MemoryStore: ensureFreeSpace(7143) called with
> curMem=549798, maxMem=560993402
> 15/10/08 16:12:56 INFO MemoryStore: Block broadcast_22_piece0 stored as
> bytes in memory (estimated size 7.0 KB, free 534.5 MB)
> 15/10/08 16:12:56 INFO BlockManagerInfo: Added broadcast_22_piece0 in
> memory on 10.247.0.117:33555 (size: 7.0 KB,
> free: 535.0 MB)
> 15/10/08 16:12:56 INFO SparkContext: Created broadcast 22 from broadcast
> at DAGScheduler.scala:861
> 15/10/08 16:12:56 INFO DAGScheduler: Submitting 200 missing tasks from
> ResultStage 37 (MapPartitionsRDD[37] at map at :40)
> 15/10/08 16:12:56 INFO YarnScheduler: Adding task set 37.0 with 200 tasks
> 15/10/08 16:12:56 INFO TaskSetManager: Starting task 0.0 in stage 37.0
> (TID 649, ip-10-247-0-117.ec2.internal, PROCESS_LOCAL, 1914 bytes)
> 15/10/08 16:12:56 INFO TaskSetManager: Starting task 1.0 in stage 37.0
> (TID 650, ip-10-247-0-117.ec2.internal, PROCESS_LOCAL, 1914 bytes)
> 15/10/08 16:12:56 INFO BlockManagerInfo: Added broadcast_22_piece0 in
> memory on ip-10-247-0-117.ec2.internal:46227 (size: 7.0 KB, free: 535.0 MB)
> 15/10/08 16:12:56 INFO BlockManagerInfo: Added broadcast_22_piece0 in
> memory on ip-10-247-0-117.ec2.internal:32938 (size: 7.0 KB, free: 535.0 MB)
> 15/10/08 16:12:56 INFO TaskSetManager: Starting task 2.0 in stage 37.0
> (TID 651, ip-10-247-0-117.ec2.internal, PROCESS_LOCAL, 1914 bytes)
> 15/10/08 16:12:56 WARN TaskSetManager: Lost task 0.0 in stage 37.0 (TID
> 649, ip-10-247-0-117.ec2.internal): 

Re: Dataframes - sole data structure for parallel computations?

2015-10-08 Thread Michael Armbrust
Don't worry, the ability to work with domain objects and lambda functions
is not going to go away.  However, we are looking at ways to leverage
Tungsten's improved performance when processing structured data.

More details can be found here:
https://issues.apache.org/jira/browse/SPARK-

On Thu, Oct 8, 2015 at 7:40 AM, Tracewski, Lukasz <
lukasz.tracew...@credit-suisse.com> wrote:

> Hi,
>
>
>
> Many people interpret this slide from Databricks
>
> https://ogirardot.files.wordpress.com/2015/05/future-of-spark.png
>
> as indication that Dataframes API is going to be the main processing unit
> of Spark and sole access point to MLlib, Streaming and such. Is it true? My
> impression was that Dataframes are an additional abstraction layer with
> some promising optimisation coming from Tungsten project, but that’s all.
> RDDs are there to stay. They are a natural selection when it comes to e.g.
> processing images.
>
>
>
> Here is one article that advertises Dataframes as a “sole data structure
> for parallel computations”:
>
>
> https://ogirardot.wordpress.com/2015/05/29/rdds-are-the-new-bytecode-of-apache-spark/
> (paragraph 4)
>
>
>
> Cheers,
>
> Lucas
>
>
>
>
>
>
>
>


RE: RowNumber in HiveContext returns null or negative values

2015-10-08 Thread Saif.A.Ellafi
It turns out this does not happen in local[32] mode. It only happens when
submitting to a standalone cluster. I don’t have YARN/MESOS to compare.

Will keep diagnosing.

Saif

From: saif.a.ell...@wellsfargo.com [mailto:saif.a.ell...@wellsfargo.com]
Sent: Thursday, October 08, 2015 3:01 PM
To: mich...@databricks.com
Cc: user@spark.apache.org
Subject: RE: RowNumber in HiveContext returns null or negative values

Hi, thanks for looking into. v1.5.1. I am really worried.
I dont have hive/hadoop for real in the environment.

Saif

From: Michael Armbrust [mailto:mich...@databricks.com]
Sent: Thursday, October 08, 2015 2:57 PM
To: Ellafi, Saif A.
Cc: user
Subject: Re: RowNumber in HiveContext returns null or negative values

Which version of Spark?

On Thu, Oct 8, 2015 at 7:25 AM,  wrote:
Hi all, would this be a bug??

val ws = Window.
partitionBy("clrty_id").
orderBy("filemonth_dtt")

val nm = "repeatMe"
df.select(df.col("*"), rowNumber().over(ws).cast("int").as(nm))


stacked_data.filter(stacked_data("repeatMe").isNotNull).orderBy("repeatMe").take(50).foreach(println(_))

--->

Long, DateType, Int
[2003,2006-06-01,-1863462909]
[2003,2006-09-01,-1863462909]
[2003,2007-01-01,-1863462909]
[2003,2007-08-01,-1863462909]
[2003,2007-07-01,-1863462909]
[2138,2007-07-01,-1863462774]
[2138,2007-02-01,-1863462774]
[2138,2006-11-01,-1863462774]
[2138,2006-08-01,-1863462774]
[2138,2007-08-01,-1863462774]
[2138,2006-09-01,-1863462774]
[2138,2007-03-01,-1863462774]
[2138,2006-10-01,-1863462774]
[2138,2007-05-01,-1863462774]
[2138,2006-06-01,-1863462774]
[2138,2006-12-01,-1863462774]


Thanks,
Saif




Re: Applicative logs on Yarn

2015-10-08 Thread Ted Yu
This question seems better suited for u...@hadoop.apache.org

FYI

On Thu, Oct 8, 2015 at 10:37 AM,  wrote:

> Hello,
> I submit spark streaming inside Yarn, I have configured yarn to generate
> custom logs.
> It works fine and yarn aggregate very well the logs inside HDFS,
> nevertheless the log files are only usable via "yarn logs" command.
> I would prefer to be able to navigate inside via hdfs command like a text
> file, is it possible to aggregate those files as text file ?
>
> Tks
> Nicolas
>
>
>


Re: Insert via HiveContext is slow

2015-10-08 Thread Daniel Haviv
Forgot to mention that my insert is a multi table insert :
sqlContext2.sql("""from avro_events
   lateral view explode(usChnlList) usParamLine as usParamLine
   lateral view explode(dsChnlList) dsParamLine as dsParamLine
   insert into table UpStreamParam partition(day_ts, cmtsid)
   select cmtstimestamp,datats,macaddress,
usParamLine['chnlidx'] chnlidx,
usParamLine['modulation'] modulation,
usParamLine['severity'] severity,
usParamLine['rxpower'] rxpower,
usParamLine['sigqnoise'] sigqnoise,
usParamLine['noisedeviation'] noisedeviation,
usParamLine['prefecber'] prefecber,
usParamLine['postfecber'] postfecber,
usParamLine['txpower'] txpower,
usParamLine['txpowerdrop'] txpowerdrop,
usParamLine['nmter'] nmter,
usParamLine['premtter'] premtter,
usParamLine['postmtter'] postmtter,
usParamLine['unerroreds'] unerroreds,
usParamLine['corrected'] corrected,
usParamLine['uncorrectables'] uncorrectables,
from_unixtime(cast(datats/1000 as bigint),'MMdd')
day_ts,
cmtsid
   insert into table DwnStreamParam partition(day_ts, cmtsid)
   select  cmtstimestamp,datats,macaddress,
dsParamLine['chnlidx'] chnlidx,
dsParamLine['modulation'] modulation,
dsParamLine['severity'] severity,
dsParamLine['rxpower'] rxpower,
dsParamLine['sigqnoise'] sigqnoise,
dsParamLine['noisedeviation'] noisedeviation,
dsParamLine['prefecber'] prefecber,
dsParamLine['postfecber'] postfecber,
dsParamLine['sigqrxmer'] sigqrxmer,
dsParamLine['sigqmicroreflection'] sigqmicroreflection,
dsParamLine['unerroreds'] unerroreds,
dsParamLine['corrected'] corrected,
dsParamLine['uncorrectables'] uncorrectables,
from_unixtime(cast(datats/1000 as bigint),'MMdd')
day_ts,
cmtsid
""")



On Thu, Oct 8, 2015 at 9:51 PM, Daniel Haviv <
daniel.ha...@veracity-group.com> wrote:

> Hi,
> I'm inserting into a partitioned ORC table using an insert sql statement
> passed via HiveContext.
> The performance I'm getting is pretty bad and I was wondering if there are
> ways to speed things up.
> Would saving the DF like this 
> df.write().mode(SaveMode.Append).partitionBy("date").saveAsTable("Tablename")
> be faster ?
>
>
> Thank you.
> Daniel
>


Re: How to increase Spark partitions for the DataFrame?

2015-10-08 Thread Umesh Kacha
Hi Lan, thanks for the response. Yes, I know, and I have confirmed in the UI that
it has only 12 partitions because of 12 HDFS blocks, and the Hive ORC file stripe
size is 33554432.

On Thu, Oct 8, 2015 at 11:55 PM, Lan Jiang  wrote:

> The partition number should be the same as the HDFS block number instead
> of the file number. Did you confirm from the Spark UI that only 12 partitions
> were created? What is your ORC orc.stripe.size?
>
> Lan
>
>
> > On Oct 8, 2015, at 1:13 PM, unk1102  wrote:
> >
> > Hi, I have the following code where I read ORC files from HDFS; it loads a
> > directory which contains 12 ORC files. Since the HDFS directory contains 12
> > files, it will create 12 partitions by default. This directory is huge, and
> > when the ORC files get decompressed they come to around 10 GB. How do I increase
> > the partitions for the code below so that my Spark job runs faster and does not
> > hang for a long time because of reading 10 GB of files through shuffle in 12
> > partitions? Please guide.
> >
> > DataFrame df =
> > hiveContext.read().format("orc").load("/hdfs/path/to/orc/files/");
> > df.select().groupBy(..)
> >
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-increase-Spark-partitions-for-the-DataFrame-tp24980.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> >
>
>


Re: How to increase Spark partitions for the DataFrame?

2015-10-08 Thread Lan Jiang
Hmm, that’s odd. 

You can always use repartition(n) to increase the partition number, but then
there will be a shuffle. How large is your ORC file? Have you used the NameNode UI to
check how many HDFS blocks each ORC file has?

Lan


> On Oct 8, 2015, at 2:08 PM, Umesh Kacha  wrote:
> 
> Hi Lan, thanks for the response yes I know and I have confirmed in UI that it 
> has only 12 partitions because of 12 HDFS blocks and hive orc file strip size 
> is 33554432.
> 
> On Thu, Oct 8, 2015 at 11:55 PM, Lan Jiang wrote:
> The partition number should be the same as the HDFS block number instead of 
> file number. Did you confirmed from the spark UI that only 12 partitions were 
> created? What is your ORC orc.stripe.size?
> 
> Lan
> 
> 
> > On Oct 8, 2015, at 1:13 PM, unk1102 wrote:
> >
> > Hi I have the following code where I read ORC files from HDFS and it loads
> > directory which contains 12 ORC files. Now since HDFS directory contains 12
> > files it will create 12 partitions by default. These directory is huge and
> > when ORC files gets decompressed it becomes around 10 GB how do I increase
> > partitions for the below code so that my Spark job runs faster and does not
> > hang for long time because of reading 10 GB files through shuffle in 12
> > partitions. Please guide.
> >
> > DataFrame df =
> > hiveContext.read().format("orc").load("/hdfs/path/to/orc/files/");
> > df.select().groupby(..)
> >
> >
> >
> >
> >
> 
> 



Re: How to increase Spark partitions for the DataFrame?

2015-10-08 Thread Umesh Kacha
Hi Lan, thanks for the reply. I have tried the following, but it did
not increase the number of partitions:

DataFrame df = hiveContext.read().format("orc").load("/hdfs/path/to/orc/files/").repartition(100);

Yes, I have checked in the NameNode UI: the ORC directory contains 12 files/blocks of 128
MB each. When decompressed the data is around 10 GB, and its
compressed file size is around 1 GB.

On Fri, Oct 9, 2015 at 12:43 AM, Lan Jiang  wrote:

> Hmm, that’s odd.
>
> You can always use repartition(n) to increase the partition number, but
> then there will be shuffle. How large is your ORC file? Have you used
> NameNode UI to check how many HDFS blocks each ORC file has?
>
> Lan
>
>
> On Oct 8, 2015, at 2:08 PM, Umesh Kacha  wrote:
>
> Hi Lan, thanks for the response yes I know and I have confirmed in UI that
> it has only 12 partitions because of 12 HDFS blocks and hive orc file strip
> size is 33554432.
>
> On Thu, Oct 8, 2015 at 11:55 PM, Lan Jiang  wrote:
>
>> The partition number should be the same as the HDFS block number instead
>> of file number. Did you confirmed from the spark UI that only 12 partitions
>> were created? What is your ORC orc.stripe.size?
>>
>> Lan
>>
>>
>> > On Oct 8, 2015, at 1:13 PM, unk1102  wrote:
>> >
>> > Hi I have the following code where I read ORC files from HDFS and it
>> loads
>> > directory which contains 12 ORC files. Now since HDFS directory
>> contains 12
>> > files it will create 12 partitions by default. These directory is huge
>> and
>> > when ORC files gets decompressed it becomes around 10 GB how do I
>> increase
>> > partitions for the below code so that my Spark job runs faster and does
>> not
>> > hang for long time because of reading 10 GB files through shuffle in 12
>> > partitions. Please guide.
>> >
>> > DataFrame df =
>> > hiveContext.read().format("orc").load("/hdfs/path/to/orc/files/");
>> > df.select().groupby(..)
>> >
>> >
>> >
>> >
>> >
>>
>>
>
>


Re: "Too many open files" exception on reduceByKey

2015-10-08 Thread Tian Zhang
I hit this issue with a Spark 1.3.0 stateful application (using the
updateStateByKey function) on Mesos.  It
fails after running fine for about 24 hours.
The error stack trace is below. I checked ulimit -n and we have very large
numbers set on the machines.
What else can be wrong?
15/09/27 18:45:11 WARN scheduler.TaskSetManager: Lost task 2.0 in stage
113727.0 (TID 833758, ip-10-112-10-221.ec2.internal):
java.io.FileNotFoundException:
/media/ephemeral0/oncue/mesos-slave/slaves/20150512-215537-2165010442-5050-1730-S5/frameworks/20150825-175705-2165010442-5050-13705-0338/executors/0/runs/19342849-d076-483c-88da-747896e19b93/./spark-6efa2dcd-aea7-478e-9fa9-6e0973578eb4/blockmgr-33b1e093-6dd6-4462-938c-2597516272a9/27/shuffle_535_2_0.index
(Too many open files)
at java.io.FileOutputStream.open(Native Method)
at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
at java.io.FileOutputStream.<init>(FileOutputStream.java:171)
at org.apache.spark.shuffle.IndexShuffleBlockManager.writeIndexFile(IndexShuffleBlockManager.scala:85)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:69)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Too-many-open-files-exception-on-reduceByKey-tp2462p24985.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Streaming DirectKafka assertion errors

2015-10-08 Thread Cody Koeninger
It sounds like you moved the job from one environment to another?

This may sound silly, but make sure (e.g. using lsof) that the brokers the job is
connecting to are actually the ones you expect.

As far as the checkpoint goes, the log output should indicate whether the
job is restoring from checkpoint.  Make sure that output no longer shows up
after you stopped the job, deleted the checkpoint directory, and restarted
it.



On Thu, Oct 8, 2015 at 2:51 PM, Roman Garcia  wrote:

> I'm running Spark Streaming using Kafka Direct stream, expecting
> exactly-once semantics using checkpoints (which are stored onto HDFS).
> My Job is really simple, it opens 6 Kafka streams (6 topics with 4 parts
> each) and stores every row to ElasticSearch using ES-Spark integration.
>
> This job was working without issues on a different environment, but on
> this new environment, I've started to see these assertion errors:
>
> "Job aborted due to stage failure: Task 7 in stage 79.0 failed 4 times,
> most recent failure: Lost task 7.3 in stage 79.0 (TID 762, 192.168.18.219):
> java.lang.AssertionError: assertion failed: Beginning offset 20 is after
> the ending offset 14 for topic some-topic partition 1. You either provided
> an invalid fromOffset, or the Kafka topic has been damaged"
>
> and also
>
> "Job aborted due to stage failure: Task 12 in stage 83.0 failed 4 times,
> most recent failure: Lost task 12.3 in stage 83.0 (TID 815,
> 192.168.18.219): java.lang.AssertionError: assertion failed: Ran out of
> messages before reaching ending offset 20 for topic some-topic partition 1
> start 14. This should not happen, and indicates that messages may have been
> lost"
>
> When querying my Kafka cluster (3 brokers, 7 topics, 4 parts each topic, 3
> zookeeper nodes, no other consumers), I see what appears to differ from the
> Spark offset info:
>
> *running kafka.tools.GetOffsetShell --time -1*
> some-topic:0:20
> some-topic:1:20
> some-topic:2:19
> some-topic:3:20
> *running kafka.tools.GetOffsetShell --time -2*
> some-topic:0:0
> some-topic:1:0
> some-topic:2:0
> some-topic:3:0
>
> *running kafka-simple-consumer-shell* I can see all stored messages until
> offset 20, with a final output: "Terminating. Reached the end of partition
> (some-topic, 1) at offset 20"
>
> I tried removing the whole checkpoint dir and start over, but it keeps
> failing.
>
> It looks like these tasks get retried without end. On the spark-ui
> streaming tab I see the "Active batches" increase with a confusing "Input
> size" value of "-19" (negative size?)
>
> Any pointers will help
> Thanks
>
> Roman
>
>


Re: ValueError: can not serialize object larger than 2G

2015-10-08 Thread Ted Yu
To fix the problem, consider increasing the number of partitions for your job.

Showing a code snippet would help us understand your use case better.

Cheers

On Thu, Oct 8, 2015 at 1:39 PM, Ted Yu  wrote:

> See the comment of FramedSerializer() in serializers.py :
>
> Serializer that writes objects as a stream of (length, data) pairs,
> where C{length} is a 32-bit integer and data is C{length} bytes.
>
> Hence the limit on the size of object.
>
> On Thu, Oct 8, 2015 at 12:56 PM, XIANDI  wrote:
>
>>   File "/home/hadoop/spark/python/pyspark/worker.py", line 101, in main
>> process()
>>   File "/home/hadoop/spark/python/pyspark/worker.py", line 96, in process
>> serializer.dump_stream(func(split_index, iterator), outfile)
>>   File "/home/hadoop/spark/python/pyspark/serializers.py", line 126, in
>> dump_stream
>> self._write_with_length(obj, stream)
>>   File "/home/hadoop/spark/python/pyspark/serializers.py", line 140, in
>> _write_with_length
>> raise ValueError("can not serialize object larger than 2G")
>> ValueError: can not serialize object larger than 2G
>>
>> Does anyone know how does this happen?
>>
>> Thanks!
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/ValueError-can-not-serialize-object-larger-than-2G-tp24984.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>>
>


Unexplained sleep time

2015-10-08 Thread yael aharon
Hello,
I am working on improving the performance of our Spark on Yarn applications.
Scanning through the logs I found the following lines:


[2015-10-07T16:25:17.245-04:00] [DataProcessing] [INFO] []
[org.apache.spark.Logging$class] [tid:main] [userID:yarn] Started
progress reporter thread - sleep time : 5000
[2015-10-07T16:25:22.262-04:00] [DataProcessing] [INFO] []
[org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl] [tid:Reporter]
[userID:yarn] Received new token for : hostname:8041


As the log says, the main thread sleeps for 5 seconds. Is there a way to
configure/eliminate this sleep?
thanks, Yael


Re: ValueError: can not serialize object larger than 2G

2015-10-08 Thread Ted Yu
See the comment of FramedSerializer() in serializers.py :

Serializer that writes objects as a stream of (length, data) pairs,
where C{length} is a 32-bit integer and data is C{length} bytes.

Hence the limit on the size of object.
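
As a rough illustration of that comment (a sketch only, not the PySpark source): if each object is written as a 4-byte signed length followed by the payload, anything over 2^31 - 1 bytes cannot be framed. The function names and the big-endian header format below are assumptions made for the example.

```python
import io
import struct

# Largest length that fits in a signed 32-bit integer.
MAX_FRAME_BYTES = (1 << 31) - 1


def write_with_length(payload, stream):
    """Write one (length, data) frame; refuse payloads the header cannot describe."""
    if len(payload) > MAX_FRAME_BYTES:
        raise ValueError("can not serialize object larger than 2G")
    stream.write(struct.pack("!i", len(payload)))  # 4-byte big-endian signed length
    stream.write(payload)


def read_with_length(stream):
    """Read one frame back: the 4-byte length first, then that many bytes."""
    (length,) = struct.unpack("!i", stream.read(4))
    return stream.read(length)


buf = io.BytesIO()
write_with_length(b"hello", buf)
buf.seek(0)
roundtrip = read_with_length(buf)
```

The limit applies per serialized object, so what matters is the size of each element the job emits, not the total size of the data set.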

On Thu, Oct 8, 2015 at 12:56 PM, XIANDI  wrote:

>   File "/home/hadoop/spark/python/pyspark/worker.py", line 101, in main
> process()
>   File "/home/hadoop/spark/python/pyspark/worker.py", line 96, in process
> serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/home/hadoop/spark/python/pyspark/serializers.py", line 126, in
> dump_stream
> self._write_with_length(obj, stream)
>   File "/home/hadoop/spark/python/pyspark/serializers.py", line 140, in
> _write_with_length
> raise ValueError("can not serialize object larger than 2G")
> ValueError: can not serialize object larger than 2G
>
> Does anyone know how does this happen?
>
> Thanks!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/ValueError-can-not-serialize-object-larger-than-2G-tp24984.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


unsubscribe

2015-10-08 Thread Jürgen Fey



Re: "Too many open files" exception on reduceByKey

2015-10-08 Thread DB Tsai
Try running the following to see the actual ulimit on the executors. We found
that Mesos overrides the ulimit, which causes the issue.

import sys.process._
val p = 1 to 100
val rdd = sc.parallelize(p, 100)
val a = rdd.map(x=> Seq("sh", "-c", "ulimit -n").!!.toDouble.toLong).collect
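
For PySpark users, a rough equivalent might look like the sketch below. It assumes the workers run on a POSIX system; open_file_limit is a name made up for this example, and the value reported is the limit of the Python worker process, which should normally be inherited from the executor that started it.

```python
import resource


def open_file_limit(_=None):
    """Return the (soft, hard) open-file limit of the process that runs this function."""
    return resource.getrlimit(resource.RLIMIT_NOFILE)


soft, hard = open_file_limit()
# Hypothetical PySpark use, so that the check runs on the workers rather than the driver:
# limits = sc.parallelize(range(100), 100).map(open_file_limit).distinct().collect()
```

Running it through an RDD rather than in a login shell is the point: the limit seen by a job can differ from what ulimit -n prints interactively on the same machine.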




Sincerely,

DB Tsai
--
Blog: https://www.dbtsai.com
PGP Key ID: 0xAF08DF8D


On Thu, Oct 8, 2015 at 3:22 PM, Tian Zhang  wrote:

> I hit this issue with a Spark 1.3.0 stateful application (using the
> updateStateByKey function) on Mesos. It fails after running fine for
> about 24 hours.
> The error stack trace is below. I checked ulimit -n and we have very large
> numbers set on the machines.
> What else can be wrong?
> 15/09/27 18:45:11 WARN scheduler.TaskSetManager: Lost task 2.0 in stage
> 113727.0 (TID 833758, ip-10-112-10-221.ec2.internal):
> java.io.FileNotFoundException:
>
> /media/ephemeral0/oncue/mesos-slave/slaves/20150512-215537-2165010442-5050-1730-S5/frameworks/20150825-175705-2165010442-5050-13705-0338/executors/0/runs/19342849-d076-483c-88da-747896e19b93/./spark-6efa2dcd-aea7-478e-9fa9-6e0973578eb4/blockmgr-33b1e093-6dd6-4462-938c-2597516272a9/27/shuffle_535_2_0.index
> (Too many open files)
> at java.io.FileOutputStream.open(Native Method)
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171)
> at
>
> org.apache.spark.shuffle.IndexShuffleBlockManager.writeIndexFile(IndexShuffleBlockManager.scala:85)
> at
>
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:69)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Too-many-open-files-exception-on-reduceByKey-tp2462p24985.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


FW: ValueError: can not serialize object larger than 2G

2015-10-08 Thread Xiandi Zhang
--
def parse_record(x):
    formatted = list(x[0])
    if (type(x[1])!=list) & (type(x[1])!=tuple):
        formatted.append(x[1])
    else:
        formatted.extend(list(x[1]))
    return formatted

def writeRecords(records):
    output = StringIO.StringIO()
    for record in records:
        writer = csv.writer(output, delimiter=',')
        writer.writerow(record)
    return [output.getvalue()]

validation_formatted = validation_res.map(lambda x: parse_record(x))
validation_partition = validation_formatted.repartition(80)
validation_partition.mapPartitions(writeRecords).saveAsTextFile(broadcast_path + "para_predictions")
--
The error happens in saveAsTextFile. I changed the repartition number to 200
or 300, but still got this error.
Thanks for the help.
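
One thing that may be worth trying, offered as a guess rather than a confirmed diagnosis: writeRecords returns the whole partition as a single string, so each partition becomes one very large object. The sketch below emits one CSV line per record instead. It is written for Python 3 (io.StringIO); write_records_per_line is an illustrative name, and the Spark call in the trailing comment has not been run here.

```python
import csv
import io


def write_records_per_line(records):
    """Yield one CSV-formatted line per record instead of one string per partition."""
    for record in records:
        output = io.StringIO()
        csv.writer(output, delimiter=",").writerow(record)
        # csv ends each row with "\r\n"; strip it because saveAsTextFile
        # writes its own newline after every element.
        yield output.getvalue().rstrip("\r\n")


lines = list(write_records_per_line([[1, "a", 2.5], [2, "b,c", 3.0]]))
# Hypothetical use with the RDD from the code above:
# validation_partition.mapPartitions(write_records_per_line).saveAsTextFile(
#     broadcast_path + "para_predictions")
```

Because the function is a generator, no element handed to the serializer is larger than a single formatted record.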
Date: Thu, 8 Oct 2015 13:55:32 -0700
Subject: Re: ValueError: can not serialize object larger than 2G
From: yuzhih...@gmail.com
To: zxd_ci...@hotmail.com
CC: user@spark.apache.org

To fix the problem, consider increasing number of partitions for your job.
Showing code snippet would help us understand your use case better.
Cheers
On Thu, Oct 8, 2015 at 1:39 PM, Ted Yu  wrote:
See the comment of FramedSerializer() in serializers.py :
Serializer that writes objects as a stream of (length, data) pairs,
where C{length} is a 32-bit integer and data is C{length} bytes.
Hence the limit on the size of object.
On Thu, Oct 8, 2015 at 12:56 PM, XIANDI  wrote:
  File "/home/hadoop/spark/python/pyspark/worker.py", line 101, in main
    process()
  File "/home/hadoop/spark/python/pyspark/worker.py", line 96, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/hadoop/spark/python/pyspark/serializers.py", line 126, in dump_stream
    self._write_with_length(obj, stream)
  File "/home/hadoop/spark/python/pyspark/serializers.py", line 140, in _write_with_length
    raise ValueError("can not serialize object larger than 2G")
ValueError: can not serialize object larger than 2G

Does anyone know how does this happen?

Thanks!

--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/ValueError-can-not-serialize-object-larger-than-2G-tp24984.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: unsubscribe

2015-10-08 Thread Ted Yu
Take a look at the first section of:
http://spark.apache.org/community

On Thu, Oct 8, 2015 at 2:10 PM, Jürgen Fey 
wrote:

>
>


Re: How to increase Spark partitions for the DataFrame?

2015-10-08 Thread Ted Yu
bq. contains 12 files/blocks

Looks like you hit the limit of parallelism these files can provide.

If you have a larger dataset, you would have more partitions.

On Thu, Oct 8, 2015 at 12:21 PM, Umesh Kacha  wrote:

> Hi Lan thanks for the reply. I have tried to do the following but it did
> not increase partition
>
> DataFrame df = hiveContext.read().format("orc").load("/hdfs/path/to/orc/
> files/").repartition(100);
>
> Yes I have checked in namenode ui ORC files contains 12 files/blocks of
> 128 MB each and ORC files when decompressed its around 10 GB and its
> uncompressed file size is around 1 GB
>
> On Fri, Oct 9, 2015 at 12:43 AM, Lan Jiang  wrote:
>
>> Hmm, that’s odd.
>>
>> You can always use repartition(n) to increase the partition number, but
>> then there will be shuffle. How large is your ORC file? Have you used
>> NameNode UI to check how many HDFS blocks each ORC file has?
>>
>> Lan
>>
>>
>> On Oct 8, 2015, at 2:08 PM, Umesh Kacha  wrote:
>>
>> Hi Lan, thanks for the response. Yes, I know, and I have confirmed in the UI
>> that it has only 12 partitions because of 12 HDFS blocks, and the Hive ORC
>> file stripe size is 33554432.
>>
>> On Thu, Oct 8, 2015 at 11:55 PM, Lan Jiang  wrote:
>>
>>> The partition number should be the same as the HDFS block number instead
>>> of the file number. Did you confirm from the Spark UI that only 12 partitions
>>> were created? What is your orc.stripe.size?
>>>
>>> Lan
>>>
>>>
>>> > On Oct 8, 2015, at 1:13 PM, unk1102  wrote:
>>> >
>>> > Hi I have the following code where I read ORC files from HDFS and it
>>> loads
>>> > directory which contains 12 ORC files. Now since HDFS directory
>>> contains 12
>>> > files it will create 12 partitions by default. These directory is huge
>>> and
>>> > when ORC files gets decompressed it becomes around 10 GB how do I
>>> increase
>>> > partitions for the below code so that my Spark job runs faster and
>>> does not
>>> > hang for long time because of reading 10 GB files through shuffle in 12
>>> > partitions. Please guide.
>>> >
>>> > DataFrame df =
>>> > hiveContext.read().format("orc").load("/hdfs/path/to/orc/files/");
>>> > df.select().groupby(..)
>>> >
>>> >
>>> >
>>> >
>>> > --
>>> > View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-increase-Spark-partitions-for-the-DataFrame-tp24980.html
>>> > Sent from the Apache Spark User List mailing list archive at
>>> Nabble.com.
>>> >
>>> >
>>>
>>>
>>
>>
>


[Spark 1.5] Kinesis receivers not starting

2015-10-08 Thread Bharath Mukkati
Hi Spark Users,

I am testing my application on Spark 1.5 and kinesis-asl-1.5. The streaming
application starts but I see a ton of stages scheduled for
ReceiverTracker (submitJob
at ReceiverTracker.scala:557 ).

In the driver logs I see this sequence repeat:
15/10/09 00:10:54 INFO INFO ReceiverTracker: Starting 100 receivers
15/10/09 00:10:54 INFO ReceiverTracker: ReceiverTracker started

15/10/09 00:10:54 INFO ReceiverTracker: Receiver 0 started
15/10/09 00:10:54 DEBUG ClosureCleaner: +++ Cleaning closure 
(org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9)
+++
15/10/09 00:10:54 DEBUG ClosureCleaner:  + declared fields: 3
15/10/09 00:10:54 DEBUG ClosureCleaner:  public static final long
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.serialVersionUID
15/10/09 00:10:54 DEBUG ClosureCleaner:  private final scala.Option
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.checkpointDirOption$1
15/10/09 00:10:54 DEBUG ClosureCleaner:  private final
org.apache.spark.util.SerializableConfiguration
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.serializableHadoopConf$1
15/10/09 00:10:54 DEBUG ClosureCleaner:  + declared methods: 2
15/10/09 00:10:54 DEBUG ClosureCleaner:  public final java.lang.Object
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(java.lang.Object)
15/10/09 00:10:54 DEBUG ClosureCleaner:  public final void
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(scala.collection.Iterator)
15/10/09 00:10:54 DEBUG ClosureCleaner:  + inner classes: 0
15/10/09 00:10:54 DEBUG ClosureCleaner:  + outer classes: 0
15/10/09 00:10:54 DEBUG ClosureCleaner:  + outer objects: 0
15/10/09 00:10:54 DEBUG ClosureCleaner:  + populating accessed fields
because this is the starting closure
15/10/09 00:10:54 DEBUG ClosureCleaner:  + fields accessed by starting
closure: 0
15/10/09 00:10:54 DEBUG ClosureCleaner:  + there are no enclosing objects!
15/10/09 00:10:54 DEBUG ClosureCleaner:  +++ closure 
(org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9)
is now cleaned +++

...
(and so on for 100 receivers)

And then I start seeing ...
15/10/09 00:11:02 INFO ReceiverTracker: Restarting Receiver 36
.. and so on for the other receivers

After which I see the Receiver started logs
15/10/09 00:11:02 INFO ReceiverTracker: Receiver 20 started
..
Again the Restarting Receiver logs appear

After a while the driver hangs, no new logs appear although the app seems
to be running. The streaming console shows scheduled stages and jobs.

There are no ERROR logs in the driver. However I see the following
Exceptions (DEBUG logs)

akka.remote.ShutDownAssociation: Shut down address:
akka.tcp://driverPropsFetcher@ip-:57886
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The
remote system terminated the association because it is shutting down.
] from Actor[akka://sparkDriver/deadLetters]
15/10/09 00:10:37 DEBUG
AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] received
message AssociationError [akka.tcp://sparkDriver@:39053] <-
[akka.tcp://driverPropsFetcher@:57886]: Error [Shut down address:
akka.tcp://driverPropsFetcher@:57886] [
akka.remote.ShutDownAssociation: Shut down address:
akka.tcp://driverPropsFetcher@:57886
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The
remote system terminated the association because it is shutting down.
] from Actor[akka://sparkDriver/deadLetters]

In one of the executor logs I see the following Exceptions:

application_1444344955519_0001/container_1444344955519_0001_01_05/stderr:15/10/09
00:45:37 WARN receiver.ReceiverSupervisorImpl: Skip stopping receiver
because it has not yet stared
application_1444344955519_0001/container_1444344955519_0001_01_05/stderr:15/10/09
00:45:37 INFO receiver.BlockGenerator: Stopping BlockGenerator
application_1444344955519_0001/container_1444344955519_0001_01_05/stderr:15/10/09
00:45:37 INFO receiver.BlockGenerator: Waiting for block pushing thread to
terminate
application_1444344955519_0001/container_1444344955519_0001_01_05/stderr:15/10/09
00:45:37 INFO receiver.BlockGenerator: Pushing out the last 0 blocks
application_1444344955519_0001/container_1444344955519_0001_01_05/stderr:15/10/09
00:45:37 INFO receiver.BlockGenerator: Stopped block pushing thread
application_1444344955519_0001/container_1444344955519_0001_01_05/stderr:15/10/09
00:45:37 INFO receiver.BlockGenerator: Stopped BlockGenerator
application_1444344955519_0001/container_1444344955519_0001_01_05/stderr:15/10/09
00:45:37 INFO receiver.ReceiverSupervisorImpl: Waiting for receiver to be
stopped
application_1444344955519_0001/container_1444344955519_0001_01_05/stderr:15/10/09
00:45:37 INFO 

error in sparkSQL 1.5 using count(1) in nested queries

2015-10-08 Thread Jeff Thompson
After upgrading from 1.4.1 to 1.5.1 I found some of my spark SQL queries no
longer worked.  Seems to be related to using count(1) or count(*) in a
nested query.  I can reproduce the issue in a pyspark shell with the sample
code below.  The ‘people’ table is from spark-1.5.1-bin-hadoop2.4/
examples/src/main/resources/people.json.

Environment details: Hadoop 2.5.0-cdh5.3.0, YARN

*Test code:*

from pyspark.sql import SQLContext
print(sc.version)
sqlContext = SQLContext(sc)

df = sqlContext.read.json("/user/thj1pal/people.json")
df.show()

sqlContext.registerDataFrameAsTable(df,"PEOPLE")

result = sqlContext.sql("SELECT MIN(t0.age) FROM (SELECT * FROM PEOPLE
WHERE age > 0) t0 HAVING(COUNT(1) > 0)")
result.show()

*spark 1.4.1 output*

1.4.1
++---+
| age|   name|
++---+
|null|Michael|
|  30|   Andy|
|  19| Justin|
++---+

+--+
|c0|
+--+
|19|
+--+


*spark 1.5.1 output*

1.5.1
++---+
| age|   name|
++---+
|null|Michael|
|  30|   Andy|
|  19| Justin|
++---+

---
Py4JJavaError Traceback (most recent call last)
 in ()
  9
 10 result = sqlContext.sql("SELECT MIN(t0.age) FROM (SELECT * FROM
PEOPLE WHERE age > 0) t0 HAVING(COUNT(1) > 0)")
---> 11 result.show()

/home/thj1pal/spark-1.5.1-bin-hadoop2.4/python/pyspark/sql/dataframe.pyc in
show(self, n, truncate)
254 +---+-+
255 """
--> 256 print(self._jdf.showString(n, truncate))
257
258 def __repr__(self):

/home/thj1pal/spark-1.5.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py
in __call__(self, *args)
536 answer = self.gateway_client.send_command(command)
537 return_value = get_return_value(answer, self.gateway_client,
--> 538 self.target_id, self.name)
539
540 for temp_arg in temp_args:

/home/thj1pal/spark-1.5.1-bin-hadoop2.4/python/pyspark/sql/utils.pyc in
deco(*a, **kw)
 34 def deco(*a, **kw):
 35 try:
---> 36 return f(*a, **kw)
 37 except py4j.protocol.Py4JJavaError as e:
 38 s = e.java_exception.toString()

/home/thj1pal/spark-1.5.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py
in get_return_value(answer, gateway_client, target_id, name)
298 raise Py4JJavaError(
299 'An error occurred while calling {0}{1}{2}.\n'.
--> 300 format(target_id, '.', name), value)
301 else:
302 raise Py4JError(

Py4JJavaError: An error occurred while calling o33.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 4.0 failed 4 times, most recent failure: Lost task 0.3 in stage
4.0 (TID 9, pal-bd-n06-ib): java.lang.UnsupportedOperationException: Cannot
evaluate expression: count(1)
at
org.apache.spark.sql.catalyst.expressions.Unevaluable$class.eval(Expression.scala:188)
at
org.apache.spark.sql.catalyst.expressions.Count.eval(aggregates.scala:156)
at
org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:327)
….


Re: Spark Streaming: Doing operation in Receiver vs RDD

2015-10-08 Thread Tathagata Das
Since it is about encryption and decryption, it's also good to know how the raw
data is actually saved to disk. If the write ahead log is enabled, then the
raw data will be saved to the WAL in HDFS. You probably do not want to save
decrypted data there. So it's better not to decrypt in the receiver, as
the decrypted data will get stored in memory and saved to the WAL. It's
probably best to decrypt on the fly in RDD operations.
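
A minimal sketch of that pattern, under stated assumptions: the cipher here is a stand-in (XOR with a repeating key, not real encryption), and make_decryptor, decrypt_partition and the key are names invented for the example. The idea is that the stream keeps carrying ciphertext, and decryption happens inside a per-partition function.

```python
from itertools import cycle


def make_decryptor(key):
    """Placeholder cipher: XOR with a repeating key. NOT real encryption."""
    def decrypt(ciphertext):
        return bytes(b ^ k for b, k in zip(ciphertext, cycle(key)))
    return decrypt


def decrypt_partition(records, key=b"demo-key"):
    """mapPartitions-style function: build the decryptor once, then decrypt lazily."""
    decrypt = make_decryptor(key)
    for record in records:
        yield decrypt(record)


encrypt = make_decryptor(b"demo-key")  # XOR is its own inverse
stored = [encrypt(b"event-1"), encrypt(b"event-2")]  # what the receiver/WAL would hold
decrypted = list(decrypt_partition(iter(stored)))
# Hypothetical Spark Streaming use: stream.mapPartitions(decrypt_partition)
```

Building the decryptor once per partition rather than once per record keeps any expensive key setup out of the inner loop.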

On Thu, Oct 8, 2015 at 3:14 AM, Iulian Dragoș 
wrote:

> You can have a look at
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#receiver-reliability
> for details on Receiver reliability. If you go the receiver way you'll need
> to enable Write Ahead Logs to ensure no data loss. In Kafka direct you
> don't have this problem.
>
> Regarding where to apply decryption, I'd lean towards doing it as RDD
> transformations for the reasons you mentioned. Also, in case only some
> fields are encrypted, this way you can delay decryption until it is really
> needed (assuming some records would be filtered out, etc.).
>
> iulian
>
> On Wed, Oct 7, 2015 at 9:55 PM, emiretsk 
> wrote:
>
>> Hi,
>>
>> I have a Spark Streaming program that is consuming message from Kafka and
>> has to decrypt and deserialize each message. I can implement it either as
>> Kafka deserializer (that will run in a receiver or the new receiver-less
>> Kafka consumer)  or as RDD operations. What are the pros/cons of each?
>>
>> As I see it, doing the operations on RDDs has the following implications
>> Better load balancing, and fault tolerance. (though I'm not quite sure
>> what
>> happens when a receiver fails). Also, not sure if this is still true with
>> the new Kafka receiver-less consumer as it creates an RDD partition for
>> each
>> Kafka partition
>> All functions that are applied to RDDs need to be either static or part of
>> serializable objects. This makes using standard/3rd party Java libraries
>> harder.
>> Cheers,
>> Eugene
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Doing-operation-in-Receiver-vs-RDD-tp24973.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>>
>
>
> --
>
> --
> Iulian Dragos
>
> --
> Reactive Apps on the JVM
> www.typesafe.com
>
>


RE: Insert via HiveContext is slow

2015-10-08 Thread Cheng, Hao
I think that’s a known performance issue (compared to Hive) of Spark SQL in 
multi-inserts.
A workaround is to create a temp cached table for the projection first, and then 
do the multiple inserts based on the cached table.

We are actually working on a POC for some similar cases; hopefully it comes 
out soon.

Hao

From: Daniel Haviv [mailto:daniel.ha...@veracity-group.com]
Sent: Friday, October 9, 2015 3:08 AM
To: user
Subject: Re: Insert via HiveContext is slow

Forgot to mention that my insert is a multi table insert :
sqlContext2.sql("""from avro_events
   lateral view explode(usChnlList) usParamLine as usParamLine
   lateral view explode(dsChnlList) dsParamLine as dsParamLine
   insert into table UpStreamParam partition(day_ts, cmtsid)
   select cmtstimestamp,datats,macaddress,
usParamLine['chnlidx'] chnlidx,
usParamLine['modulation'] modulation,
usParamLine['severity'] severity,
usParamLine['rxpower'] rxpower,
usParamLine['sigqnoise'] sigqnoise,
usParamLine['noisedeviation'] noisedeviation,
usParamLine['prefecber'] prefecber,
usParamLine['postfecber'] postfecber,
usParamLine['txpower'] txpower,
usParamLine['txpowerdrop'] txpowerdrop,
usParamLine['nmter'] nmter,
usParamLine['premtter'] premtter,
usParamLine['postmtter'] postmtter,
usParamLine['unerroreds'] unerroreds,
usParamLine['corrected'] corrected,
usParamLine['uncorrectables'] uncorrectables,
from_unixtime(cast(datats/1000 as bigint),'MMdd') 
day_ts,
cmtsid
   insert into table DwnStreamParam partition(day_ts, cmtsid)
   select  cmtstimestamp,datats,macaddress,
dsParamLine['chnlidx'] chnlidx,
dsParamLine['modulation'] modulation,
dsParamLine['severity'] severity,
dsParamLine['rxpower'] rxpower,
dsParamLine['sigqnoise'] sigqnoise,
dsParamLine['noisedeviation'] noisedeviation,
dsParamLine['prefecber'] prefecber,
dsParamLine['postfecber'] postfecber,
dsParamLine['sigqrxmer'] sigqrxmer,
dsParamLine['sigqmicroreflection'] sigqmicroreflection,
dsParamLine['unerroreds'] unerroreds,
dsParamLine['corrected'] corrected,
dsParamLine['uncorrectables'] uncorrectables,
from_unixtime(cast(datats/1000 as bigint),'MMdd') 
day_ts,
cmtsid

""")



On Thu, Oct 8, 2015 at 9:51 PM, Daniel Haviv wrote:
Hi,
I'm inserting into a partitioned ORC table using an insert sql statement passed 
via HiveContext.
The performance I'm getting is pretty bad and I was wondering if there are ways 
to speed things up.
Would saving the DF like this 
df.write().mode(SaveMode.Append).partitionBy("date").saveAsTable("Tablename") 
be faster ?


Thank you.
Daniel



Re: Using a variable (a column name) in an IF statement in Spark SQL

2015-10-08 Thread Maheshakya Wijewardena
Spark version: 1.4.1
The schema is "barcode STRING, items INT"
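
Two standard-SQL alternatives to IF that might be worth trying are COALESCE and CASE WHEN. Whether either avoids the unresolved-operator error on Spark 1.4.1 is not confirmed in this thread. The sketch below only demonstrates what the expressions return, using Python's built-in sqlite3 so that it runs without a cluster; the sample rows are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE goods (barcode TEXT, items INTEGER)")
conn.executemany(
    "INSERT INTO goods VALUES (?, ?)",
    [("A1", 3), ("B2", None), ("C3", 7)],  # made-up sample data
)

# Two standard-SQL ways to replace NULL with 0.
coalesce_rows = conn.execute(
    "SELECT barcode, COALESCE(items, 0) FROM goods ORDER BY barcode"
).fetchall()
case_rows = conn.execute(
    "SELECT barcode, CASE WHEN items IS NULL THEN 0 ELSE items END "
    "FROM goods ORDER BY barcode"
).fetchall()
```

Both queries return the same rows, with the NULL in "items" replaced by 0.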

On Thu, Oct 8, 2015 at 10:48 PM, Michael Armbrust 
wrote:

> Hmm, that looks like it should work to me.  What version of Spark?  What
> is the schema of goods?
>
> On Thu, Oct 8, 2015 at 6:13 AM, Maheshakya Wijewardena <
> mahesha...@wso2.com> wrote:
>
>> Hi,
>>
>> Suppose there is data frame called goods with columns "barcode" and
>> "items". Some of the values in the column "items" can be null.
>>
>> I want to select the barcode and the respective items from the table,
>> adhering to the following rules:
>>
>>- If "items" is null -> output 0
>>- else -> output "items" ( the actual value in the column)
>>
>> I would write a query like:
>>
>> *SELECT barcode, IF(items is null, 0, items) FROM goods*
>>
>> But this query fails with the error:
>>
>> *unresolved operator 'Project [if (IS NULL items#1) 0 else items#1 AS
>> c0#132]; *
>>
>> It seems I can only use numerical values inside this IF statement, but
>> when a column name is used, it fails.
>>
>> Is there any workaround to do this?
>>
>> Best regards.
>> --
>> Pruthuvi Maheshakya Wijewardena
>> Software Engineer
>> WSO2 : http://wso2.com/
>> Email: mahesha...@wso2.com
>> Mobile: +94711228855
>>
>>
>>
>


-- 
Pruthuvi Maheshakya Wijewardena
Software Engineer
WSO2 : http://wso2.com/
Email: mahesha...@wso2.com
Mobile: +94711228855


Re: Streaming DirectKafka assertion errors

2015-10-08 Thread Roman Garcia
Thanks Cody for your help. Actually I found out it was an issue on our
network. After doing a ping from the Spark node to the Kafka node I found there
were duplicate packets. After rebooting the Kafka node everything went back to
normal!
Thanks for your help!
Roman
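
For anyone hitting the same assertions, here is a small sketch of how the offsets in the error messages can be checked against the GetOffsetShell output quoted below. The helper names are invented for the example; the inputs are the values from this thread.

```python
def parse_offsets(shell_output):
    """Parse 'topic:partition:offset' lines into {(topic, partition): offset}."""
    offsets = {}
    for line in shell_output.strip().splitlines():
        topic, partition, offset = line.strip().rsplit(":", 2)
        offsets[(topic, int(partition))] = int(offset)
    return offsets


def range_is_valid(earliest, latest, topic, partition, start, end):
    """True if start <= end and both lie within what the broker reports."""
    key = (topic, partition)
    return earliest[key] <= start <= end <= latest[key]


latest = parse_offsets(
    "some-topic:0:20\nsome-topic:1:20\nsome-topic:2:19\nsome-topic:3:20")
earliest = parse_offsets(
    "some-topic:0:0\nsome-topic:1:0\nsome-topic:2:0\nsome-topic:3:0")

# The first error reported beginning offset 20 and ending offset 14 for partition 1.
bad_range_ok = range_is_valid(earliest, latest, "some-topic", 1, 20, 14)
# The second error reported start 14 and ending offset 20 for the same partition.
good_range_ok = range_is_valid(earliest, latest, "some-topic", 1, 14, 20)
```

The second range is consistent with what the brokers report, which fits the conclusion that the messages were present in Kafka and the problem was elsewhere.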

On Thu, Oct 8, 2015 at 17:13, Cody Koeninger
wrote:

> It sounds like you moved the job from one environment to another?
>
> This may sound silly, but make sure (eg using lsof) the brokers the job is
> connecting to are actually the ones you expect.
>
> As far as the checkpoint goes, the log output should indicate whether the
> job is restoring from checkpoint.  Make sure that output no longer shows up
> after you stopped the job, deleted the checkpoint directory, and restarted
> it.
>
>
>
> On Thu, Oct 8, 2015 at 2:51 PM, Roman Garcia 
> wrote:
>
>> I'm running Spark Streaming using Kafka Direct stream, expecting
>> exactly-once semantics using checkpoints (which are stored onto HDFS).
>> My Job is really simple, it opens 6 Kafka streams (6 topics with 4 parts
>> each) and stores every row to ElasticSearch using ES-Spark integration.
>>
>> This job was working without issues on a different environment, but on
>> this new environment, I've started to see these assertion errors:
>>
>> "Job aborted due to stage failure: Task 7 in stage 79.0 failed 4 times,
>> most recent failure: Lost task 7.3 in stage 79.0 (TID 762, 192.168.18.219):
>> java.lang.AssertionError: assertion failed: Beginning offset 20 is after
>> the ending offset 14 for topic some-topic partition 1. You either provided
>> an invalid fromOffset, or the Kafka topic has been damaged"
>>
>> and also
>>
>> "Job aborted due to stage failure: Task 12 in stage 83.0 failed 4 times,
>> most recent failure: Lost task 12.3 in stage 83.0 (TID 815,
>> 192.168.18.219): java.lang.AssertionError: assertion failed: Ran out of
>> messages before reaching ending offset 20 for topic some-topic partition 1
>> start 14. This should not happen, and indicates that messages may have been
>> lost"
>>
>> When querying my Kafka cluster (3 brokers, 7 topics, 4 parts each topic,
>> 3 zookeeper nodes, no other consumers), I see what appears to differ from
>> Spark offset info:
>>
>> *running kafka.tools.GetOffsetShell --time -1*
>> some-topic:0:20
>> some-topic:1:20
>> some-topic:2:19
>> some-topic:3:20
>> *running kafka.tools.GetOffsetShell --time -2*
>> some-topic:0:0
>> some-topic:1:0
>> some-topic:2:0
>> some-topic:3:0
>>
>> *running kafka-simple-consumer-shell* I can see all stored messages
>> until offset 20, with a final output: "Terminating. Reached the end of
>> partition (some-topic, 1) at offset 20"
>>
>> I tried removing the whole checkpoint dir and starting over, but it keeps
>> failing.
>>
>> It looks like these tasks get retried without end. On the spark-ui
>> streaming tab I see the "Active batches" increase with a confusing "Input
>> size" value of "-19" (negative size?)
>>
>> Any pointers will help
>> Thanks
>>
>> Roman
>>
>>
>


Re: [Spark 1.5] Kinesis receivers not starting

2015-10-08 Thread Tathagata Das
How many executors and cores do you acquire?

td

On Thu, Oct 8, 2015 at 6:11 PM, Bharath Mukkati 
wrote:

> Hi Spark Users,
>
> I am testing my application on Spark 1.5 and kinesis-asl-1.5. The
> streaming application starts but I see a ton of stages scheduled for
> ReceiverTracker (submitJob at ReceiverTracker.scala:557 ).
>
> In the driver logs I see this sequence repeat:
> 15/10/09 00:10:54 INFO INFO ReceiverTracker: Starting 100 receivers
> 15/10/09 00:10:54 INFO ReceiverTracker: ReceiverTracker started
>
> 15/10/09 00:10:54 INFO ReceiverTracker: Receiver 0 started
> 15/10/09 00:10:54 DEBUG ClosureCleaner: +++ Cleaning closure 
> (org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9)
> +++
> 15/10/09 00:10:54 DEBUG ClosureCleaner:  + declared fields: 3
> 15/10/09 00:10:54 DEBUG ClosureCleaner:  public static final long
> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.serialVersionUID
> 15/10/09 00:10:54 DEBUG ClosureCleaner:  private final scala.Option
> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.checkpointDirOption$1
> 15/10/09 00:10:54 DEBUG ClosureCleaner:  private final
> org.apache.spark.util.SerializableConfiguration
> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.serializableHadoopConf$1
> 15/10/09 00:10:54 DEBUG ClosureCleaner:  + declared methods: 2
> 15/10/09 00:10:54 DEBUG ClosureCleaner:  public final java.lang.Object
> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(java.lang.Object)
> 15/10/09 00:10:54 DEBUG ClosureCleaner:  public final void
> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(scala.collection.Iterator)
> 15/10/09 00:10:54 DEBUG ClosureCleaner:  + inner classes: 0
> 15/10/09 00:10:54 DEBUG ClosureCleaner:  + outer classes: 0
> 15/10/09 00:10:54 DEBUG ClosureCleaner:  + outer objects: 0
> 15/10/09 00:10:54 DEBUG ClosureCleaner:  + populating accessed fields
> because this is the starting closure
> 15/10/09 00:10:54 DEBUG ClosureCleaner:  + fields accessed by starting
> closure: 0
> 15/10/09 00:10:54 DEBUG ClosureCleaner:  + there are no enclosing objects!
> 15/10/09 00:10:54 DEBUG ClosureCleaner:  +++ closure 
> (org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9)
> is now cleaned +++
>
> ...
> (and so on for 100 receivers)
>
> And then I start seeing ...
> 15/10/09 00:11:02 INFO ReceiverTracker: Restarting Receiver 36
> .. and so on for the other receivers
>
> After which I see the Receiver started logs
> 15/10/09 00:11:02 INFO ReceiverTracker: Receiver 20 started
> ..
> Again the Restarting Receiver logs appear
>
> After a while the driver hangs, no new logs appear although the app seems
> to be running. The streaming console shows scheduled stages and jobs.
>
> There are no ERROR logs in the driver. However I see the following
> Exceptions (DEBUG logs)
>
> akka.remote.ShutDownAssociation: Shut down address:
> akka.tcp://driverPropsFetcher@ip-:57886
> Caused by: akka.remote.transport.Transport$InvalidAssociationException:
> The remote system terminated the association because it is shutting down.
> ] from Actor[akka://sparkDriver/deadLetters]
> 15/10/09 00:10:37 DEBUG
> AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] received
> message AssociationError [akka.tcp://sparkDriver@:39053] <-
> [akka.tcp://driverPropsFetcher@:57886]: Error [Shut down address:
> akka.tcp://driverPropsFetcher@:57886] [
> akka.remote.ShutDownAssociation: Shut down address:
> akka.tcp://driverPropsFetcher@:57886
> Caused by: akka.remote.transport.Transport$InvalidAssociationException:
> The remote system terminated the association because it is shutting down.
> ] from Actor[akka://sparkDriver/deadLetters]
>
> In one of the executor logs I see the following Exceptions:
>
> application_1444344955519_0001/container_1444344955519_0001_01_05/stderr:15/10/09
> 00:45:37 WARN receiver.ReceiverSupervisorImpl: Skip stopping receiver
> because it has not yet stared
> application_1444344955519_0001/container_1444344955519_0001_01_05/stderr:15/10/09
> 00:45:37 INFO receiver.BlockGenerator: Stopping BlockGenerator
> application_1444344955519_0001/container_1444344955519_0001_01_05/stderr:15/10/09
> 00:45:37 INFO receiver.BlockGenerator: Waiting for block pushing thread to
> terminate
> application_1444344955519_0001/container_1444344955519_0001_01_05/stderr:15/10/09
> 00:45:37 INFO receiver.BlockGenerator: Pushing out the last 0 blocks
> application_1444344955519_0001/container_1444344955519_0001_01_05/stderr:15/10/09
> 00:45:37 INFO receiver.BlockGenerator: Stopped block pushing thread
> application_1444344955519_0001/container_1444344955519_0001_01_05/stderr:15/10/09
> 00:45:37 INFO receiver.BlockGenerator: Stopped 

Re: Re: Error in load hbase on spark

2015-10-08 Thread Ted Yu
The second code snippet is similar to:
examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala

See the comment in HBaseTest.scala :
// please ensure HBASE_CONF_DIR is on classpath of spark driver
// e.g: set it through spark.driver.extraClassPath property
// in spark-defaults.conf or through --driver-class-path
// command line option of spark-submit

If an exception was thrown during execution of
TableInputFormatBase#initializeTable(), the table field might not have been
initialized.
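
One way to check the classpath advice above before submitting the job is to confirm that some entry of the driver classpath really contains hbase-site.xml. The following is a minimal Python sketch, not part of Spark or HBase: the helper name find_config_dirs is hypothetical, and it assumes the entries of interest are plain directories (such as HBASE_CONF_DIR) rather than jars or wildcard entries.

```python
import os


def find_config_dirs(classpath, filename="hbase-site.xml", sep=os.pathsep):
    """Return the classpath directory entries that contain `filename`.

    Hypothetical helper for illustration: only plain directory entries
    are inspected; jar files and wildcard entries are skipped.
    """
    hits = []
    for entry in classpath.split(sep):
        entry = entry.strip()
        # Skip empty entries, jars, and anything that is not a directory.
        if not entry or not os.path.isdir(entry):
            continue
        if os.path.isfile(os.path.join(entry, filename)):
            hits.append(entry)
    return hits
```

Calling find_config_dirs with the value you intend to pass to --driver-class-path (or to set as spark.driver.extraClassPath) returns an empty list when no directory on it holds hbase-site.xml, which would be consistent with the HBase configuration not reaching the job.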

FYI

On Thu, Oct 8, 2015 at 7:54 PM, roywang1024  wrote:

>
> I have tried this
>
> SparkConf sparkConf = new SparkConf().setAppName("HBaseIntoSpark");
> JavaSparkContext sc = new JavaSparkContext(sparkConf);
> Configuration conf = HBaseConfiguration.create();
> Scan scan = new Scan();
> scan.addFamily(Bytes.toBytes("InnerCode"));
> scan.addColumn(Bytes.toBytes("InnerCode"), Bytes.toBytes(""));
> conf.set(TableInputFormat.INPUT_TABLE, "SecuMain");
> conf.set(TableInputFormat.SCAN, convertScanToString(scan));
>
> and this
>
> SparkConf sparkConf = new SparkConf().setAppName("HBaseIntoSpark");
> JavaSparkContext sc = new JavaSparkContext(sparkConf);
> Configuration conf = HBaseConfiguration.create();
> String tableName = "SecuMain";
> conf.set(TableInputFormat.INPUT_TABLE, tableName);
>
> This also doesn't work!
>
> Should I add hbase-site.xml to conf?
>
> Thanks.
>
>
>
>
>
>
> At 2015-10-09 10:35:16, "Ted Yu"  wrote:
>
> One possibility was that hbase config, including hbase.zookeeper.quorum,
> was not passed to your job.
> hbase-site.xml should be on the classpath.
>
> Can you show snippet of your code ?
>
> Looks like you were running against hbase 1.x
>
> Cheers
>
> On Thu, Oct 8, 2015 at 7:29 PM, Roy Wang  wrote:
>
>>
>> I want to load hbase table into spark.
>> JavaPairRDD hBaseRDD =
>> sc.newAPIHadoopRDD(conf, TableInputFormat.class,
>> ImmutableBytesWritable.class, Result.class);
>>
>> *when call hBaseRDD.count(),got error.*
>>
>> Caused by: java.lang.IllegalStateException: The input format instance has
>> not been properly initialized. Ensure you call initializeTable either in
>> your constructor or initialize method
>> at
>>
>> org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getTable(TableInputFormatBase.java:389)
>> at
>>
>> org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.createRecordReader(TableInputFormatBase.java:158)
>> ... 11 more
>>
>> *But when job start,I can get these logs*
>> 2015-10-09 09:17:00[main] WARN  TableInputFormatBase:447 - initializeTable
>> called multiple times. Overwriting connection and table reference;
>> TableInputFormatBase will not close these old references when done.
>>
>> Does anyone know how does this happen?
>>
>> Thanks!
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Error-in-load-hbase-on-spark-tp24986.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
>
>


Architecture for a Spark batch job.

2015-10-08 Thread Renato Perini
I have started a project using Spark 1.5.1 consisting of several jobs that I 
currently launch manually using shell scripts against a small Spark 
standalone cluster.
Those jobs generally read a Cassandra table (using an RDD of type 
JavaRDD or using plain DataFrames), compute results on 
that data and write those results to another Cassandra table.
The project builds (using Apache Maven) a single shaded uber jar. This 
jar has many main methods. Each main method is launched against the 
cluster with a specific shell script (basically a spark-submit wrapper).


The number of jobs I'm writing is constantly increasing, and the code base 
is growing in size and becoming a little disorganized. I'm 
having difficulty organizing the code base logically when 
all I write are operations (transformations and actions) on RDDs and 
DataFrames.


So my question is: how do you generally organize the code base for large 
projects? Can you give examples, code snippets, architecture templates, 
etc. of the general workflow you use to create a new job?

Any help is really appreciated.

Thanks.

P.S.: I code in Java 7, we're not switching to Java 8 anytime soon and 
Scala is not an option at this time.







Error in load hbase on spark

2015-10-08 Thread Roy Wang

I want to load an HBase table into Spark.
JavaPairRDD hBaseRDD =
sc.newAPIHadoopRDD(conf, TableInputFormat.class,
ImmutableBytesWritable.class, Result.class);

*When I call hBaseRDD.count(), I get an error.*

Caused by: java.lang.IllegalStateException: The input format instance has
not been properly initialized. Ensure you call initializeTable either in
your constructor or initialize method
at
org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getTable(TableInputFormatBase.java:389)
at
org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.createRecordReader(TableInputFormatBase.java:158)
... 11 more

*But when the job starts, I get these logs:*
2015-10-09 09:17:00[main] WARN  TableInputFormatBase:447 - initializeTable
called multiple times. Overwriting connection and table reference;
TableInputFormatBase will not close these old references when done.

Does anyone know how this happens?

Thanks! 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Error-in-load-hbase-on-spark-tp24986.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




RE: How can I read file from HDFS in sparkR from RStudio

2015-10-08 Thread Sun, Rui
Amit,

sqlContext <- sparkRSQL.init(sc)

peopleDF <- read.df(sqlContext, "hdfs://master:9000/sears/example.csv")

Have you restarted the R session in RStudio between the two lines?

From: Amit Behera [mailto:amit.bd...@gmail.com]
Sent: Thursday, October 8, 2015 5:59 PM
To: user@spark.apache.org
Subject: How can I read file from HDFS in sparkR from RStudio

Hi All,
I am very new to SparkR.
I am able to run the sample code from the example given at this link: 
http://www.r-bloggers.com/installing-and-starting-sparkr-locally-on-windows-os-and-rstudio/
Then I tried to read a file from HDFS in RStudio, but I am unable to read it.
Below is my code.

Sys.setenv(SPARK_HOME="/home/affine/spark")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"),"R","lib"),.libPaths()))

library(SparkR)

library(rJava)

Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" 
"com.databricks:spark-csv_2.10:1.2.0" "sparkr-shell"')

sc <- sparkR.init(master = "spark://master:7077",sparkPackages = 
"com.databricks:spark-csv_2.1:1.2.0")

sqlContext <- sparkRSQL.init(sc)

peopleDF <- read.df(sqlContext, "hdfs://master:9000/sears/example.csv")
Error:


Error in callJMethod(sqlContext, "getConf", "spark.sql.sources.default",  :

  Invalid jobj 1. If SparkR was restarted, Spark operations need to be 
re-executed.
Please tell me where I am going wrong.
Thanks,
Amit.


Re: Error in load hbase on spark

2015-10-08 Thread Ted Yu
One possibility was that hbase config, including hbase.zookeeper.quorum,
was not passed to your job.
hbase-site.xml should be on the classpath.

Can you show a snippet of your code?

Looks like you were running against hbase 1.x

Cheers

On Thu, Oct 8, 2015 at 7:29 PM, Roy Wang  wrote:

>
> I want to load hbase table into spark.
> JavaPairRDD hBaseRDD =
> sc.newAPIHadoopRDD(conf, TableInputFormat.class,
> ImmutableBytesWritable.class, Result.class);
>
> *when call hBaseRDD.count(),got error.*
>
> Caused by: java.lang.IllegalStateException: The input format instance has
> not been properly initialized. Ensure you call initializeTable either in
> your constructor or initialize method
> at
>
> org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getTable(TableInputFormatBase.java:389)
> at
>
> org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.createRecordReader(TableInputFormatBase.java:158)
> ... 11 more
>
> *But when job start,I can get these logs*
> 2015-10-09 09:17:00[main] WARN  TableInputFormatBase:447 - initializeTable
> called multiple times. Overwriting connection and table reference;
> TableInputFormatBase will not close these old references when done.
>
> Does anyone know how does this happen?
>
> Thanks!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Error-in-load-hbase-on-spark-tp24986.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


weird issue with sqlContext.createDataFrame - pyspark 1.3.1

2015-10-08 Thread ping yan
I really cannot figure out what this is about..
(tried to import pandas, in case that is a dependency, but it didn't help.)

>>> from pyspark.sql import SQLContext
>>> sqlContext=SQLContext(sc)
>>> sqlContext.createDataFrame(l).collect()
Traceback (most recent call last):
  File "", line 1, in 
  File
"/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/spark/python/pyspark/sql/context.py",
line 318, in createDataFrame
if has_pandas and isinstance(data, pandas.DataFrame):
AttributeError: 'module' object has no attribute 'DataFrame'

Would appreciate any pointers.

Thanks!
Ping
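
A possible explanation for the AttributeError above, offered as a guess rather than a diagnosis: some module named pandas was importable but is not the real library (for example a local pandas.py file or directory shadowing it, or a broken install). The minimal sketch below reports where a module was loaded from and whether it exposes a given attribute. The helper name inspect_module is hypothetical, and it should be run with the same Python interpreter that pyspark uses.

```python
import importlib


def inspect_module(name, attribute):
    """Report where `name` was imported from and whether it has `attribute`."""
    try:
        module = importlib.import_module(name)
    except ImportError:
        return {"importable": False, "path": None, "has_attribute": False}
    return {
        "importable": True,
        # Built-in modules have no __file__, hence the default of None.
        "path": getattr(module, "__file__", None),
        "has_attribute": hasattr(module, attribute),
    }


# Works whether or not pandas is installed.
print(inspect_module("pandas", "DataFrame"))
```

If the result shows importable as True, has_attribute as False, and a path outside the site-packages directory, renaming or removing that file would be the first thing to try.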


Insert via HiveContext is slow

2015-10-08 Thread Daniel Haviv
Hi,
I'm inserting into a partitioned ORC table using an insert sql statement
passed via HiveContext.
The performance I'm getting is pretty bad and I was wondering if there are
ways to speed things up.
Would saving the DF like this
df.write().mode(SaveMode.Append).partitionBy("date").saveAsTable("Tablename")
be faster ?


Thank you.
Daniel


How to increase Spark partitions for the DataFrame?

2015-10-08 Thread unk1102
Hi, I have the following code, which reads ORC files from HDFS; it loads a
directory that contains 12 ORC files. Since the HDFS directory contains 12
files, it creates 12 partitions by default. The directory is huge, and when
the ORC files are decompressed they come to around 10 GB. How do I increase
the number of partitions for the code below so that my Spark job runs faster
and does not hang for a long time reading 10 GB of files through a shuffle in
12 partitions? Please guide. 

DataFrame df =
hiveContext.read().format("orc").load("/hdfs/path/to/orc/files/");
df.select().groupby(..)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-increase-Spark-partitions-for-the-DataFrame-tp24980.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




failed spark job reports on YARN as successful

2015-10-08 Thread Lan Jiang
Hi, there

I have a Spark batch job running on CDH 5.4 + Spark 1.3.0. The job is submitted in 
“yarn-client” mode. The job itself failed because YARN killed several executor 
containers that exceeded the memory limit imposed by YARN. 
However, when I went to the YARN resource manager site, it displayed the job as 
successful. I found an issue reported in JIRA, 
https://issues.apache.org/jira/browse/SPARK-3627, but it says it was fixed in 
Spark 1.2. The Spark history server shows the job as “Incomplete”. 

Is this still a bug, or is there something I need to do in my Spark application to 
report the correct job status to YARN?

Lan



Re: How to increase Spark partitions for the DataFrame?

2015-10-08 Thread Lan Jiang
The number of partitions should match the number of HDFS blocks rather than the 
number of files. Did you confirm in the Spark UI that only 12 partitions were 
created? What is your ORC orc.stripe.size?
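
To make the block-versus-file point concrete, here is a small arithmetic sketch in plain Python (no Spark involved). It assumes the simple rule described in this reply, one split per HDFS block of each file. The helper name, the 300 MB file size, and the 128 MB block size are hypothetical, since the thread states neither the on-disk file sizes nor the block size, and an ORC reader may split on stripe boundaries instead.

```python
MB = 1024 * 1024


def estimate_input_partitions(file_sizes, block_size):
    """Estimate input splits, assuming one split per HDFS block of each file."""
    if block_size <= 0:
        raise ValueError("block_size must be positive")
    # -(-a // b) is integer ceiling division.
    return sum(-(-size // block_size) for size in file_sizes)


# Hypothetical layout: 12 files of 300 MB each, 128 MB blocks.
print(estimate_input_partitions([300 * MB] * 12, 128 * MB))  # 36
```

Under these assumed numbers the 12 files would yield 36 splits, so seeing exactly 12 partitions would suggest that each file fits in a single block or is being read as one split.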

Lan


> On Oct 8, 2015, at 1:13 PM, unk1102  wrote:
> 
> Hi I have the following code where I read ORC files from HDFS and it loads
> directory which contains 12 ORC files. Now since HDFS directory contains 12
> files it will create 12 partitions by default. These directory is huge and
> when ORC files gets decompressed it becomes around 10 GB how do I increase
> partitions for the below code so that my Spark job runs faster and does not
> hang for long time because of reading 10 GB files through shuffle in 12
> partitions. Please guide. 
> 
> DataFrame df =
> hiveContext.read().format("orc").load("/hdfs/path/to/orc/files/");
> df.select().groupby(..)
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-increase-Spark-partitions-for-the-DataFrame-tp24980.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 





RE: RowNumber in HiveContext returns null or negative values

2015-10-08 Thread Saif.A.Ellafi
Even with repartition and default parallelism set to 1, it is still broken in cluster mode.

So the problem is not the parallelism, but the cluster mode itself. Something 
is wrong with HiveContext + cluster mode.

Saif

From: saif.a.ell...@wellsfargo.com [mailto:saif.a.ell...@wellsfargo.com]
Sent: Thursday, October 08, 2015 3:01 PM
To: mich...@databricks.com
Cc: user@spark.apache.org
Subject: RE: RowNumber in HiveContext returns null or negative values

Hi, thanks for looking into it. It is v1.5.1. I am really worried.
I don't actually have Hive/Hadoop in the environment.

Saif

From: Michael Armbrust [mailto:mich...@databricks.com]
Sent: Thursday, October 08, 2015 2:57 PM
To: Ellafi, Saif A.
Cc: user
Subject: Re: RowNumber in HiveContext returns null or negative values

Which version of Spark?

On Thu, Oct 8, 2015 at 7:25 AM,  wrote:
Hi all, would this be a bug??

val ws = Window.
partitionBy("clrty_id").
orderBy("filemonth_dtt")

val nm = "repeatMe"
df.select(df.col("*"), rowNumber().over(ws).cast("int").as(nm))


stacked_data.filter(stacked_data("repeatMe").isNotNull).orderBy("repeatMe").take(50).foreach(println(_))

--->

Long, DateType, Int
[2003,2006-06-01,-1863462909]
[2003,2006-09-01,-1863462909]
[2003,2007-01-01,-1863462909]
[2003,2007-08-01,-1863462909]
[2003,2007-07-01,-1863462909]
[2138,2007-07-01,-1863462774]
[2138,2007-02-01,-1863462774]
[2138,2006-11-01,-1863462774]
[2138,2006-08-01,-1863462774]
[2138,2007-08-01,-1863462774]
[2138,2006-09-01,-1863462774]
[2138,2007-03-01,-1863462774]
[2138,2006-10-01,-1863462774]
[2138,2007-05-01,-1863462774]
[2138,2006-06-01,-1863462774]
[2138,2006-12-01,-1863462774]


Thanks,
Saif
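
For reference, a row number over the window in this message is expected to be a 1-based counter that restarts for each clrty_id and follows filemonth_dtt order, never a null or a negative value. The following plain-Python sketch (it does not use Spark) shows that expected behaviour on a few rows taken from the output above. The helper name row_numbers and the tuple layout are assumptions made for illustration.

```python
from itertools import groupby
from operator import itemgetter


def row_numbers(rows, partition_key, order_key):
    """Append a 1-based row number to each row, restarting per partition."""
    ordered = sorted(rows, key=lambda r: (r[partition_key], r[order_key]))
    numbered = []
    for _, group in groupby(ordered, key=itemgetter(partition_key)):
        for index, row in enumerate(group, start=1):
            numbered.append(tuple(row) + (index,))
    return numbered


# (clrty_id, filemonth_dtt) pairs; ISO dates sort correctly as strings.
sample = [
    (2138, "2006-08-01"),
    (2003, "2006-09-01"),
    (2003, "2006-06-01"),
    (2138, "2006-06-01"),
    (2003, "2007-01-01"),
]
for row in row_numbers(sample, partition_key=0, order_key=1):
    print(row)
# (2003, '2006-06-01', 1)
# (2003, '2006-09-01', 2)
# (2003, '2007-01-01', 3)
# (2138, '2006-06-01', 1)
# (2138, '2006-08-01', 2)
```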




Why dataframe.persist(StorageLevels.MEMORY_AND_DISK_SER) hangs for long time?

2015-10-08 Thread unk1102
Hi, as recommended I am caching my Spark job's DataFrame with
dataframe.persist(StorageLevels.MEMORY_AND_DISK_SER), but in the Spark job UI
I see that this persist stage runs for a very long time, showing 10 GB of
shuffle read and 5 GB of shuffle write. It takes too long to finish, and
because of that my Spark job sometimes times out or throws an OOM, and the
executors get killed by YARN. I am using Spark 1.4.1 with all sorts of
optimizations such as Tungsten and Kryo, and I have set storage.memoryFraction
to 0.2 and storage.shuffle to 0.2 as well. My data is huge, around 1 TB, and I
am using the default 200 partitions for spark.sql.shuffle.partitions. I am
clueless, please guide.
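
A rough sizing sketch for the numbers in this message, in plain Python with no Spark involved. It assumes the full 1 TB is spread evenly over the shuffle partitions, which the post does not confirm, and the 128 MB target per partition is an arbitrary assumption chosen only for illustration. Both function names are hypothetical.

```python
MB = 1024 ** 2
GB = 1024 ** 3
TB = 1024 ** 4


def per_partition_bytes(total_bytes, partitions):
    """Average bytes per partition if the data were spread evenly."""
    return total_bytes / float(partitions)


def partitions_for_target(total_bytes, target_bytes):
    """Smallest partition count keeping the average at or below the target."""
    # -(-a // b) is integer ceiling division.
    return -(-total_bytes // target_bytes)


# About 5.12 GB per partition with the default of 200 partitions.
print(per_partition_bytes(1 * TB, 200) / GB)
# Partition count needed for an assumed 128 MB average.
print(partitions_for_target(1 * TB, 128 * MB))  # 8192
```

Under these assumptions each of the 200 partitions would average about 5 GB, which is one reason to experiment with a larger spark.sql.shuffle.partitions value.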



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-dataframe-persist-StorageLevels-MEMORY-AND-DISK-SER-hangs-for-long-time-tp24981.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: RowNumber in HiveContext returns null or negative values

2015-10-08 Thread Michael Armbrust
Can you open a JIRA?

On Thu, Oct 8, 2015 at 11:24 AM,  wrote:

> Repartition and default parallelism to 1, in cluster mode, is still
> *broken*.
>
>
>
> So the problem is not the parallelism, but the cluster mode itself.
> Something wrong with HiveContext + cluster mode.
>
>
>
> Saif
>
>
>
> *From:* saif.a.ell...@wellsfargo.com [mailto:saif.a.ell...@wellsfargo.com]
>
> *Sent:* Thursday, October 08, 2015 3:01 PM
> *To:* mich...@databricks.com
> *Cc:* user@spark.apache.org
> *Subject:* RE: RowNumber in HiveContext returns null or negative values
>
>
>
> Hi, thanks for looking into. v1.5.1. I am really worried.
>
> I dont have hive/hadoop for real in the environment.
>
>
>
> Saif
>
>
>
> *From:* Michael Armbrust [mailto:mich...@databricks.com
> ]
> *Sent:* Thursday, October 08, 2015 2:57 PM
> *To:* Ellafi, Saif A.
> *Cc:* user
> *Subject:* Re: RowNumber in HiveContext returns null or negative values
>
>
>
> Which version of Spark?
>
>
>
> On Thu, Oct 8, 2015 at 7:25 AM,  wrote:
>
> Hi all, would this be a bug??
>
>
>
> val ws = Window.
>
> partitionBy("clrty_id").
>
> orderBy("filemonth_dtt")
>
>
>
> val nm = "repeatMe"
>
> df.select(df.col("*"), rowNumber().over(ws).cast("int").as(nm))
>
>
>
>
> stacked_data.filter(stacked_data("repeatMe").isNotNull).orderBy("repeatMe").take(50).foreach(println(_))
>
>
>
> --->
>
>
>
> *Long, DateType, Int*
>
> [2003,2006-06-01,-1863462909]
>
> [2003,2006-09-01,-1863462909]
>
> [2003,2007-01-01,-1863462909]
>
> [2003,2007-08-01,-1863462909]
>
> [2003,2007-07-01,-1863462909]
>
> [2138,2007-07-01,-1863462774]
>
> [2138,2007-02-01,-1863462774]
>
> [2138,2006-11-01,-1863462774]
>
> [2138,2006-08-01,-1863462774]
>
> [2138,2007-08-01,-1863462774]
>
> [2138,2006-09-01,-1863462774]
>
> [2138,2007-03-01,-1863462774]
>
> [2138,2006-10-01,-1863462774]
>
> [2138,2007-05-01,-1863462774]
>
> [2138,2006-06-01,-1863462774]
>
> [2138,2006-12-01,-1863462774]
>
>
>
>
>
> Thanks,
>
> Saif
>
>
>
>
>


Re: Spark REST Job server feedback?

2015-10-08 Thread Tim Smith
I am curious too - any comparison between the two. Looks like one is
Datastax sponsored and the other is Cloudera. Other than that, any
major/core differences in design/approach?

Thanks,

Tim


On Mon, Sep 28, 2015 at 8:32 AM, Ramirez Quetzal  wrote:

> Anyone has feedback on using Hue / Spark Job Server REST servers?
>
>
> http://gethue.com/how-to-use-the-livy-spark-rest-job-server-for-interactive-spark/
>
> https://github.com/spark-jobserver/spark-jobserver
>
> Many thanks,
>
> Rami
>





Streaming DirectKafka assertion errors

2015-10-08 Thread Roman Garcia
I'm running Spark Streaming using Kafka Direct stream, expecting
exactly-once semantics using checkpoints (which are stored onto HDFS).
My Job is really simple, it opens 6 Kafka streams (6 topics with 4 parts
each) and stores every row to ElasticSearch using ES-Spark integration.

This job was working without issues on a different environment, but on this
new environment, I've started to see these assertion errors:

"Job aborted due to stage failure: Task 7 in stage 79.0 failed 4 times,
most recent failure: Lost task 7.3 in stage 79.0 (TID 762, 192.168.18.219):
java.lang.AssertionError: assertion failed: Beginning offset 20 is after
the ending offset 14 for topic some-topic partition 1. You either provided
an invalid fromOffset, or the Kafka topic has been damaged"

and also

"Job aborted due to stage failure: Task 12 in stage 83.0 failed 4 times,
most recent failure: Lost task 12.3 in stage 83.0 (TID 815,
192.168.18.219): java.lang.AssertionError: assertion failed: Ran out of
messages before reaching ending offset 20 for topic some-topic partition 1
start 14. This should not happen, and indicates that messages may have been
lost"

When querying my Kafka cluster (3 brokers, 7 topics, 4 parts each topic, 3
zookeeper nodes, no other consumers), I see offsets that appear to differ from
Spark's offset info:

*running kafka.tools.GetOffsetShell --time -1*
some-topic:0:20
some-topic:1:20
some-topic:2:19
some-topic:3:20
*running kafka.tools.GetOffsetShell --time -2*
some-topic:0:0
some-topic:1:0
some-topic:2:0
some-topic:3:0

*running kafka-simple-consumer-shell* I can see all stored messages until
offset 20, with a final output: "Terminating. Reached the end of partition
(some-topic, 1) at offset 20"
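
The two assertions quoted above amount to sanity checks on an offset range. The sketch below restates those checks in plain Python so the reported numbers can be compared against the GetOffsetShell output. It is an illustration only, not the Spark or Kafka implementation, and the function name check_offset_range is hypothetical.

```python
def check_offset_range(from_offset, until_offset, earliest, latest):
    """Return a list of problems with a requested [from, until) offset range."""
    problems = []
    if from_offset > until_offset:
        problems.append("beginning offset %d is after ending offset %d"
                        % (from_offset, until_offset))
    if from_offset < earliest:
        problems.append("beginning offset %d is before earliest offset %d"
                        % (from_offset, earliest))
    if until_offset > latest:
        problems.append("ending offset %d is after latest offset %d"
                        % (until_offset, latest))
    return problems


# First error: range 20..14 for partition 1; the broker reports 0..20.
print(check_offset_range(20, 14, earliest=0, latest=20))
# Second error: range 14..20 for partition 1.
print(check_offset_range(14, 20, earliest=0, latest=20))
```

The range from the first error fails the ordering check, while the range from the second error (14 to 20) passes every check against the GetOffsetShell output, so a check of this kind alone does not explain that second failure.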

I tried removing the whole checkpoint dir and starting over, but it keeps
failing.

It looks like these tasks get retried without end. On the spark-ui
streaming tab I see the "Active batches" increase with a confusing "Input
size" value of "-19" (negative size?)

Any pointers will help
Thanks

Roman


ValueError: can not serialize object larger than 2G

2015-10-08 Thread XIANDI
  File "/home/hadoop/spark/python/pyspark/worker.py", line 101, in main
process()
  File "/home/hadoop/spark/python/pyspark/worker.py", line 96, in process
serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/hadoop/spark/python/pyspark/serializers.py", line 126, in
dump_stream
self._write_with_length(obj, stream)
  File "/home/hadoop/spark/python/pyspark/serializers.py", line 140, in
_write_with_length
raise ValueError("can not serialize object larger than 2G")
ValueError: can not serialize object larger than 2G

Does anyone know how this happens?

Thanks!
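
The function name _write_with_length in the traceback suggests that each serialized object is written with a length prefix. Assuming that prefix is a signed 32-bit integer, no single serialized object can exceed roughly 2 GB. The sketch below illustrates that kind of framing and one generic way to stay under the limit by splitting a large list of records into smaller batches. It is an illustration, not the pyspark source: the names frame_header and chunk are hypothetical, and the limit is a parameter so the behaviour can be tried without allocating gigabytes.

```python
import struct

# Largest value that fits in a signed 32-bit length prefix.
MAX_FRAME_LENGTH = (1 << 31) - 1


def frame_header(payload_length, limit=MAX_FRAME_LENGTH):
    """Return the 4-byte big-endian length header for a payload of that size."""
    if payload_length > limit:
        raise ValueError("can not serialize object larger than 2G")
    return struct.pack("!i", payload_length)


def chunk(records, max_records):
    """Split a list of records into smaller lists so each frame stays small."""
    return [records[i:i + max_records]
            for i in range(0, len(records), max_records)]


print(chunk([1, 2, 3, 4, 5], 2))  # [[1, 2], [3, 4], [5]]
```

In a Spark job the equivalent remedy is usually to restructure the computation so that no single record or returned object is that large, for example by emitting many smaller records instead of one huge list.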



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ValueError-can-not-serialize-object-larger-than-2G-tp24984.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
