Re: SparkSQL errors in 1.4 rc when using with Hive 0.12 metastore

2015-05-24 Thread Mark Hamstra
This discussion belongs on the dev list.  Please post any replies there.

On Sat, May 23, 2015 at 10:19 PM, Cheolsoo Park piaozhe...@gmail.com
wrote:

 Hi,

 I've been testing SparkSQL in 1.4 rc and found two issues. I wanted to
 confirm whether these are bugs or not before opening a jira.

 *1)* I can no longer compile SparkSQL with -Phive-0.12.0. I noticed that
 in 1.4, IsolatedClientLoader is introduced, and different versions of Hive
 metastore jars can be loaded at runtime. But as a side effect, SparkSQL no
 longer compiles with Hive 0.12.0.

 My question is, is this intended? If so, shouldn't the hive-0.12.0 profile
 in POM be removed?

 *2)* After compiling SparkSQL with -Phive-0.13.1, I ran into my 2nd
 problem. Since I have Hive 0.12 metastore in production, I have to use it
 for now. But even if I set spark.sql.hive.metastore.version and
 spark.sql.hive.metastore.jars, SparkSQL cli throws an error as follows-

 15/05/24 05:03:29 WARN RetryingMetaStoreClient: MetaStoreClient lost
 connection. Attempting to reconnect.
 org.apache.thrift.TApplicationException: Invalid method name:
 'get_functions'
 at
 org.apache.thrift.TApplicationException.read(TApplicationException.java:108)
 at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:71)
 at
 org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_functions(ThriftHiveMetastore.java:2886)
 at
 org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_functions(ThriftHiveMetastore.java:2872)
 at
 org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getFunctions(HiveMetaStoreClient.java:1727)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89)
 at com.sun.proxy.$Proxy12.getFunctions(Unknown Source)
 at org.apache.hadoop.hive.ql.metadata.Hive.getFunctions(Hive.java:2670)
 at
 org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionNames(FunctionRegistry.java:674)
 at
 org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionNames(FunctionRegistry.java:662)
 at
 org.apache.hadoop.hive.cli.CliDriver.getCommandCompletor(CliDriver.java:540)
 at
 org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:175)
 at
 org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)

 What's happening is that when SparkSQL Cli starts up, it tries to fetch
 permanent udfs from Hive metastore (due to HIVE-6330
 https://issues.apache.org/jira/browse/HIVE-6330, which was introduced
 in Hive 0.13). But then, it ends up invoking an incompatible thrift
 function that doesn't exist in Hive 0.12. To work around this error, I have
 to comment out the following line of code for now-
 https://goo.gl/wcfnH1

 My question is, is SparkSQL that is compiled against Hive 0.13 supposed to
 work with Hive 0.12 metastore (by setting
 spark.sql.hive.metastore.version and spark.sql.hive.metastore.jars)? It
 only works if I comment out the above line of code.

 Thanks,
 Cheolsoo
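For reference, a minimal sketch of the two settings being discussed (an illustration only, not a confirmed fix for the error above). The jar classpath is hypothetical; it would need to contain the Hive 0.12 client jars plus matching Hadoop client jars, and whether this works end to end is exactly what the question is about.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val conf = new SparkConf()
  .setAppName("hive-0.12-metastore")
  .set("spark.sql.hive.metastore.version", "0.12.0")   // version of the metastore being talked to
  .set("spark.sql.hive.metastore.jars",
       "/opt/hive-0.12.0/lib/*:/opt/hadoop/client/*")   // hypothetical classpath for the isolated client
val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)   // the isolated client loader picks up the settings above
hiveContext.sql("SHOW TABLES").collect().foreach(println)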



how to distributed run a bash shell in spark

2015-05-24 Thread luohui20001
Hello there, I am trying to run an app in which part of it needs to run a
shell script. How can I run the shell script distributed across the Spark cluster? Thanks.

Here's my code:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class ShellCompare {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("ShellCompare")
                .setMaster("spark://master:7077")
                .set("spark.executor.memory", "6g");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // run the comparison script once per chromosome (1..21)
        for (int i = 1; i <= 21; i++) {
            execShell(i);
        }
        //execShell(1);
        sc.stop();
    }

    private static void execShell(int i) {
        String shpath = "/opt/sh/bin/sort.sh";
        Process process = null;

        String var = "/opt/data/shellcompare/chr" + i + ".txt "
                + "/opt/data/shellcompare/samplechr" + i + ".txt "
                + "/opt/data/shellcompare/result.txt 600";
        //String var = "/opt/data/chr1.txt /opt/data/chr1sample.txt /opt/sh/bin/result.txt 600";
        String command2 = "sh " + shpath + " " + var;
        try {
            // note: this launches the script on the driver, not on the executors
            process = Runtime.getRuntime().exec(command2);
            process.waitFor();
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}



 

Thanks & Best regards!
San.Luo


Spark dramatically slow when I add saveAsTextFile

2015-05-24 Thread allanjie
*Problem Description*:

The program runs in a stand-alone Spark cluster (1 master, 6 workers with
8 GB RAM and 2 cores).
Input: a 468 MB file with 133433 records stored in HDFS.
Output: just a 2 MB file, stored in HDFS.
The program has two map operations and one reduceByKey operation.
Finally I save the result to HDFS using *saveAsTextFile*.
*Problem*: if I don't add saveAsTextFile, the program runs very fast (a few
seconds); otherwise it is extremely slow, taking about 30 minutes.

*My program (it is very simple)*

public static void main(String[] args) throws IOException {
    /** Parameter Setting ***/
    String localPointPath = "/home/hduser/skyrock/skyrockImageFeatures.csv";
    String remoteFilePath =
            "hdfs://HadoopV26Master:9000/user/skyrock/skyrockImageIndexedFeatures.csv";
    String outputPath = "hdfs://HadoopV26Master:9000/user/sparkoutput/";
    final int row = 133433;
    final int col = 458;
    final double dc = Double.valueOf(args[0]);

    SparkConf conf = new SparkConf()
            .setAppName("distance")
            .set("spark.executor.memory", "4g")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .set("spark.eventLog.enabled", "true");
    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<String> textFile = sc.textFile(remoteFilePath);

    // Broadcast variable, the dimension of this double array: 133433*458
    final Broadcast<double[][]> broadcastPoints =
            sc.broadcast(createBroadcastPoints(localPointPath, row, col));
    /**
     * Compute the distance in terms of each point on each instance.
     * distance list: index = n(i-1) - i*(i-1)/2 + j-i-1
     */
    JavaPairRDD<Integer, Double> distance = textFile.flatMapToPair(
            new PairFlatMapFunction<String, Integer, Double>() {
                public Iterable<Tuple2<Integer, Double>> call(String v1) throws Exception {
                    List<String> al = Arrays.asList(v1.split(","));
                    double[] featureVals = new double[al.size()];
                    for (int j = 0; j < al.size() - 1; j++)
                        featureVals[j] = Double.valueOf(al.get(j + 1));
                    int jIndex = Integer.valueOf(al.get(0));
                    double[][] allPoints = broadcastPoints.value();
                    double sum = 0;
                    List<Tuple2<Integer, Double>> list = new ArrayList<Tuple2<Integer, Double>>();
                    for (int i = 0; i < row; i++) {
                        sum = 0;
                        for (int j = 0; j < al.size() - 1; j++) {
                            sum += (allPoints[i][j] - featureVals[j]) * (allPoints[i][j] - featureVals[j]);
                        }
                        list.add(new Tuple2<Integer, Double>(jIndex, Math.sqrt(sum)));
                    }
                    return list;
                }
            });

    // Create zeroOne density
    JavaPairRDD<Integer, Integer> densityZeroOne = distance.mapValues(
            new Function<Double, Integer>() {
                public Integer call(Double v1) throws Exception {
                    if (v1 < dc)
                        return 1;
                    else return 0;
                }
            });
    // Combine the density
    JavaPairRDD<Integer, Integer> counts = densityZeroOne.reduceByKey(
            new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            });
    counts.saveAsTextFile(outputPath + args[1]);
    sc.stop();
}

*If I comment out saveAsTextFile, the log will be:*
Picked up _JAVA_OPTIONS: -Xmx4g
15/05/24 15:21:30 INFO spark.SparkContext: Running Spark version 1.3.1
15/05/24 15:21:30 WARN util.NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
15/05/24 15:21:30 INFO spark.SecurityManager: Changing view acls to: hduser
15/05/24 15:21:30 INFO spark.SecurityManager: Changing modify acls to:
hduser
15/05/24 15:21:30 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view permissions:
Set(hduser); users with modify permissions: Set(hduser)
15/05/24 15:21:31 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/05/24 15:21:31 INFO Remoting: 

Re: Strange ClassNotFound exception

2015-05-24 Thread Ted Yu
Can you pastebin the class path ?

Thanks



 On May 24, 2015, at 5:02 AM, boci boci.b...@gmail.com wrote:
 
 Yeah, I have the same jar with the same result. I run it in a Docker container, and I use
 the same Docker container as my other project... the only difference is the
 PostgreSQL JDBC driver and the custom RDD... no additional dependencies (both
 single jars generated with the same assembly configuration and the same dependencies),
 and the second one works like a charm.
 
 Another idea?
 
 On May 24, 2015 at 2:41, Ted Yu (yuzhih...@gmail.com) wrote:
 In my local maven repo, I found:
 
 $ jar tvf 
 /Users/tyu/.m2/repository//org/spark-project/akka/akka-actor_2.10/2.3.4-spark/akka-actor_2.10-2.3.4-spark.jar
  | grep SelectionPath
521 Mon Sep 29 12:05:36 PDT 2014 akka/actor/SelectionPathElement.class
 
 Is the above jar in your classpath ?
 
 On Sat, May 23, 2015 at 5:05 PM, boci boci.b...@gmail.com wrote:
 Hi guys!
 
 I have a small Spark application. It queries some data from Postgres,
 enriches it, and writes to Elasticsearch. When I deployed it into the Spark container
 I got a very frustrating error:
 https://gist.github.com/b0c1/66527e00bada1e4c0dc3
 
 Spark version: 1.3.1
 Hadoop version: 2.6.0
 Additional info: 
   serialization: kryo
   rdd: custom rdd to query
 
 What I don't understand:
 1. akka.actor.SelectionPath doesn't exist in 1.3.1
 2. I checked all the dependencies in my project; I only have
 org.spark-project.akka:akka-*_2.10:2.3.4-spark:jar, which doesn't have it
 3. I haven't found any reference to this class...
 4. I created my own RDD and it works, but do I need to register it with Kryo? (mapRow
 uses ResultSet, which I need to create
 5. I used this some months ago and it already worked with Spark 1.2... I
 recompiled with 1.3.1 but I got this strange error
 
 Any idea?
 
 --
 Skype: boci13, Hangout: boci.b...@gmail.com


Re: Strange ClassNotFound exception

2015-05-24 Thread boci
Yeah, I have the same jar with the same result. I run it in a Docker container, and I
use the same Docker container as my other project... the only difference
is the PostgreSQL JDBC driver and the custom RDD... no additional
dependencies (both single jars generated with the same assembly configuration
and the same dependencies), and the second one works like a charm.

Another idea?
On May 24, 2015 at 2:41, Ted Yu (yuzhih...@gmail.com) wrote:

 In my local maven repo, I found:

 $ jar tvf
 /Users/tyu/.m2/repository//org/spark-project/akka/akka-actor_2.10/2.3.4-spark/akka-actor_2.10-2.3.4-spark.jar
 | grep SelectionPath
521 Mon Sep 29 12:05:36 PDT 2014 akka/actor/SelectionPathElement.class

 Is the above jar in your classpath ?

 On Sat, May 23, 2015 at 5:05 PM, boci boci.b...@gmail.com wrote:

 Hi guys!

 I have a small Spark application. It queries some data from Postgres,
 enriches it, and writes to Elasticsearch. When I deployed it into the Spark container
 I got a very frustrating error:
 https://gist.github.com/b0c1/66527e00bada1e4c0dc3

 Spark version: 1.3.1
 Hadoop version: 2.6.0
 Additional info:
   serialization: kryo
   rdd: custom rdd to query

 What I don't understand:
 1. akka.actor.SelectionPath doesn't exist in 1.3.1
 2. I checked all the dependencies in my project; I only have
 org.spark-project.akka:akka-*_2.10:2.3.4-spark:jar, which doesn't have it
 3. I haven't found any reference to this class...
 4. I created my own RDD and it works, but do I need to register it with Kryo? (mapRow
 uses ResultSet, which I need to create
 5. I used this some months ago and it already worked with Spark 1.2... I
 recompiled with 1.3.1 but I got this strange error

 Any idea?


 --
 Skype: boci13, Hangout: boci.b...@gmail.com





Re: Spark Streaming - Design considerations/Knobs

2015-05-24 Thread Maiti, Samya
Really good list to brush up basics.

Just one input, regarding

  *   An RDD's processing is scheduled by driver's jobscheduler as a job. At a 
given point of time only one job is active. So, if one job is executing the 
other jobs are queued.

We can have multiple jobs running in a given application at a point of time, if 
they are submitted from different threads. So essentially in a single threaded 
application, the above statement holds true.

Regards,
Sam

On May 24, 2015, at 1:14 PM, Tathagata Das t...@databricks.com wrote:

Blocks are replicated immediately, before the driver launches any jobs using 
them.

On Thu, May 21, 2015 at 2:05 AM, Hemant Bhanawat hemant9...@gmail.com wrote:
Honestly, given the length of my email, I didn't expect a reply. :-) Thanks for 
reading and replying. However, I have a follow-up question:

I don't think I understand the block replication completely. Are the blocks 
replicated immediately after they are received by the receiver? Or are they 
kept on the receiver node only and are moved only on shuffle? Has the 
replication something to do with locality.wait?

Thanks,
Hemant

On Thu, May 21, 2015 at 2:21 AM, Tathagata Das t...@databricks.com wrote:
Correcting the ones that are incorrect or incomplete. BUT this is a good list of 
things to remember about Spark Streaming.


On Wed, May 20, 2015 at 3:40 AM, Hemant Bhanawat hemant9...@gmail.com wrote:
Hi,

I have compiled a list (from online sources) of knobs/design considerations 
that need to be taken care of by applications running on spark streaming. Is my 
understanding correct?  Any other important design consideration that I should 
take care of?


  *   A DStream is associated with a single receiver. For attaining read 
parallelism multiple receivers i.e. multiple DStreams need to be created.
  *   A receiver is run within an executor. It occupies one core. Ensure that 
there are enough cores for processing after receiver slots are booked i.e. 
spark.cores.max should take the receiver slots into account.
  *   The receivers are allocated to executors in a round robin fashion.
  *   When data is received from a stream source, receiver creates blocks of 
data.  A new block of data is generated every blockInterval milliseconds. N 
blocks of data are created during the batchInterval where N = 
batchInterval/blockInterval.
  *   These blocks are distributed by the BlockManager of the current executor 
to the block managers of other executors. After that, the Network Input Tracker 
running on the driver is informed about the block locations for further 
processing.
  *   An RDD is created on the driver for the blocks created during the 
batchInterval. The blocks generated during the batchInterval are partitions of 
the RDD. Each partition is a task in spark. blockInterval== batchinterval would 
mean that a single partition is created and probably it is processed locally.

The map tasks on the blocks are processed in the executors (one that received 
the block, and another where the block was replicated) that have the blocks, 
irrespective of block interval, unless non-local scheduling kicks in (as you 
observed next).

  *   Having bigger blockinterval means bigger blocks. A high value of 
spark.locality.wait increases the chance of processing a block on the local 
node. A balance needs to be found out between these two parameters to ensure 
that the bigger blocks are processed locally.
  *   Instead of relying on batchInterval and blockInterval, you can define the 
number of partitions by calling dstream.repartition(n). This reshuffles the 
data in RDD randomly to create n number of partitions.

Yes, for greater parallelism. Though comes at the cost of a shuffle.

  *   An RDD's processing is scheduled by driver's jobscheduler as a job. At a 
given point of time only one job is active. So, if one job is executing the 
other jobs are queued.

  *   If you have two dstreams there will be two RDDs formed and there will be 
two jobs created which will be scheduled one after the other.

  *   To avoid this, you can union two dstreams. This will ensure that a single 
unionRDD is formed for the two RDDs of the dstreams. This unionRDD is then 
considered as a single job. However the partitioning of the RDDs is not 
impacted.

To further clarify, the jobs depend on the number of output operations (print, 
foreachRDD, saveAsXFiles) and the number of RDD actions in those output 
operations.

dstream1.union(dstream2).foreachRDD { rdd => rdd.count() }   // one Spark job per batch

dstream1.union(dstream2).foreachRDD { rdd => { rdd.count(); rdd.count() } }   // TWO Spark jobs per batch

dstream1.foreachRDD { rdd => rdd.count() }; dstream2.foreachRDD { rdd => rdd.count() }   // TWO Spark jobs per batch
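For reference, a self-contained sketch of the union pattern described above (the socket sources are placeholders): the two input DStreams are unioned, and the single foreachRDD with one RDD action yields one Spark job per batch.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object UnionDemo {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("UnionDemo"), Seconds(10))
    val dstream1 = ssc.socketTextStream("host1", 9999)   // placeholder sources
    val dstream2 = ssc.socketTextStream("host2", 9999)
    dstream1.union(dstream2).foreachRDD { rdd => println(rdd.count()) }   // one job per batch
    ssc.start()
    ssc.awaitTermination()
  }
}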



  *   If the batch processing time is more than batchinterval then obviously 
the receiver's memory will start filling 

Re: Spark dramatically slow when I add saveAsTextFile

2015-05-24 Thread Joe Wass
This may sound like an obvious question, but are you sure that the program
is doing any work when you don't have a saveAsTextFile? If there are
transformations but no actions to actually collect the data, there's no
need for Spark to execute the transformations.
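To make the point concrete, here is a small, self-contained sketch (not the poster's code): with only transformations nothing executes, and an action such as count() forces the same work that saveAsTextFile would, which makes for a fairer timing comparison.

import org.apache.spark.{SparkConf, SparkContext}

object LazyDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("LazyDemo"))
    val mapped = sc.parallelize(1 to 1000000).map(x => (x % 10, x.toLong))
    val counts = mapped.reduceByKey(_ + _)   // nothing has executed yet
    println(counts.count())                  // action: the whole pipeline runs here
    sc.stop()
  }
}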

As to the question of 'is this taking too long', I can't answer that. But
your code was HTML-escaped and therefore difficult to read; perhaps you
should post a link to a Gist.

Joe

On 24 May 2015 at 10:36, allanjie allanmcgr...@gmail.com wrote:

 [allanjie's original message, including the code and log output, is quoted here in full; snipped. See the thread above.]

Re: how to distributed run a bash shell in spark

2015-05-24 Thread Akhil Das
You mean you want to execute some shell commands from Spark? Here's
something I tried a while back: https://github.com/akhld/spark-exploit

Thanks
Best Regards
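For illustration, a minimal Scala sketch (not the code from the linked repo) of pushing the shell invocation out to the executors: make the chromosome numbers an RDD and launch the process inside a transformation. The script and data paths come from the original post and are assumed to exist on every worker node; a per-chromosome result file is used here because concurrent runs writing one shared result.txt would clash.

import org.apache.spark.{SparkConf, SparkContext}
import scala.sys.process._

object DistributedShell {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("DistributedShell"))
    // 21 elements in 21 partitions, so each chromosome's script run lands on some executor
    val exitCodes = sc.parallelize(1 to 21, 21).map { i =>
      val cmd = Seq("sh", "/opt/sh/bin/sort.sh",
        s"/opt/data/shellcompare/chr$i.txt",
        s"/opt/data/shellcompare/samplechr$i.txt",
        s"/opt/data/shellcompare/result$i.txt", "600")
      (i, cmd.!)   // runs the script on whichever executor owns this partition
    }.collect()
    exitCodes.foreach { case (i, code) => println(s"chr$i exited with $code") }
    sc.stop()
  }
}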

On Sun, May 24, 2015 at 4:53 PM, luohui20...@sina.com wrote:

  Hello there,

    I am trying to run an app in which part of it needs to run a shell
  script. How can I run the shell script distributed across the Spark cluster? Thanks.


  [The code from the original message is quoted here; snipped. See the earlier thread in this digest.]



Re: Trying to connect to many topics with several DirectConnect

2015-05-24 Thread Akhil Das
I used to hit an NPE when I didn't add all the dependency jars to my context
while running in standalone mode. Can you try adding all of these
dependencies to your context?


sc.addJar("/home/akhld/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/spark-streaming-kafka_2.10-1.3.1.jar")
sc.addJar("/home/akhld/.ivy2/cache/org.apache.kafka/kafka_2.10/jars/kafka_2.10-0.8.1.1.jar")
sc.addJar("/home/akhld/.ivy2/cache/com.101tec/zkclient/jars/zkclient-0.3.jar")
sc.addJar("/home/akhld/.ivy2/cache/com.yammer.metrics/metrics-core/jars/metrics-core-2.2.0.jar")


Thanks
Best Regards

On Fri, May 22, 2015 at 5:20 PM, Guillermo Ortiz konstt2...@gmail.com
wrote:

 Hi,

 I'm trying to connect to two Kafka topics with Spark using DirectStream,
 but I get an error. I don't know if there is any limitation on doing this,
 because when I access just one topic everything works fine.

 val ssc = new StreamingContext(sparkConf, Seconds(5))
 val kafkaParams = Map[String, String]("metadata.broker.list" -> "quickstart.cloudera:9092")
 val setTopic1 = Set("topic1")
 val setTopic2 = Set("topic2")

 val stream1 = KafkaUtils.createDirectStream[String, String,
 StringDecoder, StringDecoder](ssc, kafkaParams, setTopic1)
 val stream2 = KafkaUtils.createDirectStream[String, String,
 StringDecoder, StringDecoder](ssc, kafkaParams, setTopic2)


 The error that I get is:
 15/05/22 13:12:40 INFO BlockManager: Removing RDD 314
 15/05/22 13:12:40 ERROR OneForOneStrategy:
 java.lang.NullPointerException
 at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
 at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
 at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
 at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
 at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
 at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)


 Are there any limitations on doing this?
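For what it's worth, a sketch of an alternative worth trying (an assumption, not a confirmed fix for the NPE above): subscribe to both topics through a single direct stream by passing them in one set, so only one input DStream is created.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val sparkConf = new SparkConf().setAppName("two-topics")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val kafkaParams = Map[String, String]("metadata.broker.list" -> "quickstart.cloudera:9092")
// one input stream covering both topics; elements are (key, message) pairs
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("topic1", "topic2"))
stream.foreachRDD(rdd => println(rdd.count()))
ssc.start()
ssc.awaitTermination()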



Help optimizing some spark code

2015-05-24 Thread Tal
Hi, 
I'm running this piece of code in my program:

smallRdd.join(largeRdd)
  .groupBy { case (id, (_, X(a, _, _))) => a }
  .map { case (a, iterable) => a -> iterable.size }
  .sortBy({ case (_, count) => count }, ascending = false)
  .take(k)

where basically:
smallRdd is an RDD of (Long, Unit) with thousands of entries in it
(it was an RDD of Longs, but I needed a tuple for the join);
largeRdd is an RDD of (Long, X), where X is a case class containing 3 Longs,
and it has millions of entries in it.
Both RDDs have already been sorted by key.

What I want is, out of the intersection between the RDDs (by key, which is
the first Long in the tuple), to find the top k values of a (the first field
of X) by number of appearances.

This code works but takes way too long; it can take up to 10 minutes on RDDs
of sizes 20,000 and 8,000,000. I've been playing around with commenting out
different lines in the process and running it, and I can't seem to find a
clear bottleneck; each line seems to be quite costly.

I am wondering if anyone can think of a better way to do this:
* especially wondering if I should use IndexedRDD for the join; would it
significantly improve the join performance?
* I really don't like the fact that I'm sorting thousands of entries just to
get the top k, where usually k << smallRdd.count. Is there some kind of
select-kth I can do on RDDs (and then just filter out the bigger elements)? Or
any other way to improve what's happening here?

thanks!
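One direction worth trying, sketched under the shapes described above (X is re-declared here as a stand-in for the real case class): count per key with reduceByKey and then use top(k), which avoids materializing per-key iterables and avoids sorting everything just to take k elements.

import org.apache.spark.rdd.RDD

case class X(a: Long, b: Long, c: Long)   // stand-in for the poster's case class

def topKByA(smallRdd: RDD[(Long, Unit)], largeRdd: RDD[(Long, X)], k: Int): Array[(Long, Int)] =
  smallRdd.join(largeRdd)
    .map { case (_, (_, X(a, _, _))) => (a, 1) }       // one record per matching key
    .reduceByKey(_ + _)                                // count per a, no grouped iterables
    .top(k)(Ordering.by((p: (Long, Int)) => p._2))     // keeps only k candidates per partition

If the join itself turns out to be the bottleneck, another option to explore is broadcasting the small key set (it only has thousands of entries) and filtering largeRdd with it instead of doing a shuffle join.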







--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Help-optimizing-some-spark-code-tp23006.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Powered by Spark listing

2015-05-24 Thread Michael Roberts
Information Innovators, Inc.
http://www.iiinfo.com/
Spark, Spark Streaming, Spark SQL, MLLib
Developing data analytics systems for federal healthcare, national defense
and other programs using Spark on YARN.

--
This page tracks the users of Spark. To add yourself to the list, please
email user@spark.apache.org with your organization name, URL, a list of
which Spark components you are using, and a short description of your use
case.


Re: How to use zookeeper in Spark Streaming

2015-05-24 Thread Ted Yu
I think the Zookeeper watcher code should reside in task code.

Haven't found guide on this subject so far.

Cheers
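For what it's worth, one pattern that fits keeping the watcher code in the task code is a lazily initialized, per-executor ZooKeeper client held in a singleton object and consulted from inside the tasks. A rough sketch (the connect string and znode path are hypothetical):

import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}

object ExecutorSideConfig {
  // one client per executor JVM, created lazily the first time a task touches it
  lazy val zk = new ZooKeeper("zk1:2181,zk2:2181", 30000, new Watcher {
    override def process(event: WatchedEvent): Unit = {
      // refresh or invalidate any cached value here when the watched znode changes
    }
  })
  def currentValue(path: String): String = new String(zk.getData(path, true, null), "UTF-8")
}

// used from task code, e.g. inside a DStream transformation:
// stream.map(record => (record, ExecutorSideConfig.currentValue("/myapp/config")))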

On Sun, May 24, 2015 at 7:15 PM, bit1...@163.com bit1...@163.com wrote:

 Can someone please help me on this?

 --
 bit1...@163.com


 *From:* bit1...@163.com
 *Sent:* 2015-05-24 13:53
 *To:* user user@spark.apache.org
 *Subject:* How to use zookeeper in Spark Streaming

 Hi,
 In my Spark Streaming application, once the application starts and is
 running, the tasks running on the worker nodes need to be notified from time to
 time that some configurations have changed; these configurations reside in ZooKeeper.
 configurations reside on the Zookeeper.

 My question is, where should I put the code that works with Zookeeper for
 the configuration change, in the Driver code or in the Task code? Is there
 some guide on this? Thanks.





 --
 bit1...@163.com




Re: Re: how to distributed run a bash shell in spark

2015-05-24 Thread luohui20001
Thanks Akhil,
   Your code is a big help to me, because a Perl script is exactly the thing
I want to try to run in Spark. I will give it a try.




 

Thanks & Best regards!
San.Luo

----- Original Message -----
From: Akhil Das ak...@sigmoidanalytics.com
To: 罗辉 luohui20...@sina.com
Cc: user user@spark.apache.org
Subject: Re: how to distributed run a bash shell in spark
Date: 2015-05-25 00:53

You mean you want to execute some shell commands from Spark? Here's something I
tried a while back: https://github.com/akhld/spark-exploit
Thanks, Best Regards

On Sun, May 24, 2015 at 4:53 PM,  luohui20...@sina.com wrote:
[The original message and code are quoted here in full; snipped. See the earlier thread in this digest.]





Using Spark like a search engine

2015-05-24 Thread Сергей Мелехин
Hi!
We are developing a scoring system for recruitment. A recruiter enters vacancy
requirements, and we score tens of thousands of CVs against those requirements
and return, e.g., the top 10 matches.
We do not use full-text search, and sometimes we don't even filter the input CVs
prior to scoring (some vacancies do not have mandatory requirements that
can be used as an effective filter).

So we have a scoring function F(CV, VACANCY) that is currently implemented in
SQL and runs on a PostgreSQL cluster. In the worst case, F is executed once on
every CV in the database. The VACANCY part is fixed for one query, but changes
between queries, and there's very little we can precompute in advance.

We expect to have about 100,000,000 CVs in the next year, and we do not expect our
current implementation to offer the desired low-latency response (under 1 s) on 100M
CVs. So we are looking for a horizontally scalable and fault-tolerant in-memory
solution.

Will Spark be useful for our task? All the tutorials I could find describe
stream processing or ML applications. What Spark extensions/backends could
be useful?


With best regards, Segey Melekhin
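For what it's worth, the query pattern described above maps fairly directly onto a cached RDD of CVs, a broadcast of the vacancy, and a top-k. A rough sketch with hypothetical CV/Vacancy schemas and a stand-in scoring function; whether it meets a sub-second target on 100M CVs depends on cluster size and on keeping the data cached in memory.

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

case class CV(id: Long, features: Map[String, Double])     // hypothetical schema
case class Vacancy(requirements: Map[String, Double])      // hypothetical schema

// stand-in for the SQL scoring function F(CV, VACANCY)
def score(cv: CV, v: Vacancy): Double =
  v.requirements.map { case (k, w) => w * cv.features.getOrElse(k, 0.0) }.sum

def topMatches(sc: SparkContext, cvs: RDD[CV], vacancy: Vacancy, k: Int = 10): Array[(Double, Long)] = {
  val bc = sc.broadcast(vacancy)                            // ship the query once per executor
  cvs.map(cv => (score(cv, bc.value), cv.id))               // score every CV, as the SQL version does
    .top(k)(Ordering.by((p: (Double, Long)) => p._1))       // per-partition top-k, merged on the driver
}

// cvs would be cached up front (e.g. loaded once and .cache()'d) so each query avoids re-reading storage.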


Re: How to use zookeeper in Spark Streaming

2015-05-24 Thread bit1...@163.com
Can someone please help me on this? 



bit1...@163.com
 
From: bit1...@163.com
Sent: 2015-05-24 13:53
To: user
Subject: How to use zookeeper in Spark Streaming

Hi,
In my Spark Streaming application, once the application starts and is running,
the tasks running on the worker nodes need to be notified from time to time that some
configurations have changed; these configurations reside
in ZooKeeper.

My question is, where should I put the code that works with Zookeeper for the 
configuration change, in the Driver code or in the Task code? Is there some 
guide on this? Thanks.







bit1...@163.com


RE: SparkSQL errors in 1.4 rc when using with Hive 0.12 metastore

2015-05-24 Thread Cheng, Hao
Thanks for reporting this.

We intend to support multiple metastore versions in a single build (hive-0.13.1)
by introducing the IsolatedClientLoader, but you're probably hitting a bug;
please file a jira issue for this.

I will keep investigating on this also.

Hao


From: Mark Hamstra [mailto:m...@clearstorydata.com]
Sent: Sunday, May 24, 2015 9:06 PM
To: Cheolsoo Park
Cc: user@spark.apache.org; d...@spark.apache.org
Subject: Re: SparkSQL errors in 1.4 rc when using with Hive 0.12 metastore

This discussion belongs on the dev list.  Please post any replies there.

On Sat, May 23, 2015 at 10:19 PM, Cheolsoo Park piaozhe...@gmail.com wrote:

[Cheolsoo's original message is quoted here in full; snipped. See the first thread in this digest.]



Re: Spark Streaming - Design considerations/Knobs

2015-05-24 Thread Tathagata Das
Blocks are replicated immediately, before the driver launches any jobs
using them.

On Thu, May 21, 2015 at 2:05 AM, Hemant Bhanawat hemant9...@gmail.com
wrote:

 Honestly, given the length of my email, I didn't expect a reply. :-)
 Thanks for reading and replying. However, I have a follow-up question:

 I don't think I understand the block replication completely. Are the
 blocks replicated immediately after they are received by the receiver? Or
 are they kept on the receiver node only and are moved only on shuffle? Has
 the replication something to do with locality.wait?

 Thanks,
 Hemant

 On Thu, May 21, 2015 at 2:21 AM, Tathagata Das t...@databricks.com
 wrote:

 Correcting the ones that are incorrect or incomplete. BUT this is a good
 list of things to remember about Spark Streaming.


 On Wed, May 20, 2015 at 3:40 AM, Hemant Bhanawat hemant9...@gmail.com
 wrote:

 Hi,

 I have compiled a list (from online sources) of knobs/design
 considerations that need to be taken care of by applications running on
 spark streaming. Is my understanding correct?  Any other important design
 consideration that I should take care of?


- A DStream is associated with a single receiver. For attaining read
parallelism multiple receivers i.e. multiple DStreams need to be created.
- A receiver is run within an executor. It occupies one core. Ensure
that there are enough cores for processing after receiver slots are 
 booked
i.e. spark.cores.max should take the receiver slots into account.
- The receivers are allocated to executors in a round robin fashion.
- When data is received from a stream source, receiver creates
blocks of data.  A new block of data is generated every blockInterval
milliseconds. N blocks of data are created during the batchInterval 
 where N
= batchInterval/blockInterval.
- These blocks are distributed by the BlockManager of the current
executor to the block managers of other executors. After that, the 
 Network
Input Tracker running on the driver is informed about the block locations
for further processing.
- An RDD is created on the driver for the blocks created during the
batchInterval. The blocks generated during the batchInterval are 
 partitions
of the RDD. Each partition is a task in spark. blockInterval==
batchinterval would mean that a single partition is created and probably 
 it
is processed locally.

 The map tasks on the blocks are processed in the executors (one that
 received the block, and another where the block was replicated) that have
 the blocks, irrespective of block interval, unless non-local scheduling
 kicks in (as you observed next).


- Having bigger blockinterval means bigger blocks. A high value of
spark.locality.wait increases the chance of processing a block on the 
 local
node. A balance needs to be found out between these two parameters to
ensure that the bigger blocks are processed locally.
- Instead of relying on batchInterval and blockInterval, you can
define the number of partitions by calling dstream.repartition(n). This
reshuffles the data in RDD randomly to create n number of partitions.

 Yes, for greater parallelism. Though comes at the cost of a shuffle.


- An RDD's processing is scheduled by driver's jobscheduler as a
job. At a given point of time only one job is active. So, if one job is
executing the other jobs are queued.


- If you have two dstreams there will be two RDDs formed and there
will be two jobs created which will be scheduled one after the other.


- To avoid this, you can union two dstreams. This will ensure that a
single unionRDD is formed for the two RDDs of the dstreams. This unionRDD
is then considered as a single job. However the partitioning of the RDDs 
 is
not impacted.

 To further clarify, the jobs depend on the number of output operations
 (print, foreachRDD, saveAsXFiles) and the number of RDD actions in those
 output operations.

 dstream1.union(dstream2).foreachRDD { rdd => rdd.count() }   // one Spark job per batch

 dstream1.union(dstream2).foreachRDD { rdd => { rdd.count(); rdd.count() } }   // TWO Spark jobs per batch

 dstream1.foreachRDD { rdd => rdd.count() }; dstream2.foreachRDD { rdd => rdd.count() }   // TWO Spark jobs per batch






- If the batch processing time is more than the batch interval then
obviously the receiver's memory will start filling up and will end up
throwing exceptions (most probably BlockNotFoundException). Currently there
is no way to pause the receiver.

 You can limit the rate of receiver using SparkConf config
 spark.streaming.receiver.maxRate
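For concreteness, a minimal sketch of that setting (the value is illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("streaming-app")
  .set("spark.streaming.receiver.maxRate", "10000")   // max records per second for each receiver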


- For being fully fault tolerant, Spark Streaming needs to enable
checkpointing. Checkpointing increases the batch processing time.

 Incomplete. There are two types of checkpointing - data and metadata.
 Only data checkpointing, needed by only some operations, increase batch