Spark DataFrame sum of multiple columns

2016-04-21 Thread Naveen Kumar Pokala
Hi,

Do we have any way to perform row-level operations in Spark DataFrames?


For example,

I have a dataframe with columns A, B, C, ..., Z. I want to add one more column,
"New Column", with the sum of all the column values:

A    B    C    D    ...    Z     New Column
1    2    4    3    ...    26    351
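
Something along these lines is the kind of thing I am after (only a rough sketch,
assuming the Scala DataFrame API; the column names are placeholders):

import org.apache.spark.sql.functions.col

// rough sketch: add a new column holding the sum of every existing column
// (assumes all columns are numeric)
val sumOfAll = df.columns.map(col).reduce(_ + _)
val withSum  = df.withColumn("New Column", sumOfAll)
withSum.show()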



Can somebody help me on this?


Thanks,
Naveen


Something wrong with sortBy

2016-04-21 Thread tuan3w
I'm working on implementing LSH on Spark. I started with an implementation
provided by SoundCloud:
https://github.com/soundcloud/cosine-lsh-join-spark/blob/master/src/main/scala/com/soundcloud/lsh/Lsh.scala
When I check the Web UI, I see that after calling sortBy, the number of partitions
of the RDD decreases from 30 to 2.
I also verified this by checking rdd.partitions.size.
As far as I can see from the code of the RDD class (
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala),
the default number of output partitions should equal the number of
partitions in the parent RDD, which in this case should be 30. Even when I
set the number explicitly, this problem still occurs.
However, when I try simple code as follows, it works as I expect:
val d = Seq(1,2,5,6,3,4,2)
val data = sc.parallelize(d, 5)
val sortedData = data.sortBy(x => x)
println(sortedData.partitions.size) // return "5"

I'm using spark 1.6.1.
Thank you for your help.
 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Something-wrong-with-sortBy-tp26819.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




[Ask:] Best Practices - Application logging in Spark 1.5.2 + Scala 2.10

2016-04-21 Thread Divya Gehlot
Hi,
I am using Spark on a Hadoop 2.7 cluster.
I need to write all my print statements and any errors to a file, for
instance some info when a certain level is reached, or an error if something is
missing in my Spark Scala script.

Can somebody help me or point me to a tutorial, blog, or book?
What is the best way to achieve this?
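
For context, something like the sketch below is what I have in mind (it assumes
plain log4j, which ships with Spark; the logger name and messages are arbitrary):

import org.apache.log4j.{Level, Logger}

object MyScript {
  // sketch: use a log4j logger instead of println
  @transient lazy val log = Logger.getLogger("MyScript")

  def main(args: Array[String]): Unit = {
    log.setLevel(Level.INFO)
    log.info("job started")
    if (args.isEmpty) log.error("missing input path argument")
    // where these lines end up (console, a file, ...) is controlled by
    // conf/log4j.properties, e.g. a FileAppender pointing at a local file
  }
}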

Thanks in advance.

Divya


java.io.NotSerializableException: org.apache.spark.sql.types.LongType

2016-04-21 Thread Andy Davidson
I started using
http://spark.apache.org/docs/latest/mllib-frequent-pattern-mining.html#fp-growth
in Python. It was really easy to get the frequent item sets.
Unfortunately, association rules are not implemented in Python.

Here is my Python code. It works great:

rawJsonRDD = jsonToPythonDictionaries(sc, inputURL, coalesceInputToNumPartions)

idsRDD = (rawJsonRDD
          # fetch the list of ids, the items are of type int
          .map(lambda r: r['ids'])
          # make sure ids are unique
          .map(lambda ids: list(set(ids)))
          )



My Java code generates java.io.NotSerializableException:
org.apache.spark.sql.types.LongType. It has something to do with the UDF I
wrote to make sure the ids are unique.

Any idea what my bug is? I guess instead of DataFrames I could try to
implement this using RDDs, but I expect I'll run into a similar problem.

Thanks in advance

Andy

 df.printSchema();


root
 |-- ids: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- updated: long (nullable = true)
 |-- userId: long (nullable = true)



16/04/21 16:26:50 INFO FrequentItems: expr: UniqIdsUDF(Ids) as uniqueIds



UniqIdsUDF.register(sqlContext);



 DataFrame df2 = df.selectExpr(inputColName, expr);



/**
 * this is based on some test code I wrote that
 * takes in a list of strings and returns a list of strings
 */
public class UniqIdsUDF implements UDF1<WrappedArray<Long>, Long[]>, Serializable {

    private static final long serialVersionUID = 1L;

    public static final String udfName = "UniqIdsUDF";

    public static void register(SQLContext ssc) {
        // TODO probably need to be careful about registering multiple times
        UniqIdsUDF udf = new UniqIdsUDF();
        DataType elementType = new LongType();
        DataType returnType = DataTypes.createArrayType(elementType);
        ssc.udf().register(udfName, udf, returnType);
    }

    @Override
    public Long[] call(WrappedArray<Long> idsArg) throws Exception {
        List<Long> ids = JavaConversions.asJavaList(idsArg);
        HashSet<Long> hs = new HashSet<Long>(ids);
        Iterator<Long> it = hs.iterator();
        int size = hs.size();
        Long[] ret = new Long[size];
        for (int i = 0; i < size; i++) {
            ret[i] = it.next();
        }
        return ret;
    }
}



Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:707)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:706)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706)
at org.apache.spark.sql.execution.ConvertToSafe.doExecute(rowFormatConverters.scala:56)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:187)
at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1498)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1505)
at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1375)
at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1374)
at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2099)
at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1374)
at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1456)
at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:350)
at ...

Re: Why Spark having OutOfMemory Exception?

2016-04-21 Thread Zhan Zhang
The data may not be large, but the driver needs to do a lot of bookkeeping. In
your case, it is possible the driver control plane takes too much memory.

I think you can find a Java developer to look at the core dump. Otherwise, it is
hard to tell exactly which part is using all the memory.

Thanks.

Zhan Zhang


On Apr 20, 2016, at 1:38 AM, 李明伟 
> wrote:

Hi

The input data size is less than 10 MB. The task result size should be less, I
think, because I am doing aggregation on the data.





At 2016-04-20 16:18:31, "Jeff Zhang" 
> wrote:
Do you mean the input data size is 10 MB, or the task result size?

>>> But my way is to setup a forever loop to handle continuously incoming data. Not
>>> sure if it is the right way to use spark
Not sure what this means; do you use Spark Streaming, or do you run a batch job in the
forever loop?



On Wed, Apr 20, 2016 at 3:55 PM, 李明伟 
> wrote:
Hi Jeff

The total size of my data is less than 10M. I already set the driver memory to 
4GB.







在 2016-04-20 13:42:25,"Jeff Zhang" > 
写道:
It seems to be an OOM on the driver side when fetching task results.

You can try to increase spark.driver.memory and spark.driver.maxResultSize.
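
For example (values are only illustrative; spark.driver.maxResultSize can be set in
the application's SparkConf, while spark.driver.memory needs to be set before the
driver JVM starts, e.g. via spark-submit --driver-memory or spark-defaults.conf):

// sketch: raise the driver result-size limit from code
val conf = new org.apache.spark.SparkConf()
  .setAppName("aggregation-loop")
  .set("spark.driver.maxResultSize", "2g")
val sc = new org.apache.spark.SparkContext(conf)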

On Tue, Apr 19, 2016 at 4:06 PM, 李明伟 
> wrote:
Hi Zhan Zhang


Please see the exception trace below. It is reporting a GC overhead limit error.
I am not a Java or Scala developer, so it is hard for me to understand this
information. Also, reading a core dump is too difficult for me.

I am not sure if the way I am using Spark is correct. I understand that Spark
can do batch or stream calculation, but my way is to set up a forever loop to
handle continuously incoming data. Not sure if it is the right way to use Spark.


16/04/19 15:54:55 ERROR Utils: Uncaught exception in thread task-result-getter-2
java.lang.OutOfMemoryError: GC overhead limit exceeded
at scala.collection.immutable.HashMap$HashTrieMap.updated0(HashMap.scala:328)
at scala.collection.immutable.HashMap.updated(HashMap.scala:54)
at scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516)
at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
at org.apache.spark.executor.TaskMetrics$$anonfun$readObject$1.apply$mcV$sp(TaskMetrics.scala:220)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
at org.apache.spark.executor.TaskMetrics.readObject(TaskMetrics.scala:219)
at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply$mcV$sp(TaskResult.scala:79)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
at org.apache.spark.scheduler.DirectTaskResult.readExternal(TaskResult.scala:62)
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:109)
Exception in thread "task-result-getter-2" java.lang.OutOfMemoryError: GC overhead limit exceeded
at scala.collection.immutable.HashMap$HashTrieMap.updated0(HashMap.scala:328)
at scala.collection.immutable.HashMap.updated(HashMap.scala:54)
at scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516)
at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at ...

RE: Create tab separated file from a dataframe spark 1.4 with Java

2016-04-21 Thread Mohammed Guller
It should be straightforward to do this using the spark-csv package. Assuming 
“myDF” is your DataFrame, you can use the following code to save data in  a TSV 
file.
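
// note: this assumes the spark-csv package is on the classpath,
// for example via --packages com.databricks:spark-csv_2.10:1.4.0 (match your Scala version)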

myDF.write
.format("com.databricks.spark.csv")
.option("delimiter", "\t")
.save("data.tsv")

Mohammed

From: Mail.com [mailto:pradeep.mi...@mail.com]
Sent: Thursday, April 21, 2016 12:29 PM
To: pradeep.mi...@mail.com
Cc: user@spark.apache.org
Subject: Create tab separated file from a dataframe spark 1.4 with Java

Hi

I have a dataframe and need to write to a tab-separated file using Spark 1.4
and Java.

Can someone please suggest an approach?

Thanks,
Pradeep


Spark SQL insert overwrite table not showing all the partition.

2016-04-21 Thread Bijay Kumar Pathak
Hi,

I have a job which writes to a Hive table with dynamic partitioning. Inside
the job, I am writing into the table twice, but I am only seeing the
partition from the last write, although I can see in the Spark UI that it is
processing data for both partitions.

Below is the query I am using to write to the table.

hive_c.sql("""INSERT OVERWRITE TABLE base_table PARTITION (date='{1}', date_2)
  SELECT * from temp_table
""".format(date_val)
 )



Thanks,
Bijay


Re: Spark 2.0 forthcoming features

2016-04-21 Thread Jules Damji
Thanks Michael. We're doing a Spark 2.0 webinar. Register, and if you can't make
it, you can always watch the recording.

Cheers 
Jules 

Sent from my iPhone
Pardon the dumb thumb typos :)

> On Apr 20, 2016, at 10:15 AM, Michael Malak  
> wrote:
> 
> http://go.databricks.com/apache-spark-2.0-presented-by-databricks-co-founder-reynold-xin
> 
> 
> 
> 
> From: Sourav Mazumder 
> To: user  
> Sent: Wednesday, April 20, 2016 11:07 AM
> Subject: Spark 2.0 forthcoming features
> 
> Hi All,
> 
> Is there somewhere we can get an idea of the upcoming features in Spark 2.0?
> 
> I got a list for Spark ML from here 
> https://issues.apache.org/jira/browse/SPARK-12626.
> 
> Are there other links where I can find similar enhancements planned for Spark SQL, 
> Spark Core, Spark Streaming, GraphX, etc.?
> 
> Thanks in advance.
> 
> Regards,
> Sourav
> 
> 


Re: Spark 1.6.1 DataFrame write to JDBC

2016-04-21 Thread Mich Talebzadeh
I would be surprised if Oracle cannot handle million row calculations,
unless you are also using other data in Spark.


HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 21 April 2016 at 22:22, Jonathan Gray  wrote:

> I think I now understand what the problem is and it is, in some ways, to
> do with partitions and, in other ways, to do with memory.
>
> I now think that the database write was not the source of the problem (the
> problem being end-to-end performance).
>
> The application reads rows from a database, does some simple derivations
> then a group by and sum and joins the results back onto the original
> dataframe and finally writes back to a database (at the same granularity as
> the read).
>
> I would run for 100,000 rows locally and end-to-end performance was
> reasonable; if I went up to 1,000,000 rows, performance was far worse than
> the increase in volume would suggest.  Profiling seemed to indicate that
> the read and write were still happening efficiently in both cases but
> nothing much was happening in between.  Looking at the profiler during this
> period indicated that most of the time was being spent in a
> Platform.copyMemory(...).  As a result I increased the heap and driver
> memory and the performance improved dramatically.
>
> It turns out that the groupBy key results in a bad distribution where 90%
> of the data ends up in one partition and at the point of write out I'm
> guessing that is when Spark decides it has to copy all of that data onto
> one partition/worker.  Increasing the memory available appears to give it
> the ability to do that.  The app has several cache() calls in it to get
> around a codegen issue which probably puts more pressure on memory usage
> and compounds the problem.
>
> Still, I have learnt that Spark is excellent in its ability to be lazy
> and optimize across the whole end-to-end process from read-to-write.
>
> On 21 April 2016 at 13:46, Michael Segel 
> wrote:
>
>> How many partitions in your data set.
>>
>> Per the Spark DataFrameWriter Java Doc:
>> “
>>
>> Saves the content of the DataFrame
>> 
>> to an external database table via JDBC. In the case the table already exists
>> in the external database, behavior of this function depends on the save
>> mode, specified by the mode function (default to throwing an exception).
>>
>> Don't create too many partitions in parallel on a large cluster;
>> otherwise Spark might crash your external database systems.
>>
>> “
>>
>> This implies one connection per partition writing in parallel. So you
>> could be swamping your database.
>> Which database are you using?
>>
>> Also, how many hops?
>> Network latency could also impact performance too…
>>
>> On Apr 19, 2016, at 3:14 PM, Jonathan Gray  wrote:
>>
>> Hi,
>>
>> I'm trying to write ~60 million rows from a DataFrame to a database using
>> JDBC using Spark 1.6.1, something similar to df.write().jdbc(...)
>>
>> The write seems to not be performing well.  Profiling the application
>> with a master of local[*] it appears there is not much socket write
>> activity and also not much CPU.
>>
>> I would expect there to be an almost continuous block of socket write
>> activity showing up somewhere in the profile.
>>
>> I can see that the top hot method involves
>> apache.spark.unsafe.platform.CopyMemory all from calls within
>> JdbcUtils.savePartition(...).  However, the CPU doesn't seem particularly
>> stressed so I'm guessing this isn't the cause of the problem.
>>
>> Is there any best practices or has anyone come across a case like this
>> before where a write to a database seems to perform poorly?
>>
>> Thanks,
>> Jon
>>
>>
>>
>


Re: Issue with Spark shell and scalatest

2016-04-21 Thread Mich Talebzadeh
Hmm, still struggling with those two above:

scala> import org.apache.spark.internal.Logging
:57: error: object internal is not a member of package
org.apache.spark
 import org.apache.spark.internal.Logging
 ^
scala> import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
:60: error: object KafkaCluster in package kafka cannot be
accessed in package org.apache.spark.streaming.kafka
 import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset


My jars are

spark-shell --master spark://50.140.197.217:7077 --jars
/home/hduser/jars/ojdbc6.jar,/home/hduser/jars/jconn4.jar,/home/hduser/jars/junit-4.12.jar,/home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar,/home/hduser/jars/scalatest_2.11-2.2.6.jar,/home/hduser/jars/spark-core_2.10-1.5.1-tests.jar,/home/hduser/jars/spark-core_2.10-1.5.1.jar,/home/hduser/jars/spark-streaming-kafka_2.10-1.5.1.jar



jar tvf spark-core_2.10-1.5.1.jar | grep Logging
  1722 Wed Sep 23 23:34:40 BST 2015 org/apache/spark/scheduler/EventLoggingListener$$anonfun$8.class
  1584 Wed Sep 23 23:34:40 BST 2015
org/apache/spark/scheduler/EventLoggingListener$$anonfun$start$2.class
  1327 Wed Sep 23 23:34:40 BST 2015
org/apache/spark/scheduler/EventLoggingListener$$anonfun$6.class
  1528 Wed Sep 23 23:34:40 BST 2015
org/apache/spark/scheduler/EventLoggingListener$$anonfun$8$$anonfun$apply$1.class
  1664 Wed Sep 23 23:34:40 BST 2015
org/apache/spark/scheduler/EventLoggingListener$$anonfun$1.class
  1437 Wed Sep 23 23:34:40 BST 2015
org/apache/spark/scheduler/EventLoggingListener$$anonfun$4.class
  1114 Wed Sep 23 23:34:42 BST 2015
org/apache/spark/scheduler/EventLoggingListener$$anonfun$5.class
  1687 Wed Sep 23 23:34:42 BST 2015
org/apache/spark/scheduler/EventLoggingListener$$anonfun$logEvent$3.class
  1349 Wed Sep 23 23:34:42 BST 2015
org/apache/spark/scheduler/EventLoggingListener$$anonfun$2.class
  1339 Wed Sep 23 23:34:42 BST 2015
org/apache/spark/scheduler/EventLoggingListener$$anonfun$logEvent$2.class
  1347 Wed Sep 23 23:34:42 BST 2015
org/apache/spark/scheduler/EventLoggingListener$$anonfun$3.class
  1651 Wed Sep 23 23:34:42 BST 2015
org/apache/spark/scheduler/EventLoggingListener$$anonfun$start$1.class
   981 Wed Sep 23 23:34:42 BST 2015
org/apache/spark/scheduler/EventLoggingListener$$anonfun$7.class
  1798 Wed Sep 23 23:34:42 BST 2015
org/apache/spark/scheduler/EventLoggingListener$$anonfun$logEvent$1.class
 23625 Wed Sep 23 23:34:42 BST 2015
org/apache/spark/scheduler/EventLoggingListener.class
  1650 Wed Sep 23 23:34:42 BST 2015
org/apache/spark/scheduler/EventLoggingListener$$anonfun$stop$2.class
 10011 Wed Sep 23 23:34:42 BST 2015
org/apache/spark/scheduler/EventLoggingListener$.class
  1441 Wed Sep 23 23:34:42 BST 2015
org/apache/spark/scheduler/EventLoggingListener$$anonfun$openEventLog$1.class
  1273 Wed Sep 23 23:34:42 BST 2015
org/apache/spark/scheduler/EventLoggingListener$$anonfun$stop$1.class
  1137 Wed Sep 23 23:34:42 BST 2015
org/apache/spark/scheduler/EventLoggingListener$$anonfun$openEventLog$2.class
  2308 Wed Sep 23 23:34:44 BST 2015
org/apache/spark/rpc/akka/ErrorMonitor$$anonfun$receiveWithLogging$2.class
  2621 Wed Sep 23 23:34:44 BST 2015
org/apache/spark/rpc/akka/AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.class
  2727 Wed Sep 23 23:34:44 BST 2015
org/apache/spark/rpc/akka/AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$3.class
  2614 Wed Sep 23 23:34:44 BST 2015
org/apache/spark/rpc/akka/AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$2.class
  1670 Wed Sep 23 23:34:44 BST 2015
org/apache/spark/rpc/akka/AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$6.class
  5519 Wed Sep 23 23:34:44 BST 2015
org/apache/spark/rpc/akka/AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.class
  2156 Wed Sep 23 23:34:44 BST 2015
org/apache/spark/rpc/akka/AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$7.class
  2611 Wed Sep 23 23:34:44 BST 2015
org/apache/spark/rpc/akka/AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.class
  1284 Wed Sep 23 23:34:44 BST 2015
org/apache/spark/rpc/akka/ErrorMonitor$$anonfun$receiveWithLogging$2$$anonfun$applyOrElse$8.class
  2203 Wed Sep 23 23:34:44 BST 2015
org/apache/spark/rpc/akka/AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$5.class
  2213 Wed Sep 23 23:34:44 BST 2015 org/apache/spark/Logging.class
  1703 Wed Sep 23 23:34:46 BST 2015 org/apache/spark/Logging$.class
  6523 Wed Sep 23 23:34:46 BST 2015 org/apache/spark/Logging$class.class


And

jar tvf spark-streaming-kafka_2.10-1.5.1.jar|grep LeaderOffset
  2226 Wed Sep 23 23:47:50 BST 2015

Re: Spark 1.6.1 DataFrame write to JDBC

2016-04-21 Thread Jonathan Gray
I think I now understand what the problem is and it is, in some ways, to
do with partitions and, in other ways, to do with memory.

I now think that the database write was not the source of the problem (the
problem being end-to-end performance).

The application reads rows from a database, does some simple derivations
then a group by and sum and joins the results back onto the original
dataframe and finally writes back to a database (at the same granularity as
the read).

I would run for 100,000 rows locally and end-to-end performance was
reasonable; if I went up to 1,000,000 rows, performance was far worse than
the increase in volume would suggest.  Profiling seemed to indicate that
the read and write were still happening efficiently in both cases but
nothing much was happening in between.  Looking at the profiler during this
period indicated that most of the time was being spent in a
Platform.copyMemory(...).  As a result I increased the heap and driver
memory and the performance improved dramatically.

It turns out that the groupBy key results in a bad distribution where 90%
of the data ends up in one partition and at the point of write out I'm
guessing that is when Spark decides it has to copy all of that data onto
one partition/worker.  Increasing the memory available appears to give it
the ability to do that.  The app has several cache() calls in it to get
around a codegen issue which probably puts more pressure on memory usage
and compounds the problem.

Still, I have learnt that Spark is excellent in its ability to be lazy and
optimize across the whole end-to-end process from read-to-write.
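
For reference, the shape of the job is roughly the sketch below (names, the derivation
and the partition count are placeholders; the explicit repartition() before the write is
just one possible way of spreading the skewed groupBy output instead of relying on a
bigger heap):

import java.util.Properties

val url   = "jdbc:..."            // placeholder JDBC URL
val props = new Properties()      // user/password etc. omitted

val rows    = sqlContext.read.jdbc(url, "input_table", props)
val derived = rows.withColumn("bucket", rows("value") * 2)   // stand-in for the simple derivations
val summed  = derived.groupBy("bucket").sum("value")
val joined  = derived.join(summed, "bucket")

joined.repartition(200)                                      // placeholder partition count
  .write.jdbc(url, "output_table", props)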

On 21 April 2016 at 13:46, Michael Segel  wrote:

> How many partitions in your data set.
>
> Per the Spark DataFrameWriter Java Doc:
> “
>
> Saves the content of the DataFrame
> 
> to an external database table via JDBC. In the case the table already exists
> in the external database, behavior of this function depends on the save
> mode, specified by the mode function (default to throwing an exception).
>
> Don't create too many partitions in parallel on a large cluster; otherwise
> Spark might crash your external database systems.
>
> “
>
> This implies one connection per partition writing in parallel. So you
> could be swamping your database.
> Which database are you using?
>
> Also, how many hops?
> Network latency could also impact performance too…
>
> On Apr 19, 2016, at 3:14 PM, Jonathan Gray  wrote:
>
> Hi,
>
> I'm trying to write ~60 million rows from a DataFrame to a database using
> JDBC using Spark 1.6.1, something similar to df.write().jdbc(...)
>
> The write seems to not be performing well.  Profiling the application with
> a master of local[*] it appears there is not much socket write activity and
> also not much CPU.
>
> I would expect there to be an almost continuous block of socket write
> activity showing up somewhere in the profile.
>
> I can see that the top hot method involves
> apache.spark.unsafe.platform.CopyMemory all from calls within
> JdbcUtils.savePartition(...).  However, the CPU doesn't seem particularly
> stressed so I'm guessing this isn't the cause of the problem.
>
> Is there any best practices or has anyone come across a case like this
> before where a write to a database seems to perform poorly?
>
> Thanks,
> Jon
>
>
>


Re: Issue with Spark shell and scalatest

2016-04-21 Thread Ted Yu
For you, it should be spark-core_2.10-1.5.1.jar

Please replace version of Spark in my example with the version you use.

On Thu, Apr 21, 2016 at 1:23 PM, Mich Talebzadeh 
wrote:

> Hi Ted
>
> I cannot see spark-core_2.11-2.0.0-SNAPSHOT.jar  under
>
> https://repo1.maven.org/maven2/org/apache/spark/spark-core_2.11/
>
> Sorry where are these artefacts please?
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 21 April 2016 at 20:24, Ted Yu  wrote:
>
>> Plug in 1.5.1 for your jars:
>>
>> $ jar tvf ./core/target/spark-core_2.11-2.0.0-SNAPSHOT.jar | grep Logging
>> ...
>>   1781 Thu Apr 21 08:19:34 PDT 2016
>> org/apache/spark/internal/Logging$.class
>>
>> jar tvf
>> external/kafka/target/spark-streaming-kafka_2.11-2.0.0-SNAPSHOT.jar | grep
>> LeaderOffset
>> ...
>>   3310 Thu Apr 21 08:38:40 PDT 2016
>> org/apache/spark/streaming/kafka/KafkaCluster$LeaderOffset.class
>>
>> On Thu, Apr 21, 2016 at 11:52 AM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> These two are giving me griefs:
>>>
>>> scala> import org.apache.spark.internal.Logging
>>> :26: error: object internal is not a member of package
>>> org.apache.spark
>>>  import org.apache.spark.internal.Logging
>>>
>>> scala> import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
>>> :29: error: object KafkaCluster in package kafka cannot be
>>> accessed in package org.apache.spark.streaming.kafka
>>>  import
>>> org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 21 April 2016 at 18:29, Mich Talebzadeh 
>>> wrote:
>>>

 Thanks

 jar tvf spark-core_2.10-1.5.1-tests.jar | grep SparkFunSuite
   1787 Wed Sep 23 23:34:26 BST 2015
 org/apache/spark/SparkFunSuite$$anonfun$withFixture$1.class
   1780 Wed Sep 23 23:34:26 BST 2015
 org/apache/spark/SparkFunSuite$$anonfun$withFixture$2.class
   3982 Wed Sep 23 23:34:26 BST 2015 org/apache/spark/SparkFunSuite.class


 Dr Mich Talebzadeh



 LinkedIn * 
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 *



 http://talebzadehmich.wordpress.com



 On 21 April 2016 at 18:21, Ted Yu  wrote:

> Please replace version number for the release you are using :
>
> spark-core_2.10-1.5.1-tests.jar
>
> On Thu, Apr 21, 2016 at 10:18 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> I don't seem to be able to locate
>> spark-core_2.11-2.0.0-SNAPSHOT-tests.jar file :(
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 21 April 2016 at 17:45, Ted Yu  wrote:
>>
>>> It is in core-XX-tests jar:
>>>
>>> $ jar tvf ./core/target/spark-core_2.11-2.0.0-SNAPSHOT-tests.jar |
>>> grep SparkFunSuite
>>>   1830 Thu Apr 21 08:19:14 PDT 2016
>>> org/apache/spark/SparkFunSuite$$anonfun$withFixture$1.class
>>>   1823 Thu Apr 21 08:19:14 PDT 2016
>>> org/apache/spark/SparkFunSuite$$anonfun$withFixture$2.class
>>>   6232 Thu Apr 21 08:19:14 PDT 2016
>>> org/apache/spark/SparkFunSuite.class
>>>
>>> On Thu, Apr 21, 2016 at 9:39 AM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 like war of attrition :)

 now I get with sbt

 object SparkFunSuite is not a member of package org.apache.spark


 Dr Mich Talebzadeh



 LinkedIn * 
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 *



 http://talebzadehmich.wordpress.com



 On 21 April 2016 at 17:22, Ted Yu  wrote:

> Have you tried the following ?
>
> libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6"
>
> On Thu, Apr 21, 2016 at 

Re: Issue with Spark shell and scalatest

2016-04-21 Thread Mich Talebzadeh
Hi Ted

I cannot see spark-core_2.11-2.0.0-SNAPSHOT.jar  under

https://repo1.maven.org/maven2/org/apache/spark/spark-core_2.11/

Sorry where are these artefacts please?



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 21 April 2016 at 20:24, Ted Yu  wrote:

> Plug in 1.5.1 for your jars:
>
> $ jar tvf ./core/target/spark-core_2.11-2.0.0-SNAPSHOT.jar | grep Logging
> ...
>   1781 Thu Apr 21 08:19:34 PDT 2016
> org/apache/spark/internal/Logging$.class
>
> jar tvf
> external/kafka/target/spark-streaming-kafka_2.11-2.0.0-SNAPSHOT.jar | grep
> LeaderOffset
> ...
>   3310 Thu Apr 21 08:38:40 PDT 2016
> org/apache/spark/streaming/kafka/KafkaCluster$LeaderOffset.class
>
> On Thu, Apr 21, 2016 at 11:52 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> These two are giving me griefs:
>>
>> scala> import org.apache.spark.internal.Logging
>> :26: error: object internal is not a member of package
>> org.apache.spark
>>  import org.apache.spark.internal.Logging
>>
>> scala> import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
>> :29: error: object KafkaCluster in package kafka cannot be
>> accessed in package org.apache.spark.streaming.kafka
>>  import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 21 April 2016 at 18:29, Mich Talebzadeh 
>> wrote:
>>
>>>
>>> Thanks
>>>
>>> jar tvf spark-core_2.10-1.5.1-tests.jar | grep SparkFunSuite
>>>   1787 Wed Sep 23 23:34:26 BST 2015
>>> org/apache/spark/SparkFunSuite$$anonfun$withFixture$1.class
>>>   1780 Wed Sep 23 23:34:26 BST 2015
>>> org/apache/spark/SparkFunSuite$$anonfun$withFixture$2.class
>>>   3982 Wed Sep 23 23:34:26 BST 2015 org/apache/spark/SparkFunSuite.class
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 21 April 2016 at 18:21, Ted Yu  wrote:
>>>
 Please replace version number for the release you are using :

 spark-core_2.10-1.5.1-tests.jar

 On Thu, Apr 21, 2016 at 10:18 AM, Mich Talebzadeh <
 mich.talebza...@gmail.com> wrote:

> I don't seem to be able to locate
> spark-core_2.11-2.0.0-SNAPSHOT-tests.jar file :(
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 21 April 2016 at 17:45, Ted Yu  wrote:
>
>> It is in core-XX-tests jar:
>>
>> $ jar tvf ./core/target/spark-core_2.11-2.0.0-SNAPSHOT-tests.jar |
>> grep SparkFunSuite
>>   1830 Thu Apr 21 08:19:14 PDT 2016
>> org/apache/spark/SparkFunSuite$$anonfun$withFixture$1.class
>>   1823 Thu Apr 21 08:19:14 PDT 2016
>> org/apache/spark/SparkFunSuite$$anonfun$withFixture$2.class
>>   6232 Thu Apr 21 08:19:14 PDT 2016
>> org/apache/spark/SparkFunSuite.class
>>
>> On Thu, Apr 21, 2016 at 9:39 AM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> like war of attrition :)
>>>
>>> now I get with sbt
>>>
>>> object SparkFunSuite is not a member of package org.apache.spark
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 21 April 2016 at 17:22, Ted Yu  wrote:
>>>
 Have you tried the following ?

 libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6"

 On Thu, Apr 21, 2016 at 9:19 AM, Mich Talebzadeh <
 mich.talebza...@gmail.com> wrote:

> Unfortunately this sbt dependency is not working
>
> libraryDependencies += "org.apache.spark" %% "spark-core" %
> "1.5.1" % "provided"
> libraryDependencies += "org.apache.spark" %% "spark-sql" %
> "1.5.1"  % "provided"
> libraryDependencies += 

Tracing Spark DataFrame Execution

2016-04-21 Thread Andrés Ivaldi
Hello. Is it possible to trace DataFrame execution? I'd like to report the progress
of a DataFrame execution. I looked at SparkListeners, but nested DataFrames
produce several jobs, and I don't know how to relate these jobs; also, I'm
reusing the SparkContext.
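
For concreteness, a minimal sketch of the SparkListener route is below
(SparkContext.setJobGroup is one way to tag which jobs belong to which DataFrame
action; the group id is arbitrary and reportDf stands for any DataFrame):

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}

// sketch: tag a DataFrame action with a job group, then relate its jobs in a listener
val listener = new SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    val group = Option(jobStart.properties)
      .flatMap(p => Option(p.getProperty("spark.jobGroup.id")))   // set by sc.setJobGroup
      .getOrElse("unknown")
    println(s"job ${jobStart.jobId} started for group $group with ${jobStart.stageInfos.size} stages")
  }
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    println(s"job ${jobEnd.jobId} finished: ${jobEnd.jobResult}")
}
sc.addSparkListener(listener)

sc.setJobGroup("df-report-1", "progress of the report DataFrame")  // arbitrary id
reportDf.count()              // every job triggered by this action carries the group id
sc.clearJobGroup()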

Regards.

-- 
Ing. Ivaldi Andres


bisecting kmeans model tree

2016-04-21 Thread roni
Hi ,
I want to get the bisecting k-means tree structure to show a dendrogram on
the heatmap I am generating based on the hierarchical clustering of the data.
How do I get that using MLlib?
Thanks
-Roni


Create tab separated file from a dataframe spark 1.4 with Java

2016-04-21 Thread Mail.com
Hi,

I have a dataframe and need to write to a tab-separated file using Spark 1.4
and Java.

Can someone please suggest an approach?

Thanks,
Pradeep 

Re: Issue with Spark shell and scalatest

2016-04-21 Thread Ted Yu
Plug in 1.5.1 for your jars:

$ jar tvf ./core/target/spark-core_2.11-2.0.0-SNAPSHOT.jar | grep Logging
...
  1781 Thu Apr 21 08:19:34 PDT 2016 org/apache/spark/internal/Logging$.class

jar tvf external/kafka/target/spark-streaming-kafka_2.11-2.0.0-SNAPSHOT.jar
| grep LeaderOffset
...
  3310 Thu Apr 21 08:38:40 PDT 2016
org/apache/spark/streaming/kafka/KafkaCluster$LeaderOffset.class

On Thu, Apr 21, 2016 at 11:52 AM, Mich Talebzadeh  wrote:

> These two are giving me griefs:
>
> scala> import org.apache.spark.internal.Logging
> :26: error: object internal is not a member of package
> org.apache.spark
>  import org.apache.spark.internal.Logging
>
> scala> import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
> :29: error: object KafkaCluster in package kafka cannot be
> accessed in package org.apache.spark.streaming.kafka
>  import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 21 April 2016 at 18:29, Mich Talebzadeh 
> wrote:
>
>>
>> Thanks
>>
>> jar tvf spark-core_2.10-1.5.1-tests.jar | grep SparkFunSuite
>>   1787 Wed Sep 23 23:34:26 BST 2015
>> org/apache/spark/SparkFunSuite$$anonfun$withFixture$1.class
>>   1780 Wed Sep 23 23:34:26 BST 2015
>> org/apache/spark/SparkFunSuite$$anonfun$withFixture$2.class
>>   3982 Wed Sep 23 23:34:26 BST 2015 org/apache/spark/SparkFunSuite.class
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 21 April 2016 at 18:21, Ted Yu  wrote:
>>
>>> Please replace version number for the release you are using :
>>>
>>> spark-core_2.10-1.5.1-tests.jar
>>>
>>> On Thu, Apr 21, 2016 at 10:18 AM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 I don't seem to be able to locate
 spark-core_2.11-2.0.0-SNAPSHOT-tests.jar file :(

 Dr Mich Talebzadeh



 LinkedIn * 
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 *



 http://talebzadehmich.wordpress.com



 On 21 April 2016 at 17:45, Ted Yu  wrote:

> It is in core-XX-tests jar:
>
> $ jar tvf ./core/target/spark-core_2.11-2.0.0-SNAPSHOT-tests.jar |
> grep SparkFunSuite
>   1830 Thu Apr 21 08:19:14 PDT 2016
> org/apache/spark/SparkFunSuite$$anonfun$withFixture$1.class
>   1823 Thu Apr 21 08:19:14 PDT 2016
> org/apache/spark/SparkFunSuite$$anonfun$withFixture$2.class
>   6232 Thu Apr 21 08:19:14 PDT 2016
> org/apache/spark/SparkFunSuite.class
>
> On Thu, Apr 21, 2016 at 9:39 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> like war of attrition :)
>>
>> now I get with sbt
>>
>> object SparkFunSuite is not a member of package org.apache.spark
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 21 April 2016 at 17:22, Ted Yu  wrote:
>>
>>> Have you tried the following ?
>>>
>>> libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6"
>>>
>>> On Thu, Apr 21, 2016 at 9:19 AM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 Unfortunately this sbt dependency is not working

 libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1"
 % "provided"
 libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.5.1"
 % "provided"
 libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.5.1"
 % "provided"
 libraryDependencies += "junit" % "junit" % "4.12"
 libraryDependencies += "org.scala-sbt" % "test-interface" % "1.0"
 libraryDependencies += "org.apache.spark" %% "spark-streaming" %
 "1.6.1" % "provided"
 libraryDependencies += "org.apache.spark" %%
 "spark-streaming-kafka" % "1.6.1"
 libraryDependencies += "org.scalactic" %% "scalactic" % "2.2.6"
 *libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6" %
 "test"*

 Getting error

 [info] Compiling 1 Scala source to

Can't access sqlite db from Spark

2016-04-21 Thread sturm
Hi, I have the following code:

val conf = new SparkConf().setAppName("Spark Test")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val data = sqlContext.read.format("jdbc").options(
  Map(
"url" -> "jdbc:sqlite:/nv/pricing/ix_tri_pi.sqlite3",
"dbtable" -> "SELECT security_id FROM ix_tri_pi")).load()

data.foreach {
  row => println(row.getInt(1))
}

And I try to submit it with:

spark-submit \
  --class "com.novus.analytics.spark.SparkTest" \ 
  --master "local[4]" \

/Users/smabie/workspace/analytics/analytics-spark/target/scala-2.10/analytics-spark.jar
\
  --conf spark.executer.extraClassPath=sqlite-jdbc-3.8.7.jar \
  --conf  spark.driver.extraClassPath=sqlite-jdbc-3.8.7.jar \
  --driver-class-path sqlite-jdbc-3.8.7.jar \
  --jars sqlite-jdbc-3.8.7.jar

But I get the following exception:

Exception in thread "main" java.sql.SQLException: No suitable driver

I am using Spark version 1.6.1, if that helps. Thanks! Any help would be
*greatly* appreciated!
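
For completeness, the same read with the JDBC driver class named explicitly in the
options is sketched below (this assumes the Xerial sqlite-jdbc driver, whose driver
class is org.sqlite.JDBC):

// sketch: name the driver class explicitly; dbtable can also be a
// parenthesized subquery with an alias instead of a bare SELECT
val data = sqlContext.read.format("jdbc").options(
  Map(
    "url" -> "jdbc:sqlite:/nv/pricing/ix_tri_pi.sqlite3",
    "driver" -> "org.sqlite.JDBC",
    "dbtable" -> "(SELECT security_id FROM ix_tri_pi) AS t")).load()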



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Can-t-access-sqlite-db-from-Spark-tp26814.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Spark 1.6.1. How to prevent serialization of KafkaProducer

2016-04-21 Thread Bryan Jeffrey
Here is what we're doing:


import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import net.liftweb.json.Extraction._
import net.liftweb.json._
import org.apache.spark.streaming.dstream.DStream

class KafkaWriter(brokers: Array[String], topic: String, numPartitions:
Int) {
  def write[T](data: DStream[T]): Unit = {
KafkaWriter.write(data, topic, brokers, numPartitions)
  }
}

object KafkaWriter {
  def write[T](data: DStream[T], topic: String, brokers: Array[String],
numPartitions: Int): Unit = {
val dataToWrite =
  if (numPartitions > 0) {
data.repartition(numPartitions)
  } else {
data
  }

dataToWrite
  .map(x => new KeyedMessage[String, String](topic,
KafkaWriter.toJson(x)))
  .foreachRDD(rdd => {
  rdd.foreachPartition(part => {
val producer: Producer[String, String] =
KafkaWriter.createProducer(brokers)
part.foreach(item => producer.send(item))
producer.close()
  })
})
  }

  def apply(brokers: Option[Array[String]], topic: String, numPartitions:
Int): KafkaWriter = {
val brokersToUse =
  brokers match {
case Some(x) => x
case None => throw new IllegalArgumentException("Must specify
brokers!")
  }

new KafkaWriter(brokersToUse, topic, numPartitions)
  }

  def toJson[T](data: T): String = {
implicit val formats = DefaultFormats ++
net.liftweb.json.ext.JodaTimeSerializers.all
compactRender(decompose(data))
  }

  def createProducer(brokers: Array[String]): Producer[String, String] = {
val properties = new Properties()
properties.put("metadata.broker.list", brokers.mkString(","))
properties.put("serializer.class", "kafka.serializer.StringEncoder")

val kafkaConfig = new ProducerConfig(properties)
new Producer[String, String](kafkaConfig)
  }
}


Then just call:

val kafkaWriter: KafkaWriter =
  KafkaWriter(KafkaStreamFactory.getBrokersFromConfig(config),
    config.getString(Parameters.topicName), numPartitions = kafkaWritePartitions)
kafkaWriter.write(dataToWriteToKafka)


Hope that helps!

Bryan Jeffrey

On Thu, Apr 21, 2016 at 2:08 PM, Alexander Gallego 
wrote:

> Thanks Ted.
>
>  KafkaWordCount (producer) does not operate on a DStream[T]
>
> ```scala
>
>
> object KafkaWordCountProducer {
>
>   def main(args: Array[String]) {
> if (args.length < 4) {
>   System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
>     "<messagesPerSec> <wordsPerMessage>")
>   System.exit(1)
> }
>
> val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
>
> // Zookeeper connection properties
> val props = new HashMap[String, Object]()
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>   "org.apache.kafka.common.serialization.StringSerializer")
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>   "org.apache.kafka.common.serialization.StringSerializer")
>
> val producer = new KafkaProducer[String, String](props)
>
> // Send some messages
> while(true) {
>   (1 to messagesPerSec.toInt).foreach { messageNum =>
> val str = (1 to wordsPerMessage.toInt).map(x =>
> scala.util.Random.nextInt(10).toString)
>   .mkString(" ")
>
> val message = new ProducerRecord[String, String](topic, null, str)
> producer.send(message)
>   }
>
>   Thread.sleep(1000)
> }
>   }
>
> }
>
> ```
>
>
> Also, doing:
>
>
> ```
> object KafkaSink {
>  def send(brokers: String, sc: SparkContext, topic: String, key:
> String, value: String) =
> getInstance(brokers, sc).value.send(new ProducerRecord(topic,
> key, value))
> }
>
> KafkaSink.send(brokers, sparkContext)(outputTopic, record._1, record._2)
>
> ```
>
>
> Doesn't work either, the result is:
>
> Exception in thread "main" org.apache.spark.SparkException: Task not
> serializable
>
>
> Thanks!
>
>
>
>
> On Thu, Apr 21, 2016 at 1:08 PM, Ted Yu  wrote:
> >
> > In KafkaWordCount , the String is sent back and producer.send() is
> called.
> >
> > I guess if you don't find via solution in your current design, you can
> consider the above.
> >
> > On Thu, Apr 21, 2016 at 10:04 AM, Alexander Gallego 
> wrote:
> >>
> >> Hello,
> >>
> >> I understand that you cannot serialize Kafka Producer.
> >>
> >> So I've tried:
> >>
> >> (as suggested here
> https://forums.databricks.com/questions/369/how-do-i-handle-a-task-not-serializable-exception.html)
>
> >>
> >>  - Make the class Serializable - not possible
> >>
> >>  - Declare the instance only within the lambda function passed in map.
> >>
> >> via:
> >>
> >> // as suggested by the docs
> >>
> >>
> >> ```scala
> >>
> >>kafkaOut.foreachRDD(rdd => {
> >>  rdd.foreachPartition(partition => {
> >>   val producer = new KafkaProducer(..)
> >>   partition.foreach { record =>
> >>   producer.send(new 

Re: Spark 1.6.1. How to prevent serialization of KafkaProducer

2016-04-21 Thread Todd Nist
Have you looked at these:

http://allegro.tech/2015/08/spark-kafka-integration.html
http://mkuthan.github.io/blog/2016/01/29/spark-kafka-integration2/

Full example here:

https://github.com/mkuthan/example-spark-kafka

HTH.

-Todd

On Thu, Apr 21, 2016 at 2:08 PM, Alexander Gallego 
wrote:

> Thanks Ted.
>
>  KafkaWordCount (producer) does not operate on a DStream[T]
>
> ```scala
>
>
> object KafkaWordCountProducer {
>
>   def main(args: Array[String]) {
> if (args.length < 4) {
>   System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
>     "<messagesPerSec> <wordsPerMessage>")
>   System.exit(1)
> }
>
> val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
>
> // Zookeeper connection properties
> val props = new HashMap[String, Object]()
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>   "org.apache.kafka.common.serialization.StringSerializer")
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>   "org.apache.kafka.common.serialization.StringSerializer")
>
> val producer = new KafkaProducer[String, String](props)
>
> // Send some messages
> while(true) {
>   (1 to messagesPerSec.toInt).foreach { messageNum =>
> val str = (1 to wordsPerMessage.toInt).map(x =>
> scala.util.Random.nextInt(10).toString)
>   .mkString(" ")
>
> val message = new ProducerRecord[String, String](topic, null, str)
> producer.send(message)
>   }
>
>   Thread.sleep(1000)
> }
>   }
>
> }
>
> ```
>
>
> Also, doing:
>
>
> ```
> object KafkaSink {
>  def send(brokers: String, sc: SparkContext, topic: String, key:
> String, value: String) =
> getInstance(brokers, sc).value.send(new ProducerRecord(topic,
> key, value))
> }
>
> KafkaSink.send(brokers, sparkContext)(outputTopic, record._1, record._2)
>
> ```
>
>
> Doesn't work either, the result is:
>
> Exception in thread "main" org.apache.spark.SparkException: Task not
> serializable
>
>
> Thanks!
>
>
>
>
> On Thu, Apr 21, 2016 at 1:08 PM, Ted Yu  wrote:
> >
> > In KafkaWordCount , the String is sent back and producer.send() is
> called.
> >
> > I guess if you don't find via solution in your current design, you can
> consider the above.
> >
> > On Thu, Apr 21, 2016 at 10:04 AM, Alexander Gallego 
> wrote:
> >>
> >> Hello,
> >>
> >> I understand that you cannot serialize Kafka Producer.
> >>
> >> So I've tried:
> >>
> >> (as suggested here
> https://forums.databricks.com/questions/369/how-do-i-handle-a-task-not-serializable-exception.html)
>
> >>
> >>  - Make the class Serializable - not possible
> >>
> >>  - Declare the instance only within the lambda function passed in map.
> >>
> >> via:
> >>
> >> // as suggested by the docs
> >>
> >>
> >> ```scala
> >>
> >>kafkaOut.foreachRDD(rdd => {
> >>  rdd.foreachPartition(partition => {
> >>   val producer = new KafkaProducer(..)
> >>   partition.foreach { record =>
> >>   producer.send(new ProducerRecord(outputTopic, record._1,
> record._2)
> >>}
> >>   producer.close()
> >>})
> >>  }) // foreachRDD
> >>
> >>
> >> ```
> >>
> >> - Make the NotSerializable object as a static and create it once per
> machine.
> >>
> >> via:
> >>
> >>
> >> ```scala
> >>
> >>
> >> object KafkaSink {
> >>   @volatile private var instance: Broadcast[KafkaProducer[String,
> String]] = null
> >>   def getInstance(brokers: String, sc: SparkContext):
> Broadcast[KafkaProducer[String, String]] = {
> >> if (instance == null) {
> >>   synchronized {
> >> println("Creating new kafka producer")
> >> val props = new java.util.Properties()
> >> ...
> >> instance = sc.broadcast(new KafkaProducer[String,
> String](props))
> >> sys.addShutdownHook {
> >>   instance.value.close()
> >> }
> >>   }
> >> }
> >> instance
> >>   }
> >> }
> >>
> >>
> >> ```
> >>
> >>
> >>
> >>  - Call rdd.forEachPartition and create the NotSerializable object in
> there like this:
> >>
> >> Same as above.
> >>
> >>
> >> - Mark the instance @transient
> >>
> >> Same thing, just make it a class variable via:
> >>
> >>
> >> ```
> >> @transient var producer: KakfaProducer[String,String] = null
> >> def getInstance() = {
> >>if( producer == null ) {
> >>producer = new KafkaProducer()
> >>}
> >>producer
> >> }
> >>
> >> ```
> >>
> >>
> >> However, I get serialization problems with all of these options.
> >>
> >>
> >> Thanks for your help.
> >>
> >> - Alex
> >>
> >
>
>
>
> --
>
>
>
>
>
> Alexander Gallego
> Co-Founder & CTO
>


Re: Issue with Spark shell and scalatest

2016-04-21 Thread Mich Talebzadeh
These two are giving me griefs:

scala> import org.apache.spark.internal.Logging
:26: error: object internal is not a member of package
org.apache.spark
 import org.apache.spark.internal.Logging

scala> import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
:29: error: object KafkaCluster in package kafka cannot be
accessed in package org.apache.spark.streaming.kafka
 import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 21 April 2016 at 18:29, Mich Talebzadeh 
wrote:

>
> Thanks
>
> jar tvf spark-core_2.10-1.5.1-tests.jar | grep SparkFunSuite
>   1787 Wed Sep 23 23:34:26 BST 2015
> org/apache/spark/SparkFunSuite$$anonfun$withFixture$1.class
>   1780 Wed Sep 23 23:34:26 BST 2015
> org/apache/spark/SparkFunSuite$$anonfun$withFixture$2.class
>   3982 Wed Sep 23 23:34:26 BST 2015 org/apache/spark/SparkFunSuite.class
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 21 April 2016 at 18:21, Ted Yu  wrote:
>
>> Please replace version number for the release you are using :
>>
>> spark-core_2.10-1.5.1-tests.jar
>>
>> On Thu, Apr 21, 2016 at 10:18 AM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> I don't seem to be able to locate
>>> spark-core_2.11-2.0.0-SNAPSHOT-tests.jar file :(
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 21 April 2016 at 17:45, Ted Yu  wrote:
>>>
 It is in core-XX-tests jar:

 $ jar tvf ./core/target/spark-core_2.11-2.0.0-SNAPSHOT-tests.jar | grep
 SparkFunSuite
   1830 Thu Apr 21 08:19:14 PDT 2016
 org/apache/spark/SparkFunSuite$$anonfun$withFixture$1.class
   1823 Thu Apr 21 08:19:14 PDT 2016
 org/apache/spark/SparkFunSuite$$anonfun$withFixture$2.class
   6232 Thu Apr 21 08:19:14 PDT 2016 org/apache/spark/SparkFunSuite.class

 On Thu, Apr 21, 2016 at 9:39 AM, Mich Talebzadeh <
 mich.talebza...@gmail.com> wrote:

> like war of attrition :)
>
> now I get with sbt
>
> object SparkFunSuite is not a member of package org.apache.spark
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 21 April 2016 at 17:22, Ted Yu  wrote:
>
>> Have you tried the following ?
>>
>> libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6"
>>
>> On Thu, Apr 21, 2016 at 9:19 AM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Unfortunately this sbt dependency is not working
>>>
>>> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1"
>>> % "provided"
>>> libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.5.1"
>>> % "provided"
>>> libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.5.1"
>>> % "provided"
>>> libraryDependencies += "junit" % "junit" % "4.12"
>>> libraryDependencies += "org.scala-sbt" % "test-interface" % "1.0"
>>> libraryDependencies += "org.apache.spark" %% "spark-streaming" %
>>> "1.6.1" % "provided"
>>> libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka"
>>> % "1.6.1"
>>> libraryDependencies += "org.scalactic" %% "scalactic" % "2.2.6"
>>> *libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6" %
>>> "test"*
>>>
>>> Getting error
>>>
>>> [info] Compiling 1 Scala source to
>>> /data6/hduser/scala/CEP_assembly/target/scala-2.10/classes...
>>> [error]
>>> /data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:32:
>>> object scalatest is not a member of package org
>>> [error] import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
>>> [error]^
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> 

Does Spark Streaming support event time windows now?

2016-04-21 Thread Yifei Li
I read from the following article:

https://databricks.com/blog/2015/07/30/diving-into-spark-streamings-execution-model.html

which says that Spark Streaming has a future direction for "Event time and
out-of-order". I am wondering if it is supported now.

In my scenario, I have three streams: stream1, stream2, and stream3. Each stream
has its own timestamp.

I want to join stream1, stream2, and stream3 every 5 minutes. Sometimes
stream2 will be delayed (by hours or even days). I am wondering whether Spark
Streaming is a good fit for my scenario.

Thanks,

Yifei


Re: Spark 1.6.1. How to prevent serialization of KafkaProducer

2016-04-21 Thread Alexander Gallego
Thanks Ted.

 KafkaWordCount (producer) does not operate on a DStream[T]

```scala


object KafkaWordCountProducer {

  def main(args: Array[String]) {
if (args.length < 4) {
  System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
    "<messagesPerSec> <wordsPerMessage>")
  System.exit(1)
}

val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args

// Zookeeper connection properties
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)

// Send some messages
while(true) {
  (1 to messagesPerSec.toInt).foreach { messageNum =>
val str = (1 to wordsPerMessage.toInt).map(x =>
scala.util.Random.nextInt(10).toString)
  .mkString(" ")

val message = new ProducerRecord[String, String](topic, null, str)
producer.send(message)
  }

  Thread.sleep(1000)
}
  }

}

```


Also, doing:


```
object KafkaSink {
 def send(brokers: String, sc: SparkContext, topic: String, key:
String, value: String) =
getInstance(brokers, sc).value.send(new ProducerRecord(topic,
key, value))
}

KafkaSink.send(brokers, sparkContext)(outputTopic, record._1, record._2)

```


Doesn't work either, the result is:

Exception in thread "main" org.apache.spark.SparkException: Task not
serializable


Thanks!



On Thu, Apr 21, 2016 at 1:08 PM, Ted Yu  wrote:
>
> In KafkaWordCount , the String is sent back and producer.send() is called.
>
> I guess if you don't find via solution in your current design, you can
consider the above.
>
> On Thu, Apr 21, 2016 at 10:04 AM, Alexander Gallego 
wrote:
>>
>> Hello,
>>
>> I understand that you cannot serialize Kafka Producer.
>>
>> So I've tried:
>>
>> (as suggested here
https://forums.databricks.com/questions/369/how-do-i-handle-a-task-not-serializable-exception.html)

>>
>>  - Make the class Serializable - not possible
>>
>>  - Declare the instance only within the lambda function passed in map.
>>
>> via:
>>
>> // as suggested by the docs
>>
>>
>> ```scala
>>
>>kafkaOut.foreachRDD(rdd => {
>>  rdd.foreachPartition(partition => {
>>   val producer = new KafkaProducer(..)
>>   partition.foreach { record =>
>>   producer.send(new ProducerRecord(outputTopic, record._1,
>> record._2))
>>}
>>   producer.close()
>>})
>>  }) // foreachRDD
>>
>>
>> ```
>>
>> - Make the NotSerializable object as a static and create it once per
machine.
>>
>> via:
>>
>>
>> ```scala
>>
>>
>> object KafkaSink {
>>   @volatile private var instance: Broadcast[KafkaProducer[String,
String]] = null
>>   def getInstance(brokers: String, sc: SparkContext):
Broadcast[KafkaProducer[String, String]] = {
>> if (instance == null) {
>>   synchronized {
>> println("Creating new kafka producer")
>> val props = new java.util.Properties()
>> ...
>> instance = sc.broadcast(new KafkaProducer[String, String](props))
>> sys.addShutdownHook {
>>   instance.value.close()
>> }
>>   }
>> }
>> instance
>>   }
>> }
>>
>>
>> ```
>>
>>
>>
>>  - Call rdd.forEachPartition and create the NotSerializable object in
there like this:
>>
>> Same as above.
>>
>>
>> - Mark the instance @transient
>>
>> Same thing, just make it a class variable via:
>>
>>
>> ```
>> @transient var producer: KafkaProducer[String,String] = null
>> def getInstance() = {
>>if( producer == null ) {
>>producer = new KafkaProducer()
>>}
>>producer
>> }
>>
>> ```
>>
>>
>> However, I get serialization problems with all of these options.
>>
>>
>> Thanks for your help.
>>
>> - Alex
>>
>



--





Alexander Gallego
Co-Founder & CTO
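
A pattern that usually sidesteps this, shown here as a minimal sketch rather
than a drop-in replacement for the code above: keep the producer inside an
object so that it is created lazily on each executor JVM, and let the closures
capture only plain Strings (brokers, topic). The object and field names below
are made up.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// A Scala object is never serialized with a closure; it is initialized
// independently inside every JVM that touches it, i.e. once per executor.
object LazyKafkaSink {
  private var producer: KafkaProducer[String, String] = null

  def get(brokers: String): KafkaProducer[String, String] = synchronized {
    if (producer == null) {
      val props = new Properties()
      props.put("bootstrap.servers", brokers)
      props.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer")
      producer = new KafkaProducer[String, String](props)
      sys.addShutdownHook { producer.close() }
    }
    producer
  }
}

// Usage (only Strings cross the wire, never the producer itself):
// kafkaOut.foreachRDD { rdd =>
//   rdd.foreachPartition { partition =>
//     val producer = LazyKafkaSink.get(brokers)  // created on the executor
//     partition.foreach { case (k, v) =>
//       producer.send(new ProducerRecord(outputTopic, k, v))
//     }
//   }
// }
```

The broadcast variant fails precisely because sc.broadcast has to serialize its
value and KafkaProducer is not serializable; a per-JVM object never needs to be
shipped, so nothing has to be serialized at all.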


Re: Issue with Spark shell and scalatest

2016-04-21 Thread Mich Talebzadeh
Thanks

jar tvf spark-core_2.10-1.5.1-tests.jar | grep SparkFunSuite
  1787 Wed Sep 23 23:34:26 BST 2015
org/apache/spark/SparkFunSuite$$anonfun$withFixture$1.class
  1780 Wed Sep 23 23:34:26 BST 2015
org/apache/spark/SparkFunSuite$$anonfun$withFixture$2.class
  3982 Wed Sep 23 23:34:26 BST 2015 org/apache/spark/SparkFunSuite.class
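
Since SparkFunSuite only ships in that tests artifact, the sbt way to pull it
in is the "tests" classifier. A minimal sketch, assuming the tests jar for the
Spark version in use is published to the resolvers configured (the Apache
releases on Maven Central appear to carry it):

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1" % "test" classifier "tests"
libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6" % "test"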


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 21 April 2016 at 18:21, Ted Yu  wrote:

> Please replace version number for the release you are using :
>
> spark-core_2.10-1.5.1-tests.jar
>
> On Thu, Apr 21, 2016 at 10:18 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> I don't seem to be able to locate
>> spark-core_2.11-2.0.0-SNAPSHOT-tests.jar file :(
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 21 April 2016 at 17:45, Ted Yu  wrote:
>>
>>> It is in core-XX-tests jar:
>>>
>>> $ jar tvf ./core/target/spark-core_2.11-2.0.0-SNAPSHOT-tests.jar | grep
>>> SparkFunSuite
>>>   1830 Thu Apr 21 08:19:14 PDT 2016
>>> org/apache/spark/SparkFunSuite$$anonfun$withFixture$1.class
>>>   1823 Thu Apr 21 08:19:14 PDT 2016
>>> org/apache/spark/SparkFunSuite$$anonfun$withFixture$2.class
>>>   6232 Thu Apr 21 08:19:14 PDT 2016 org/apache/spark/SparkFunSuite.class
>>>
>>> On Thu, Apr 21, 2016 at 9:39 AM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 like war of attrition :)

 now I get with sbt

 object SparkFunSuite is not a member of package org.apache.spark


 Dr Mich Talebzadeh



 LinkedIn * 
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 *



 http://talebzadehmich.wordpress.com



 On 21 April 2016 at 17:22, Ted Yu  wrote:

> Have you tried the following ?
>
> libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6"
>
> On Thu, Apr 21, 2016 at 9:19 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Unfortunately this sbt dependency is not working
>>
>> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1" %
>> "provided"
>> libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.5.1"  %
>> "provided"
>> libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.5.1" %
>> "provided"
>> libraryDependencies += "junit" % "junit" % "4.12"
>> libraryDependencies += "org.scala-sbt" % "test-interface" % "1.0"
>> libraryDependencies += "org.apache.spark" %% "spark-streaming" %
>> "1.6.1" % "provided"
>> libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka"
>> % "1.6.1"
>> libraryDependencies += "org.scalactic" %% "scalactic" % "2.2.6"
>> *libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6" %
>> "test"*
>>
>> Getting error
>>
>> [info] Compiling 1 Scala source to
>> /data6/hduser/scala/CEP_assembly/target/scala-2.10/classes...
>> [error]
>> /data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:32:
>> object scalatest is not a member of package org
>> [error] import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
>> [error]^
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 21 April 2016 at 16:49, Mich Talebzadeh > > wrote:
>>
>>> Thanks Ted. It was a typo in my alias and it is sorted now
>>>
>>> slong='rlwrap spark-shell --master spark://50.140.197.217:7077
>>> --jars
>>> /home/hduser/jars/ojdbc6.jar,/home/hduser/jars/jconn4.jar,/home/hduser/jars/junit-4.12.jar,/home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar,/home/hduser/jars/scalatest_2.11-2.2.6.jar'
>>>
>>>
>>> scala> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
>>> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> 

Re: Issue with Spark shell and scalatest

2016-04-21 Thread Ted Yu
Please replace version number for the release you are using :

spark-core_2.10-1.5.1-tests.jar

On Thu, Apr 21, 2016 at 10:18 AM, Mich Talebzadeh  wrote:

> I don't seem to be able to locate spark-core_2.11-2.0.0-SNAPSHOT-tests.jar
> file :(
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 21 April 2016 at 17:45, Ted Yu  wrote:
>
>> It is in core-XX-tests jar:
>>
>> $ jar tvf ./core/target/spark-core_2.11-2.0.0-SNAPSHOT-tests.jar | grep
>> SparkFunSuite
>>   1830 Thu Apr 21 08:19:14 PDT 2016
>> org/apache/spark/SparkFunSuite$$anonfun$withFixture$1.class
>>   1823 Thu Apr 21 08:19:14 PDT 2016
>> org/apache/spark/SparkFunSuite$$anonfun$withFixture$2.class
>>   6232 Thu Apr 21 08:19:14 PDT 2016 org/apache/spark/SparkFunSuite.class
>>
>> On Thu, Apr 21, 2016 at 9:39 AM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> like war of attrition :)
>>>
>>> now I get with sbt
>>>
>>> object SparkFunSuite is not a member of package org.apache.spark
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 21 April 2016 at 17:22, Ted Yu  wrote:
>>>
 Have you tried the following ?

 libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6"

 On Thu, Apr 21, 2016 at 9:19 AM, Mich Talebzadeh <
 mich.talebza...@gmail.com> wrote:

> Unfortunately this sbt dependency is not working
>
> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1" %
> "provided"
> libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.5.1"  %
> "provided"
> libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.5.1" %
> "provided"
> libraryDependencies += "junit" % "junit" % "4.12"
> libraryDependencies += "org.scala-sbt" % "test-interface" % "1.0"
> libraryDependencies += "org.apache.spark" %% "spark-streaming" %
> "1.6.1" % "provided"
> libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" %
> "1.6.1"
> libraryDependencies += "org.scalactic" %% "scalactic" % "2.2.6"
> *libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6" %
> "test"*
>
> Getting error
>
> [info] Compiling 1 Scala source to
> /data6/hduser/scala/CEP_assembly/target/scala-2.10/classes...
> [error]
> /data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:32:
> object scalatest is not a member of package org
> [error] import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
> [error]^
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 21 April 2016 at 16:49, Mich Talebzadeh 
> wrote:
>
>> Thanks Ted. It was a typo in my alias and it is sorted now
>>
>> slong='rlwrap spark-shell --master spark://50.140.197.217:7077
>> --jars
>> /home/hduser/jars/ojdbc6.jar,/home/hduser/jars/jconn4.jar,/home/hduser/jars/junit-4.12.jar,/home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar,/home/hduser/jars/scalatest_2.11-2.2.6.jar'
>>
>>
>> scala> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
>> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 21 April 2016 at 16:44, Ted Yu  wrote:
>>
>>> I tried on refreshed copy of master branch:
>>>
>>> $ bin/spark-shell --jars
>>> /home/hbase/.m2/repository/org/scalatest/scalatest_2.11/2.2.6/scalatest_2.11-2.2.6.jar
>>> ...
>>> scala> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
>>> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
>>>
>>> BTW I noticed an extra leading comma after '--jars' in your email.
>>> Not sure if that matters.
>>>
>>> On Thu, Apr 21, 2016 at 8:39 AM, Ted Yu  wrote:
>>>
 Mich:

 $ jar tvf
 

Re: Issue with Spark shell and scalatest

2016-04-21 Thread Mich Talebzadeh
I don't seem to be able to locate spark-core_2.11-2.0.0-SNAPSHOT-tests.jar
file :(

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 21 April 2016 at 17:45, Ted Yu  wrote:

> It is in core-XX-tests jar:
>
> $ jar tvf ./core/target/spark-core_2.11-2.0.0-SNAPSHOT-tests.jar | grep
> SparkFunSuite
>   1830 Thu Apr 21 08:19:14 PDT 2016
> org/apache/spark/SparkFunSuite$$anonfun$withFixture$1.class
>   1823 Thu Apr 21 08:19:14 PDT 2016
> org/apache/spark/SparkFunSuite$$anonfun$withFixture$2.class
>   6232 Thu Apr 21 08:19:14 PDT 2016 org/apache/spark/SparkFunSuite.class
>
> On Thu, Apr 21, 2016 at 9:39 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> like war of attrition :)
>>
>> now I get with sbt
>>
>> object SparkFunSuite is not a member of package org.apache.spark
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 21 April 2016 at 17:22, Ted Yu  wrote:
>>
>>> Have you tried the following ?
>>>
>>> libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6"
>>>
>>> On Thu, Apr 21, 2016 at 9:19 AM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 Unfortunately this sbt dependency is not working

 libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1" %
 "provided"
 libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.5.1"  %
 "provided"
 libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.5.1" %
 "provided"
 libraryDependencies += "junit" % "junit" % "4.12"
 libraryDependencies += "org.scala-sbt" % "test-interface" % "1.0"
 libraryDependencies += "org.apache.spark" %% "spark-streaming" %
 "1.6.1" % "provided"
 libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" %
 "1.6.1"
 libraryDependencies += "org.scalactic" %% "scalactic" % "2.2.6"
 *libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6" %
 "test"*

 Getting error

 [info] Compiling 1 Scala source to
 /data6/hduser/scala/CEP_assembly/target/scala-2.10/classes...
 [error]
 /data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:32:
 object scalatest is not a member of package org
 [error] import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
 [error]^



 Dr Mich Talebzadeh



 LinkedIn * 
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 *



 http://talebzadehmich.wordpress.com



 On 21 April 2016 at 16:49, Mich Talebzadeh 
 wrote:

> Thanks Ted. It was a typo in my alias and it is sorted now
>
> slong='rlwrap spark-shell --master spark://50.140.197.217:7077 --jars
> /home/hduser/jars/ojdbc6.jar,/home/hduser/jars/jconn4.jar,/home/hduser/jars/junit-4.12.jar,/home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar,/home/hduser/jars/scalatest_2.11-2.2.6.jar'
>
>
> scala> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 21 April 2016 at 16:44, Ted Yu  wrote:
>
>> I tried on refreshed copy of master branch:
>>
>> $ bin/spark-shell --jars
>> /home/hbase/.m2/repository/org/scalatest/scalatest_2.11/2.2.6/scalatest_2.11-2.2.6.jar
>> ...
>> scala> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
>> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
>>
>> BTW I noticed an extra leading comma after '--jars' in your email.
>> Not sure if that matters.
>>
>> On Thu, Apr 21, 2016 at 8:39 AM, Ted Yu  wrote:
>>
>>> Mich:
>>>
>>> $ jar tvf
>>> /home/hbase/.m2/repository/org/scalatest/scalatest_2.11/2.2.6/scalatest_2.11-2.2.6.jar
>>> | grep BeforeAndAfter
>>>   4257 Sat Dec 26 14:35:48 PST 2015
>>> org/scalatest/BeforeAndAfter$class.class
>>>   2602 Sat Dec 26 14:35:48 PST 2015
>>> org/scalatest/BeforeAndAfter.class
>>>   1998 Sat Dec 26 14:35:48 PST 2015
>>> 

Re: Spark 1.6.1. How to prevent serialization of KafkaProducer

2016-04-21 Thread Ted Yu
In KafkaWordCount, the String is sent back and producer.send() is called.

I guess if you don't find a viable solution in your current design, you can
consider the above.

On Thu, Apr 21, 2016 at 10:04 AM, Alexander Gallego 
wrote:

> Hello,
>
> I understand that you cannot serialize Kafka Producer.
>
> So I've tried:
>
> (as suggested here
> https://forums.databricks.com/questions/369/how-do-i-handle-a-task-not-serializable-exception.html
> )
>
>  - Make the class Serializable - not possible
>
>  - Declare the instance only within the lambda function passed in map.
>
> via:
>
> // as suggested by the docs
>
>
> ```scala
>
>kafkaOut.foreachRDD(rdd => {
>  rdd.foreachPartition(partition => {
>   val producer = new KafkaProducer(..)
>   partition.foreach { record =>
>   producer.send(new ProducerRecord(outputTopic, record._1,
> record._2))
>}
>   producer.close()
>})
>  }) // foreachRDD
>
>
> ```
>
> - Make the NotSerializable object as a static and create it once per
> machine.
>
> via:
>
>
> ```scala
>
>
> object KafkaSink {
>   @volatile private var instance: Broadcast[KafkaProducer[String, String]]
> = null
>   def getInstance(brokers: String, sc: SparkContext):
> Broadcast[KafkaProducer[String, String]] = {
> if (instance == null) {
>   synchronized {
> println("Creating new kafka producer")
> val props = new java.util.Properties()
> ...
> instance = sc.broadcast(new KafkaProducer[String, String](props))
> sys.addShutdownHook {
>   instance.value.close()
> }
>   }
> }
> instance
>   }
> }
>
>
> ```
>
>
>
>  - Call rdd.forEachPartition and create the NotSerializable object in
> there like this:
>
> Same as above.
>
>
> - Mark the instance @transient
>
> Same thing, just make it a class variable via:
>
>
> ```
> @transient var producer: KafkaProducer[String,String] = null
> def getInstance() = {
>if( producer == null ) {
>producer = new KafkaProducer()
>}
>producer
> }
>
> ```
>
>
> However, I get serialization problems with all of these options.
>
>
> Thanks for your help.
>
> - Alex
>
>


Spark 1.6.1. How to prevent serialization of KafkaProducer

2016-04-21 Thread Alexander Gallego
Hello,

I understand that you cannot serialize Kafka Producer.

So I've tried:

(as suggested here
https://forums.databricks.com/questions/369/how-do-i-handle-a-task-not-serializable-exception.html
)

 - Make the class Serializable - not possible

 - Declare the instance only within the lambda function passed in map.

via:

// as suggested by the docs


```scala

   kafkaOut.foreachRDD(rdd => {
 rdd.foreachPartition(partition => {
  val producer = new KafkaProducer(..)
  partition.foreach { record =>
  producer.send(new ProducerRecord(outputTopic, record._1,
record._2))
   }
  producer.close()
   })
 }) // foreachRDD


```

- Make the NotSerializable object as a static and create it once per
machine.

via:


```scala


object KafkaSink {
  @volatile private var instance: Broadcast[KafkaProducer[String, String]]
= null
  def getInstance(brokers: String, sc: SparkContext):
Broadcast[KafkaProducer[String, String]] = {
if (instance == null) {
  synchronized {
println("Creating new kafka producer")
val props = new java.util.Properties()
...
instance = sc.broadcast(new KafkaProducer[String, String](props))
sys.addShutdownHook {
  instance.value.close()
}
  }
}
instance
  }
}


```



 - Call rdd.forEachPartition and create the NotSerializable object in there
like this:

Same as above.


- Mark the instance @transient

Same thing, just make it a class variable via:


```
@transient var producer: KafkaProducer[String,String] = null
def getInstance() = {
   if( producer == null ) {
   producer = new KafkaProducer()
   }
   producer
}

```


However, I get serialization problems with all of these options.


Thanks for your help.

- Alex


Re: Spark support for Complex Event Processing (CEP)

2016-04-21 Thread Mich Talebzadeh
Hi Mario, I sorted that one out with Ted's help, thanks.

scalatest_2.11-2.2.6.jar


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 21 April 2016 at 17:57, Mario Ds Briggs  wrote:

> Googling "java error: is not a member of package" and its related searches
> seemed to suggest it is not a missing-jar problem, though I couldn't put a
> finger on what exactly it is in your case
>
> some specifically in spark-shell as well -
> http://spark-packages.org/package/databricks/spark-csv
>
>
> thanks
> Mario
>
>
> From: Mich Talebzadeh 
> To: Mario Ds Briggs/India/IBM@IBMIN
> Cc: Alonso Isidoro Roman , Luciano Resende <
> luckbr1...@gmail.com>, "user @spark" 
> Date: 21/04/2016 08:34 pm
>
> Subject: Re: Spark support for Complex Event Processing (CEP)
> --
>
>
>
> Hi,
>
> Following example in
>
>
> *https://github.com/agsachin/spark/blob/CEP/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala#L532*
> 
>
> Does anyone know which jar file this belongs to?
>
> I use *scalatest_2.11-2.2.6.jar *in my spark-shell
>
>  spark-shell --master spark://*50.140.197.217:7077*
>  --jars
> ,/home/hduser/jars/junit-4.12.jar,/home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar,
> /*home/hduser/jars/scalatest_2.11-2.2.6.jar'*
>
> scala> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
> :28: error: object scalatest is not a member of package org
>  import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
>
> Thanks
>
>
> Dr Mich Talebzadeh
>
> LinkedIn
> *https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw*
> 
>
> *http://talebzadehmich.wordpress.com*
> 
>
>
>
> On 20 April 2016 at 10:28, Mario Ds Briggs <*mario.bri...@in.ibm.com*
> > wrote:
>
>I did see your earlier post about Stratio decision. Will readup on it
>
>
>thanks
>Mario
>
>
>
>From: Alonso Isidoro Roman <*alons...@gmail.com* >
>To: Mich Talebzadeh <*mich.talebza...@gmail.com*
>>
>Cc: Mario Ds Briggs/India/IBM@IBMIN, Luciano Resende <
>*luckbr1...@gmail.com* >, "user @spark" <
>*user@spark.apache.org* >
>Date: 20/04/2016 02:24 pm
>Subject: Re: Spark support for Complex Event Processing (CEP)
>--
>
>
>
>Stratio decision could do the job
>
> *https://github.com/Stratio/Decision*
>
>
>
>
>Alonso Isidoro Roman.
>
>Mis citas preferidas (de hoy) :
>"Si depurar es el proceso de quitar los errores de software, entonces
>programar debe ser el proceso de introducirlos..."
> -  Edsger Dijkstra
>
>My favorite quotes (today):
>"If debugging is the process of removing software bugs, then
>programming must be the process of putting ..."
>  - Edsger Dijkstra
>
>"If you pay peanuts you get monkeys"
>
>
>2016-04-20 7:55 GMT+02:00 Mich Talebzadeh <*mich.talebza...@gmail.com*
>>:
>   Thanks a lot Mario. Will have a look.
>
>  Regards,
>
>
>  Dr Mich Talebzadeh
>
>  LinkedIn
>  
> *https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw*
>  
> 
>
> *http://talebzadehmich.wordpress.com*
>  
>
>
>
>  On 20 April 2016 at 06:53, Mario Ds Briggs <
>  *mario.bri...@in.ibm.com* > wrote:
>  Hi Mich,
>
>  Info is here -
>  *https://issues.apache.org/jira/browse/SPARK-14745*
>  
>
>  overview is in the pdf -
>  
> 

Re: Spark support for Complex Event Processing (CEP)

2016-04-21 Thread Mario Ds Briggs

Googling "java error: is not a member of package" and its related searches
seemed to suggest it is not a missing-jar problem, though I couldn't put a
finger on what exactly it is in your case

some specifically in spark-shell as well -
http://spark-packages.org/package/databricks/spark-csv


thanks
Mario



From:   Mich Talebzadeh 
To: Mario Ds Briggs/India/IBM@IBMIN
Cc: Alonso Isidoro Roman , Luciano Resende
, "user @spark" 
Date:   21/04/2016 08:34 pm
Subject:Re: Spark support for Complex Event Processing (CEP)



Hi,

Following example in

https://github.com/agsachin/spark/blob/CEP/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala#L532

Does anyone know which jar file this belongs to?

I use scalatest_2.11-2.2.6.jar in my spark-shell

 spark-shell --master spark://50.140.197.217:7077
--jars 
,/home/hduser/jars/junit-4.12.jar,/home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar,
 /
home/hduser/jars/scalatest_2.11-2.2.6.jar'

scala> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
<console>:28: error: object scalatest is not a member of package org
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}

Thanks


Dr Mich Talebzadeh

LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com




On 20 April 2016 at 10:28, Mario Ds Briggs  wrote:
  I did see your earlier post about Stratio decision. Will readup on it


  thanks
  Mario


  From: Alonso Isidoro Roman 
  To: Mich Talebzadeh 
  Cc: Mario Ds Briggs/India/IBM@IBMIN, Luciano Resende <
  luckbr1...@gmail.com>, "user @spark" 
  Date: 20/04/2016 02:24 pm
  Subject: Re: Spark support for Complex Event Processing (CEP)



  Stratio decision could do the job

  https://github.com/Stratio/Decision



  Alonso Isidoro Roman.

  Mis citas preferidas (de hoy) :
  "Si depurar es el proceso de quitar los errores de software, entonces
  programar debe ser el proceso de introducirlos..."
   -  Edsger Dijkstra

  My favorite quotes (today):
  "If debugging is the process of removing software bugs, then programming
  must be the process of putting ..."
    - Edsger Dijkstra

  "If you pay peanuts you get monkeys"


  2016-04-20 7:55 GMT+02:00 Mich Talebzadeh :
Thanks a lot Mario. Will have a look.

Regards,


Dr Mich Talebzadeh

LinkedIn

https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw


http://talebzadehmich.wordpress.com




On 20 April 2016 at 06:53, Mario Ds Briggs  wrote:
Hi Mich,

Info is here - https://issues.apache.org/jira/browse/SPARK-14745

overview is in the pdf -

https://issues.apache.org/jira/secure/attachment/12799670/SparkStreamingCEP.pdf


Usage examples not in the best place for now (will make it better)
-

https://github.com/agsachin/spark/blob/CEP/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala#L532


Your feedback is appreciated.


thanks
Mario


From: Mich Talebzadeh 
To: Mario Ds Briggs/India/IBM@IBMIN
Cc: "user @spark" , Luciano Resende <
luckbr1...@gmail.com>
Date: 19/04/2016 12:45 am
Subject: Re: Spark support for Complex Event Processing (CEP)




great stuff Mario. Much appreciated.

Mich

Dr Mich Talebzadeh

LinkedIn

https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw


http://talebzadehmich.wordpress.com




On 18 April 2016 at 20:08, Mario Ds Briggs  wrote:
Hey Mich, Luciano

Will provide links with docs by tomorrow

thanks
Mario

- Message from Mich Talebzadeh <
mich.talebza...@gmail.com> on Sun, 17 Apr 2016 19:17:38
+0100 -
  
   To: Luciano Resende  

Re: Issue with Spark shell and scalatest

2016-04-21 Thread Ted Yu
It is in core-XX-tests jar:

$ jar tvf ./core/target/spark-core_2.11-2.0.0-SNAPSHOT-tests.jar | grep
SparkFunSuite
  1830 Thu Apr 21 08:19:14 PDT 2016
org/apache/spark/SparkFunSuite$$anonfun$withFixture$1.class
  1823 Thu Apr 21 08:19:14 PDT 2016
org/apache/spark/SparkFunSuite$$anonfun$withFixture$2.class
  6232 Thu Apr 21 08:19:14 PDT 2016 org/apache/spark/SparkFunSuite.class

On Thu, Apr 21, 2016 at 9:39 AM, Mich Talebzadeh 
wrote:

> like war of attrition :)
>
> now I get with sbt
>
> object SparkFunSuite is not a member of package org.apache.spark
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 21 April 2016 at 17:22, Ted Yu  wrote:
>
>> Have you tried the following ?
>>
>> libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6"
>>
>> On Thu, Apr 21, 2016 at 9:19 AM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Unfortunately this sbt dependency is not working
>>>
>>> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1" %
>>> "provided"
>>> libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.5.1"  %
>>> "provided"
>>> libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.5.1" %
>>> "provided"
>>> libraryDependencies += "junit" % "junit" % "4.12"
>>> libraryDependencies += "org.scala-sbt" % "test-interface" % "1.0"
>>> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.1"
>>> % "provided"
>>> libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" %
>>> "1.6.1"
>>> libraryDependencies += "org.scalactic" %% "scalactic" % "2.2.6"
>>> *libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6" %
>>> "test"*
>>>
>>> Getting error
>>>
>>> [info] Compiling 1 Scala source to
>>> /data6/hduser/scala/CEP_assembly/target/scala-2.10/classes...
>>> [error]
>>> /data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:32:
>>> object scalatest is not a member of package org
>>> [error] import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
>>> [error]^
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 21 April 2016 at 16:49, Mich Talebzadeh 
>>> wrote:
>>>
 Thanks Ted. It was a typo in my alias and it is sorted now

 slong='rlwrap spark-shell --master spark://50.140.197.217:7077 --jars
 /home/hduser/jars/ojdbc6.jar,/home/hduser/jars/jconn4.jar,/home/hduser/jars/junit-4.12.jar,/home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar,/home/hduser/jars/scalatest_2.11-2.2.6.jar'


 scala> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}



 Dr Mich Talebzadeh



 LinkedIn * 
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 *



 http://talebzadehmich.wordpress.com



 On 21 April 2016 at 16:44, Ted Yu  wrote:

> I tried on refreshed copy of master branch:
>
> $ bin/spark-shell --jars
> /home/hbase/.m2/repository/org/scalatest/scalatest_2.11/2.2.6/scalatest_2.11-2.2.6.jar
> ...
> scala> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
>
> BTW I noticed an extra leading comma after '--jars' in your email.
> Not sure if that matters.
>
> On Thu, Apr 21, 2016 at 8:39 AM, Ted Yu  wrote:
>
>> Mich:
>>
>> $ jar tvf
>> /home/hbase/.m2/repository/org/scalatest/scalatest_2.11/2.2.6/scalatest_2.11-2.2.6.jar
>> | grep BeforeAndAfter
>>   4257 Sat Dec 26 14:35:48 PST 2015
>> org/scalatest/BeforeAndAfter$class.class
>>   2602 Sat Dec 26 14:35:48 PST 2015 org/scalatest/BeforeAndAfter.class
>>   1998 Sat Dec 26 14:35:48 PST 2015
>> org/scalatest/BeforeAndAfterAll$$anonfun$run$1.class
>>
>> FYI
>>
>> On Thu, Apr 21, 2016 at 8:35 AM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> to Mario, Alonso, Luciano, user
>>> Hi,
>>>
>>> Following example in
>>>
>>>
>>> https://github.com/agsachin/spark/blob/CEP/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala#L532
>>>
>>> Does anyone know which jar file this belongs to?
>>>
>>> I use *scalatest_2.11-2.2.6.jar *in my 

Re: Issue with Spark shell and scalatest

2016-04-21 Thread Mich Talebzadeh
like war of attrition :)

now I get with sbt

object SparkFunSuite is not a member of package org.apache.spark


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 21 April 2016 at 17:22, Ted Yu  wrote:

> Have you tried the following ?
>
> libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6"
>
> On Thu, Apr 21, 2016 at 9:19 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Unfortunately this sbt dependency is not working
>>
>> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1" %
>> "provided"
>> libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.5.1"  %
>> "provided"
>> libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.5.1" %
>> "provided"
>> libraryDependencies += "junit" % "junit" % "4.12"
>> libraryDependencies += "org.scala-sbt" % "test-interface" % "1.0"
>> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.1"
>> % "provided"
>> libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" %
>> "1.6.1"
>> libraryDependencies += "org.scalactic" %% "scalactic" % "2.2.6"
>> *libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6" % "test"*
>>
>> Getting error
>>
>> [info] Compiling 1 Scala source to
>> /data6/hduser/scala/CEP_assembly/target/scala-2.10/classes...
>> [error]
>> /data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:32:
>> object scalatest is not a member of package org
>> [error] import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
>> [error]^
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 21 April 2016 at 16:49, Mich Talebzadeh 
>> wrote:
>>
>>> Thanks Ted. It was a typo in my alias and it is sorted now
>>>
>>> slong='rlwrap spark-shell --master spark://50.140.197.217:7077 --jars
>>> /home/hduser/jars/ojdbc6.jar,/home/hduser/jars/jconn4.jar,/home/hduser/jars/junit-4.12.jar,/home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar,/home/hduser/jars/scalatest_2.11-2.2.6.jar'
>>>
>>>
>>> scala> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
>>> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 21 April 2016 at 16:44, Ted Yu  wrote:
>>>
 I tried on refreshed copy of master branch:

 $ bin/spark-shell --jars
 /home/hbase/.m2/repository/org/scalatest/scalatest_2.11/2.2.6/scalatest_2.11-2.2.6.jar
 ...
 scala> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}

 BTW I noticed an extra leading comma after '--jars' in your email.
 Not sure if that matters.

 On Thu, Apr 21, 2016 at 8:39 AM, Ted Yu  wrote:

> Mich:
>
> $ jar tvf
> /home/hbase/.m2/repository/org/scalatest/scalatest_2.11/2.2.6/scalatest_2.11-2.2.6.jar
> | grep BeforeAndAfter
>   4257 Sat Dec 26 14:35:48 PST 2015
> org/scalatest/BeforeAndAfter$class.class
>   2602 Sat Dec 26 14:35:48 PST 2015 org/scalatest/BeforeAndAfter.class
>   1998 Sat Dec 26 14:35:48 PST 2015
> org/scalatest/BeforeAndAfterAll$$anonfun$run$1.class
>
> FYI
>
> On Thu, Apr 21, 2016 at 8:35 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> to Mario, Alonso, Luciano, user
>> Hi,
>>
>> Following example in
>>
>>
>> https://github.com/agsachin/spark/blob/CEP/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala#L532
>>
>> Does anyone know which jar file this belongs to?
>>
>> I use *scalatest_2.11-2.2.6.jar *in my spark-shell
>>
>>  spark-shell --master spark://50.140.197.217:7077 --jars
>> ,/home/hduser/jars/junit-4.12.jar,/home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar,
>> /*home/hduser/jars/scalatest_2.11-2.2.6.jar'*
>>
>> scala> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
>> :28: error: object scalatest is not a member of package org
>>  import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
>>
>> Thanks
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> 

EMR Spark Custom Metrics

2016-04-21 Thread Mark Kelly
Hi,

So I would like some custom metrics.

The environment we use is AWS EMR 4.5.0 with spark 1.6.1 and Ganglia.

The code snippet below shows how we register custom metrics (this worked in EMR
4.2.0 with Spark 1.5.2).

package org.apache.spark.metrics.source

import com.codahale.metrics._
import org.apache.spark.{Logging, SparkEnv}

class SparkInstrumentation(val prefix: String) extends Serializable with 
Logging {

  class InstrumentationSource(val prefix: String) extends Source {

    log.info(s"Starting spark instrumentation with prefix $prefix")
override val sourceName = prefix
override val metricRegistry = new MetricRegistry()

def registerCounter(name: String): Counter = {
  metricRegistry.counter(MetricRegistry.name(name))
}

def registerTimer(name: String): Timer = {
  metricRegistry.timer(MetricRegistry.name(name))
}
  }

  val source = new InstrumentationSource(prefix)

  def register() {
SparkEnv.get.metricsSystem.registerSource(source)
  }

}

This unfortunately no longer works.

How is it possible to create custom metrics?

Thanks
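
For reference, a minimal sketch of how the class above is meant to be exercised
from the driver once the SparkContext exists. It does not explain why
registration breaks on EMR 4.5.0; the prefix and metric names are placeholders.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.metrics.source.SparkInstrumentation

val sc = new SparkContext(new SparkConf().setAppName("metrics-sketch"))

// Register the source only after the SparkContext (and hence SparkEnv) is up.
val instrumentation = new SparkInstrumentation("myapp")
instrumentation.register()

val processed  = instrumentation.source.registerCounter("records.processed")
val batchTimer = instrumentation.source.registerTimer("batch.duration")

val timing = batchTimer.time()
sc.parallelize(1 to 1000).foreach(_ => ())  // stand-in for real work
processed.inc(1000)
timing.stop()
```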



Re: Issue with Spark shell and scalatest

2016-04-21 Thread Ted Yu
Have you tried the following ?

libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6"

On Thu, Apr 21, 2016 at 9:19 AM, Mich Talebzadeh 
wrote:

> Unfortunately this sbt dependency is not working
>
> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1" %
> "provided"
> libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.5.1"  %
> "provided"
> libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.5.1" %
> "provided"
> libraryDependencies += "junit" % "junit" % "4.12"
> libraryDependencies += "org.scala-sbt" % "test-interface" % "1.0"
> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.1" %
> "provided"
> libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" %
> "1.6.1"
> libraryDependencies += "org.scalactic" %% "scalactic" % "2.2.6"
> *libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6" % "test"*
>
> Getting error
>
> [info] Compiling 1 Scala source to
> /data6/hduser/scala/CEP_assembly/target/scala-2.10/classes...
> [error]
> /data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:32:
> object scalatest is not a member of package org
> [error] import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
> [error]^
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 21 April 2016 at 16:49, Mich Talebzadeh 
> wrote:
>
>> Thanks Ted. It was a typo in my alias and it is sorted now
>>
>> slong='rlwrap spark-shell --master spark://50.140.197.217:7077 --jars
>> /home/hduser/jars/ojdbc6.jar,/home/hduser/jars/jconn4.jar,/home/hduser/jars/junit-4.12.jar,/home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar,/home/hduser/jars/scalatest_2.11-2.2.6.jar'
>>
>>
>> scala> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
>> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 21 April 2016 at 16:44, Ted Yu  wrote:
>>
>>> I tried on refreshed copy of master branch:
>>>
>>> $ bin/spark-shell --jars
>>> /home/hbase/.m2/repository/org/scalatest/scalatest_2.11/2.2.6/scalatest_2.11-2.2.6.jar
>>> ...
>>> scala> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
>>> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
>>>
>>> BTW I noticed an extra leading comma after '--jars' in your email.
>>> Not sure if that matters.
>>>
>>> On Thu, Apr 21, 2016 at 8:39 AM, Ted Yu  wrote:
>>>
 Mich:

 $ jar tvf
 /home/hbase/.m2/repository/org/scalatest/scalatest_2.11/2.2.6/scalatest_2.11-2.2.6.jar
 | grep BeforeAndAfter
   4257 Sat Dec 26 14:35:48 PST 2015
 org/scalatest/BeforeAndAfter$class.class
   2602 Sat Dec 26 14:35:48 PST 2015 org/scalatest/BeforeAndAfter.class
   1998 Sat Dec 26 14:35:48 PST 2015
 org/scalatest/BeforeAndAfterAll$$anonfun$run$1.class

 FYI

 On Thu, Apr 21, 2016 at 8:35 AM, Mich Talebzadeh <
 mich.talebza...@gmail.com> wrote:

> to Mario, Alonso, Luciano, user
> Hi,
>
> Following example in
>
>
> https://github.com/agsachin/spark/blob/CEP/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala#L532
>
> Does anyone know which jar file this belongs to?
>
> I use *scalatest_2.11-2.2.6.jar *in my spark-shell
>
>  spark-shell --master spark://50.140.197.217:7077 --jars
> ,/home/hduser/jars/junit-4.12.jar,/home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar,
> /*home/hduser/jars/scalatest_2.11-2.2.6.jar'*
>
> scala> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
> :28: error: object scalatest is not a member of package org
>  import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>


>>>
>>
>


Re: Issue with Spark shell and scalatest

2016-04-21 Thread Mich Talebzadeh
Unfortunately this sbt dependency is not working

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1" %
"provided"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.5.1"  %
"provided"
libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.5.1" %
"provided"
libraryDependencies += "junit" % "junit" % "4.12"
libraryDependencies += "org.scala-sbt" % "test-interface" % "1.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.1" %
"provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" %
"1.6.1"
libraryDependencies += "org.scalactic" %% "scalactic" % "2.2.6"
*libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6" % "test"*

Getting error

[info] Compiling 1 Scala source to
/data6/hduser/scala/CEP_assembly/target/scala-2.10/classes...
[error]
/data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:32:
object scalatest is not a member of package org
[error] import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
[error]^



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 21 April 2016 at 16:49, Mich Talebzadeh 
wrote:

> Thanks Ted. It was a typo in my alias and it is sorted now
>
> slong='rlwrap spark-shell --master spark://50.140.197.217:7077 --jars
> /home/hduser/jars/ojdbc6.jar,/home/hduser/jars/jconn4.jar,/home/hduser/jars/junit-4.12.jar,/home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar,/home/hduser/jars/scalatest_2.11-2.2.6.jar'
>
>
> scala> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 21 April 2016 at 16:44, Ted Yu  wrote:
>
>> I tried on refreshed copy of master branch:
>>
>> $ bin/spark-shell --jars
>> /home/hbase/.m2/repository/org/scalatest/scalatest_2.11/2.2.6/scalatest_2.11-2.2.6.jar
>> ...
>> scala> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
>> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
>>
>> BTW I noticed an extra leading comma after '--jars' in your email.
>> Not sure if that matters.
>>
>> On Thu, Apr 21, 2016 at 8:39 AM, Ted Yu  wrote:
>>
>>> Mich:
>>>
>>> $ jar tvf
>>> /home/hbase/.m2/repository/org/scalatest/scalatest_2.11/2.2.6/scalatest_2.11-2.2.6.jar
>>> | grep BeforeAndAfter
>>>   4257 Sat Dec 26 14:35:48 PST 2015
>>> org/scalatest/BeforeAndAfter$class.class
>>>   2602 Sat Dec 26 14:35:48 PST 2015 org/scalatest/BeforeAndAfter.class
>>>   1998 Sat Dec 26 14:35:48 PST 2015
>>> org/scalatest/BeforeAndAfterAll$$anonfun$run$1.class
>>>
>>> FYI
>>>
>>> On Thu, Apr 21, 2016 at 8:35 AM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 to Mario, Alonso, Luciano, user
 Hi,

 Following example in


 https://github.com/agsachin/spark/blob/CEP/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala#L532

 Does anyone know which jar file this belongs to?

 I use *scalatest_2.11-2.2.6.jar *in my spark-shell

  spark-shell --master spark://50.140.197.217:7077 --jars
 ,/home/hduser/jars/junit-4.12.jar,/home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar,
 /*home/hduser/jars/scalatest_2.11-2.2.6.jar'*

 scala> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
 :28: error: object scalatest is not a member of package org
  import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}

 Thanks

 Dr Mich Talebzadeh



 LinkedIn * 
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 *



 http://talebzadehmich.wordpress.com



>>>
>>>
>>
>


Re: Save DataFrame to HBase

2016-04-21 Thread Benjamin Kim
Hi Ted,

Can this module be used with an older version of HBase, such as 1.0 or 1.1? 
Where can I get the module from?

Thanks,
Ben
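
In the meantime, one fallback that only needs the stock HBase 1.x client API is
to write the rows out per partition. A minimal sketch; the table name, column
family, qualifier and the two-column (id, value) schema are all invented, and
each partition opens its own connection because the HBase Connection is not
serializable.

```scala
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

// df: DataFrame with columns (id: String, value: String), hypothetical schema.
df.foreachPartition { rows =>
  val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
  val table = connection.getTable(TableName.valueOf("my_table"))
  try {
    rows.foreach { row =>
      val put = new Put(Bytes.toBytes(row.getString(0)))          // row key
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("value"),  // cf:value
        Bytes.toBytes(row.getString(1)))
      table.put(put)
    }
  } finally {
    table.close()
    connection.close()
  }
}
```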

> On Apr 21, 2016, at 6:56 AM, Ted Yu  wrote:
> 
> The hbase-spark module in Apache HBase (coming with hbase 2.0 release) can do 
> this.
> 
> On Thu, Apr 21, 2016 at 6:52 AM, Benjamin Kim  > wrote:
> Has anyone found an easy way to save a DataFrame into HBase?
> 
> Thanks,
> Ben
> 
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> 
> For additional commands, e-mail: user-h...@spark.apache.org 
> 
> 
> 



Re: Issue with Spark shell and scalatest

2016-04-21 Thread Mich Talebzadeh
Thanks Ted. It was a typo in my alias and it is sorted now

slong='rlwrap spark-shell --master spark://50.140.197.217:7077 --jars
/home/hduser/jars/ojdbc6.jar,/home/hduser/jars/jconn4.jar,/home/hduser/jars/junit-4.12.jar,/home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar,/home/hduser/jars/scalatest_2.11-2.2.6.jar'


scala> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 21 April 2016 at 16:44, Ted Yu  wrote:

> I tried on refreshed copy of master branch:
>
> $ bin/spark-shell --jars
> /home/hbase/.m2/repository/org/scalatest/scalatest_2.11/2.2.6/scalatest_2.11-2.2.6.jar
> ...
> scala> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
>
> BTW I noticed an extra leading comma after '--jars' in your email.
> Not sure if that matters.
>
> On Thu, Apr 21, 2016 at 8:39 AM, Ted Yu  wrote:
>
>> Mich:
>>
>> $ jar tvf
>> /home/hbase/.m2/repository/org/scalatest/scalatest_2.11/2.2.6/scalatest_2.11-2.2.6.jar
>> | grep BeforeAndAfter
>>   4257 Sat Dec 26 14:35:48 PST 2015
>> org/scalatest/BeforeAndAfter$class.class
>>   2602 Sat Dec 26 14:35:48 PST 2015 org/scalatest/BeforeAndAfter.class
>>   1998 Sat Dec 26 14:35:48 PST 2015
>> org/scalatest/BeforeAndAfterAll$$anonfun$run$1.class
>>
>> FYI
>>
>> On Thu, Apr 21, 2016 at 8:35 AM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> to Mario, Alonso, Luciano, user
>>> Hi,
>>>
>>> Following example in
>>>
>>>
>>> https://github.com/agsachin/spark/blob/CEP/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala#L532
>>>
>>> Does anyone know which jar file this belongs to?
>>>
>>> I use *scalatest_2.11-2.2.6.jar *in my spark-shell
>>>
>>>  spark-shell --master spark://50.140.197.217:7077 --jars
>>> ,/home/hduser/jars/junit-4.12.jar,/home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar,
>>> /*home/hduser/jars/scalatest_2.11-2.2.6.jar'*
>>>
>>> scala> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
>>> :28: error: object scalatest is not a member of package org
>>>  import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
>>>
>>> Thanks
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>
>>
>


Re: Issue with Spark shell and scalatest

2016-04-21 Thread Ted Yu
I tried on refreshed copy of master branch:

$ bin/spark-shell --jars
/home/hbase/.m2/repository/org/scalatest/scalatest_2.11/2.2.6/scalatest_2.11-2.2.6.jar
...
scala> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}

BTW I noticed an extra leading comma after '--jars' in your email.
Not sure if that matters.

On Thu, Apr 21, 2016 at 8:39 AM, Ted Yu  wrote:

> Mich:
>
> $ jar tvf
> /home/hbase/.m2/repository/org/scalatest/scalatest_2.11/2.2.6/scalatest_2.11-2.2.6.jar
> | grep BeforeAndAfter
>   4257 Sat Dec 26 14:35:48 PST 2015
> org/scalatest/BeforeAndAfter$class.class
>   2602 Sat Dec 26 14:35:48 PST 2015 org/scalatest/BeforeAndAfter.class
>   1998 Sat Dec 26 14:35:48 PST 2015
> org/scalatest/BeforeAndAfterAll$$anonfun$run$1.class
>
> FYI
>
> On Thu, Apr 21, 2016 at 8:35 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> to Mario, Alonso, Luciano, user
>> Hi,
>>
>> Following example in
>>
>>
>> https://github.com/agsachin/spark/blob/CEP/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala#L532
>>
>> Does anyone know which jar file this belongs to?
>>
>> I use *scalatest_2.11-2.2.6.jar *in my spark-shell
>>
>>  spark-shell --master spark://50.140.197.217:7077 --jars
>> ,/home/hduser/jars/junit-4.12.jar,/home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar,
>> /*home/hduser/jars/scalatest_2.11-2.2.6.jar'*
>>
>> scala> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
>> :28: error: object scalatest is not a member of package org
>>  import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
>>
>> Thanks
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>
>


Re: Issue with Spark shell and scalatest

2016-04-21 Thread Ted Yu
Mich:

$ jar tvf
/home/hbase/.m2/repository/org/scalatest/scalatest_2.11/2.2.6/scalatest_2.11-2.2.6.jar
| grep BeforeAndAfter
  4257 Sat Dec 26 14:35:48 PST 2015 org/scalatest/BeforeAndAfter$class.class
  2602 Sat Dec 26 14:35:48 PST 2015 org/scalatest/BeforeAndAfter.class
  1998 Sat Dec 26 14:35:48 PST 2015
org/scalatest/BeforeAndAfterAll$$anonfun$run$1.class

FYI

On Thu, Apr 21, 2016 at 8:35 AM, Mich Talebzadeh 
wrote:

> to Mario, Alonso, Luciano, user
> Hi,
>
> Following example in
>
>
> https://github.com/agsachin/spark/blob/CEP/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala#L532
>
> Does anyone know which jar file this belongs to?
>
> I use *scalatest_2.11-2.2.6.jar *in my spark-shell
>
>  spark-shell --master spark://50.140.197.217:7077 --jars
> ,/home/hduser/jars/junit-4.12.jar,/home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar,
> /*home/hduser/jars/scalatest_2.11-2.2.6.jar'*
>
> scala> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
> :28: error: object scalatest is not a member of package org
>  import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>


Issue with Spark shell and scalatest

2016-04-21 Thread Mich Talebzadeh
to Mario, Alonso, Luciano, user
Hi,

Following example in

https://github.com/agsachin/spark/blob/CEP/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala#L532

Does anyone know which jar file this belongs to?

I use *scalatest_2.11-2.2.6.jar *in my spark-shell

 spark-shell --master spark://50.140.197.217:7077 --jars
,/home/hduser/jars/junit-4.12.jar,/home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar,
/*home/hduser/jars/scalatest_2.11-2.2.6.jar'*

scala> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
<console>:28: error: object scalatest is not a member of package org
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


Re: spark on yarn

2016-04-21 Thread Steve Loughran
If there isn't enough space in your cluster for all the executors you asked for 
to be created, Spark will only get the ones which can be allocated. It will 
start work without waiting for the others to arrive.

Make sure you ask for enough memory: YARN is a lot more unforgiving about 
memory use than it is about CPU
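
As a rough illustration, the sizing can also be pinned in the SparkConf; the
values below are placeholders and the keys mirror the --num-executors,
--executor-cores and --executor-memory submit flags.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Ask for what the cluster can actually give: 50 executors x 2 cores fits the
// 100 available vcores, and executor memory plus overhead has to fit inside
// the YARN container size or the containers get killed.
val conf = new SparkConf()
  .setAppName("yarn-sizing-sketch")
  .set("spark.executor.instances", "50")
  .set("spark.executor.cores", "2")
  .set("spark.executor.memory", "4g")
  .set("spark.yarn.executor.memoryOverhead", "512")  // MB, placeholder
val sc = new SparkContext(conf)
```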

> On 20 Apr 2016, at 16:21, Shushant Arora  wrote:
> 
> I am running a spark application on yarn cluster.
> 
> say I have available vcors in cluster as 100.And I start spark application 
> with --num-executors 200 --num-cores 2 (so I need total 200*2=400 vcores) but 
> in my cluster only 100 are available.
> 
> What will happen ? Will the job abort or it will be submitted successfully 
> and 100 vcores will be aallocated to 50 executors and rest executors will be 
> started as soon as vcores are available ?
> 
> Please note dynamic allocation is not enabled in cluster. I have old version 
> 1.2.
> 
> Thanks
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark 1.6.1 already maximum pages

2016-04-21 Thread nihed mbarek
Hi

I just hit an issue with my execution on Spark 1.6.1.
I'm trying to join two dataframes, one with 5 partitions and a second,
smaller one with 2 partitions.
spark.sql.shuffle.partitions is set to 256000.

Any idea ??

java.lang.IllegalStateException: Have already allocated a maximum of 8192
pages
at
org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:259)
at
org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:112)
at
org.apache.spark.shuffle.sort.ShuffleExternalSorter.acquireNewPageIfNecessary(ShuffleExternalSorter.java:346)
at
org.apache.spark.shuffle.sort.ShuffleExternalSorter.insertRecord(ShuffleExternalSorter.java:367)
at
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:237)
at
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:164)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread


-- 

M'BAREK Med Nihed,
Fedora Ambassador, TUNISIA, Northern Africa
http://www.nihed.com
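
Not a full diagnosis, but 256000 is an unusually high value for
spark.sql.shuffle.partitions, and pulling it back towards something
proportional to the data size and the number of cores is the first thing worth
trying with this error. A sketch using the 1.6 SQLContext API; the value is a
placeholder.

```scala
// Set before running the join; the same key can be passed at submit time
// with --conf spark.sql.shuffle.partitions=2000
sqlContext.setConf("spark.sql.shuffle.partitions", "2000")
```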




Re: Spark support for Complex Event Processing (CEP)

2016-04-21 Thread Mich Talebzadeh
Hi,

Following example in

https://github.com/agsachin/spark/blob/CEP/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala#L532

Does anyone know which jar file this belongs to?

I use *scalatest_2.11-2.2.6.jar *in my spark-shell

 spark-shell --master spark://50.140.197.217:7077 --jars
,/home/hduser/jars/junit-4.12.jar,/home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar,
/*home/hduser/jars/scalatest_2.11-2.2.6.jar'*

scala> import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
<console>:28: error: object scalatest is not a member of package org
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}

Thanks


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 20 April 2016 at 10:28, Mario Ds Briggs  wrote:

> I did see your earlier post about Stratio decision. Will readup on it
>
>
> thanks
> Mario
>
>
> From: Alonso Isidoro Roman 
> To: Mich Talebzadeh 
> Cc: Mario Ds Briggs/India/IBM@IBMIN, Luciano Resende ,
> "user @spark" 
> Date: 20/04/2016 02:24 pm
> Subject: Re: Spark support for Complex Event Processing (CEP)
> --
>
>
>
> Stratio decision could do the job
>
> *https://github.com/Stratio/Decision*
> 
>
>
>
> Alonso Isidoro Roman.
>
> Mis citas preferidas (de hoy) :
> "Si depurar es el proceso de quitar los errores de software, entonces
> programar debe ser el proceso de introducirlos..."
>  -  Edsger Dijkstra
>
> My favorite quotes (today):
> "If debugging is the process of removing software bugs, then programming
> must be the process of putting ..."
>   - Edsger Dijkstra
>
> "If you pay peanuts you get monkeys"
>
>
> 2016-04-20 7:55 GMT+02:00 Mich Talebzadeh <*mich.talebza...@gmail.com*
> >:
>
>Thanks a lot Mario. Will have a look.
>
>Regards,
>
>
>Dr Mich Talebzadeh
>
>LinkedIn
>
> *https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw*
>
> 
>
>*http://talebzadehmich.wordpress.com*
>
>
>
>
>On 20 April 2016 at 06:53, Mario Ds Briggs <*mario.bri...@in.ibm.com*
>> wrote:
>Hi Mich,
>
>Info is here - *https://issues.apache.org/jira/browse/SPARK-14745*
>
>
>overview is in the pdf -
>
> *https://issues.apache.org/jira/secure/attachment/12799670/SparkStreamingCEP.pdf*
>
> 
>
>Usage examples not in the best place for now (will make it better) -
>
> *https://github.com/agsachin/spark/blob/CEP/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala#L532*
>
> 
>
>Your feedback is appreciated.
>
>
>thanks
>Mario
>
>
>From: Mich Talebzadeh <*mich.talebza...@gmail.com*
>>
>To: Mario Ds Briggs/India/IBM@IBMIN
>Cc: "user @spark" <*user@spark.apache.org* >,
>Luciano Resende <*luckbr1...@gmail.com* >
>Date: 19/04/2016 12:45 am
>Subject: Re: Spark support for Complex Event Processing (CEP)
>--
>
>
>
>
>great stuff Mario. Much appreciated.
>
>Mich
>
>Dr Mich Talebzadeh
>
>LinkedIn
>
> *https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw*
>
> 
>
> *http://talebzadehmich.wordpress.com*
>
>
>
>
>On 18 April 2016 at 20:08, Mario Ds Briggs <*mario.bri...@in.ibm.com*
>> wrote:
>   Hey Mich, Luciano
>
>  Will provide links with docs by tomorrow
>
>  thanks
>  Mario
>
>  - Message from Mich Talebzadeh <*mich.talebza...@gmail.com*
>  

Re: Pls assist: which conf file do i need to modify if i want spark-shell to include external packages?

2016-04-21 Thread Mich Talebzadeh
try this using the shell parameter

SPARK_CLASSPATH

in $SPARK_HOME/conf

cp spark-env.sh.template spark-env.sh

Then edit that file and set

export SPARK_CLASSPATH=

Connect to spark-shell and see if it finds it
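
Alternatively, for the property-file route Marco is after, a minimal sketch
(the key is spark.jars.packages; the artifact/version below is just the one
he quoted, so adjust it to your build):

# $SPARK_HOME/conf/spark-defaults.conf
# pulls the package at shell/application startup instead of passing --packages every time
spark.jars.packages  com.databricks:spark-csv_2.11:1.4.0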


HTH



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 21 April 2016 at 15:25, Marco Mistroni  wrote:

> Thank mich but I seem to remember to modify a config file so that I don't
> need to specify the --packages option every time  I start the shell
> Kr
> On 21 Apr 2016 3:20 pm, "Mich Talebzadeh" 
> wrote:
>
>> on spark-shell this will work
>>
>> $SPARK_HOME/bin/spark-shell *--packages *
>> com.databricks:spark-csv_2.11:1.3.0
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 21 April 2016 at 15:13, Marco Mistroni  wrote:
>>
>>> HI all
>>>  i need to use spark-csv in my spark instance, and i want to avoid
>>> launching spark-shell
>>> by passing the package name every time
>>> I seem to remember that i need to amend a file in the /conf directory to
>>> inlcude e,g
>>> spark.packages  com.databricks:spark-csv_2.11:1.4.0 
>>>
>>> but i cannot find any docs tell ing me which config file  i have to
>>> modify
>>>
>>> anyone can assist ?
>>> kr
>>>  marco
>>>
>>
>>


Re: How to change akka.remote.startup-timeout in spark

2016-04-21 Thread Todd Nist
I believe you can adjust it by setting the following:

spark.akka.timeout (default: 100s): Communication timeout between Spark nodes.
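
A minimal sketch of setting it, assuming you build your own SparkConf (the
same key can also go into spark-defaults.conf or be passed as
--conf spark.akka.timeout=300 on the command line; the app name below is made up):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("my-app")               // placeholder application name
  .set("spark.akka.timeout", "300")   // seconds
val sc = new SparkContext(conf)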

HTH.

-Todd



On Thu, Apr 21, 2016 at 9:49 AM, yuemeng (A)  wrote:

> When I run a spark application,sometimes I get follow ERROR:
>
> 16/04/21 09:26:45 ERROR SparkContext: Error initializing SparkContext.
>
> java.util.concurrent.TimeoutException: Futures timed out after [1
> milliseconds]
>
>  at
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>
>  at
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>
>  at
> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>
>  at
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>
>  at scala.concurrent.Await$.result(package.scala:107)
>
>  at akka.remote.Remoting.start(Remoting.scala:180)
>
>  at
> akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
>
>  at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618)
>
>  at
> akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615)
>
>  at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615)
>
>  at akka.actor.ActorSystemImpl.start(ActorSystem.scala:632)
>
>  at akka.actor.ActorSystem$.apply(ActorSystem.scala:141)
>
>  at akka.actor.ActorSystem$.apply(ActorSystem.scala:118)
>
>  at
> org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:122)
>
>  at
> org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
>
>  at
> org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
>
>  at
> org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1995)
>
>  at
> scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>
>  at
> org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1986)
>
>  at
> org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56)
>
>  at
> org.apache.spark.rpc.akka.AkkaRpcEnvFactory.create(AkkaRpcEnv.scala:245)
>
>
>
>
>
> AND I track the code ,I think if we update akka.remote.startup-timeout
> mabe solve this problem,but I can’t find any way to change this,
>
> Do anybody met this problem and know how to change akka config in spark?
>
> Thanks a lot
>
>
>
> Yue Meng 岳猛 (Rick) 00277916
>
> Big Data Technology Development Dept.
>
> Documentation Package | Training Centre | Case Library
>
>   中软 Big Data 3ms team: http://3ms.huawei.com/hi/group/2031037
>
>
>
>
>
>


Re: Spark 1.6.1 DataFrame write to JDBC

2016-04-21 Thread Jonathan Gray
I tried increasing the batch size (1000 to 10,000 to 100,000) but it didn't
appear to make any appreciable difference in my test case.

In addition I had read in the Oracle JDBC documentation that batches should
be set between 10 and 100 and anything out of that range was not advisable.
However, I don't have any evidence to prove or disprove that.
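
For anyone following along, a minimal sketch of where the option goes (URL,
credentials and table name below are placeholders):

import java.util.Properties

val props = new Properties()
props.setProperty("user", "dbuser")        // placeholder credentials
props.setProperty("password", "dbpass")
props.setProperty("batchsize", "10000")    // rows per JDBC batch; 1000 is the default

// df is the large DataFrame being written; URL and table are placeholders
df.write.mode("append").jdbc("jdbc:oracle:thin:@//dbhost:1521/service", "target_table", props)
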
On 21 Apr 2016 6:16 am, "Takeshi Yamamuro"  wrote:

> Sorry, I wrongly sent the message mid-way.
> How about trying to increase 'batchsize' in a jdbc option to improve
> performance?
>
> // maropu
>
> On Thu, Apr 21, 2016 at 2:15 PM, Takeshi Yamamuro 
> wrote:
>
>> Hi,
>>
>> How about trying to increate 'batchsize
>>
>> On Wed, Apr 20, 2016 at 7:14 AM, Jonathan Gray 
>> wrote:
>>
>>> Hi,
>>>
>>> I'm trying to write ~60 million rows from a DataFrame to a database
>>> using JDBC using Spark 1.6.1, something similar to df.write().jdbc(...)
>>>
>>> The write seems to not be performing well.  Profiling the application
>>> with a master of local[*] it appears there is not much socket write
>>> activity and also not much CPU.
>>>
>>> I would expect there to be an almost continuous block of socket write
>>> activity showing up somewhere in the profile.
>>>
>>> I can see that the top hot method involves
>>> apache.spark.unsafe.platform.CopyMemory all from calls within
>>> JdbcUtils.savePartition(...).  However, the CPU doesn't seem particularly
>>> stressed so I'm guessing this isn't the cause of the problem.
>>>
>>> Is there any best practices or has anyone come across a case like this
>>> before where a write to a database seems to perform poorly?
>>>
>>> Thanks,
>>> Jon
>>>
>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: Pls assist: which conf file do i need to modify if i want spark-shell to include external packages?

2016-04-21 Thread Marco Mistroni
Thanks Mich, but I seem to remember modifying a config file so that I don't
need to specify the --packages option every time I start the shell.
Kr
On 21 Apr 2016 3:20 pm, "Mich Talebzadeh"  wrote:

> on spark-shell this will work
>
> $SPARK_HOME/bin/spark-shell *--packages *
> com.databricks:spark-csv_2.11:1.3.0
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 21 April 2016 at 15:13, Marco Mistroni  wrote:
>
>> HI all
>>  i need to use spark-csv in my spark instance, and i want to avoid
>> launching spark-shell
>> by passing the package name every time
>> I seem to remember that i need to amend a file in the /conf directory to
>> inlcude e,g
>> spark.packages  com.databricks:spark-csv_2.11:1.4.0 
>>
>> but i cannot find any docs tell ing me which config file  i have to modify
>>
>> anyone can assist ?
>> kr
>>  marco
>>
>
>


Re: Pls assist: which conf file do i need to modify if i want spark-shell to include external packages?

2016-04-21 Thread Mich Talebzadeh
on spark-shell this will work

$SPARK_HOME/bin/spark-shell *--packages *com.databricks:spark-csv_2.11:1.3.0

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 21 April 2016 at 15:13, Marco Mistroni  wrote:

> HI all
>  i need to use spark-csv in my spark instance, and i want to avoid
> launching spark-shell
> by passing the package name every time
> I seem to remember that i need to amend a file in the /conf directory to
> inlcude e,g
> spark.packages  com.databricks:spark-csv_2.11:1.4.0 
>
> but i cannot find any docs tell ing me which config file  i have to modify
>
> anyone can assist ?
> kr
>  marco
>


Pls assist: which conf file do i need to modify if i want spark-shell to include external packages?

2016-04-21 Thread Marco Mistroni
HI all
 i need to use spark-csv in my spark instance, and i want to avoid
launching spark-shell
by passing the package name every time.
I seem to remember that i need to amend a file in the /conf directory to
include e.g.
spark.packages  com.databricks:spark-csv_2.11:1.4.0 

but i cannot find any docs telling me which config file i have to modify

can anyone assist ?
kr
 marco


Re: Spark SQL Transaction

2016-04-21 Thread Mich Talebzadeh
Regarding this statement:

"...each database statement is atomic and is itself a transaction... your
statements should be atomic and there will be no ‘redo’ or ‘commit’ or
‘rollback’."

MSSQL complies with the ACID properties, which require that each transaction
be "all or nothing": if one part of the transaction fails, then the entire
transaction fails, and the database state is left unchanged.

Assuming that it is one transaction (though I doubt JDBC does that, as it
would take forever), then either that transaction commits (in MSSQL redo +
undo are combined in the syslogs table of the database), meaning there will
be undo + redo log generated for that row only in syslogs. So under normal
operation every RDBMS, including MSSQL, Oracle, Sybase and others, will
generate redo and undo, and one cannot avoid it. If there is a batch
transaction, as I suspect in this case, it is either all or nothing. The
thread owner indicated that a rollback is happening, so it is consistent
with all rows being rolled back.

I don't think Spark, Sqoop or Hive can influence the transaction behaviour of
an RDBMS for DML. Queries (reads) do not generate transactions.

HTH



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 21 April 2016 at 13:58, Michael Segel  wrote:

> Hi,
>
> Sometimes terms get muddled over time.
>
> If you’re not using transactions, then each database statement is atomic
> and is itself a transaction.
> So unless you have some explicit ‘Begin Work’ at the start…. your
> statements should be atomic and there will be no ‘redo’ or ‘commit’ or
> ‘rollback’.
>
> I don’t see anything in Spark’s documentation about transactions, so the
> statements should be atomic.  (I’m not a guru here so I could be missing
> something in Spark)
>
> If you’re seeing the connection drop unexpectedly and then a rollback,
> could this be a setting or configuration of the database?
>
>
> > On Apr 19, 2016, at 1:18 PM, Andrés Ivaldi  wrote:
> >
> > Hello, is possible to execute a SQL write without Transaction? we dont
> need transactions to save our data and this adds an overhead to the
> SQLServer.
> >
> > Regards.
> >
> > --
> > Ing. Ivaldi Andres
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


How to change akka.remote.startup-timeout in spark

2016-04-21 Thread yuemeng (A)
When I run a spark application,sometimes I get follow ERROR:
16/04/21 09:26:45 ERROR SparkContext: Error initializing SparkContext.
java.util.concurrent.TimeoutException: Futures timed out after [1 
milliseconds]
 at 
scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 at scala.concurrent.Await$.result(package.scala:107)
 at akka.remote.Remoting.start(Remoting.scala:180)
 at 
akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
 at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618)
 at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615)
 at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615)
 at akka.actor.ActorSystemImpl.start(ActorSystem.scala:632)
 at akka.actor.ActorSystem$.apply(ActorSystem.scala:141)
 at akka.actor.ActorSystem$.apply(ActorSystem.scala:118)
 at 
org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:122)
 at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
 at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
 at 
org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1995)
 at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
 at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1986)
 at 
org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56)
 at 
org.apache.spark.rpc.akka.AkkaRpcEnvFactory.create(AkkaRpcEnv.scala:245)


And after tracking the code, I think updating akka.remote.startup-timeout may
solve this problem, but I can’t find any way to change it.
Has anybody met this problem, and does anyone know how to change the akka config in spark?
Thanks a lot

Yue Meng 岳猛 (Rick) 00277916
Big Data Technology Development Dept.

Documentation Package | Training Centre | Case Library

  中软 Big Data 3ms team: http://3ms.huawei.com/hi/group/2031037





Re: Save DataFrame to HBase

2016-04-21 Thread Ted Yu
The hbase-spark module in Apache HBase (coming with hbase 2.0 release) can
do this.

On Thu, Apr 21, 2016 at 6:52 AM, Benjamin Kim  wrote:

> Has anyone found an easy way to save a DataFrame into HBase?
>
> Thanks,
> Ben
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: RDD generated from Dataframes

2016-04-21 Thread Sean Owen
I don't think that's generally true, but is true to the extent that
you can push down the work of higher-level logical operators like
select and groupBy, on common types, that can be understood and
optimized. Your arbitrary user code is opaque and can't be optimized.
So DataFrame.groupBy.max is likely to be more efficient, if that's all
you're doing, than executing a groupBy on opaque user objects.

If you really need to apply a user function to each row, I'm not sure
DataFrames help much since you're not using them qua DataFrames, and
indeed you end up treating them as RDDs.

You should instead see if you can express more of your operations in
standard DataFrame operations (consider registering smaller UDFs if
needed), since that is what you do to get the speedups.
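
To make the contrast concrete, a minimal sketch (hypothetical df whose first
column "k" is a string key and whose second column "v" is a long):

// stays in the DataFrame world, so Catalyst/Tungsten can optimize it
val maxPerKey = df.groupBy("k").max("v")

// drops to RDD[Row]; the lambdas are opaque to the optimizer
val maxPerKeyRdd = df.rdd
  .map(r => (r.getString(0), r.getLong(1)))
  .reduceByKey((a, b) => math.max(a, b))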

On Thu, Apr 21, 2016 at 2:49 PM, Apurva Nandan  wrote:
> Hello everyone,
>
> Generally speaking, I guess it's well known that dataframes are much faster
> than RDD when it comes to performance.
> My question is how do you go around when it comes to transforming a
> dataframe using map.
> I mean then the dataframe gets converted into RDD, hence now do you again
> convert this RDD to a new dataframe for better performance?
> Further, if you have a process which involves series of transformations i.e.
> from one RDD to another, do you keep on converting each RDD to a dataframe
> first, all the time?
>
> It's also possible that I might be missing something here, please share your
> experiences.
>
>
> Thanks and Regards,
> Apurva

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: RDD generated from Dataframes

2016-04-21 Thread Ted Yu
In upcoming 2.0 release, the signature for map() has become:

  def map[U : Encoder](func: T => U): Dataset[U] = withTypedPlan {

Note: DataFrame and DataSet are unified in 2.0

FYI
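
As a minimal sketch of what that unified, typed map looks like (input path
and case class are hypothetical; in 2.0 the entry point is typically a
SparkSession, but sqlContext-style code illustrates the idea):

import sqlContext.implicits._            // supplies the Encoders

case class Person(name: String, age: Long)

// .as[Person] turns the DataFrame into a typed Dataset[Person]
val people = sqlContext.read.json("people.json").as[Person]

// map stays a Dataset (no drop to RDD[Row]); the output type needs an Encoder
val agesNextYear = people.map(p => p.age + 1)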

On Thu, Apr 21, 2016 at 6:49 AM, Apurva Nandan  wrote:

> Hello everyone,
>
> Generally speaking, I guess it's well known that dataframes are much
> faster than RDD when it comes to performance.
> My question is how do you go around when it comes to transforming a
> dataframe using map.
> I mean then the dataframe gets converted into RDD, hence now do you again
> convert this RDD to a new dataframe for better performance?
> Further, if you have a process which involves series of transformations
> i.e. from one RDD to another, do you keep on converting each RDD to a
> dataframe first, all the time?
>
> It's also possible that I might be missing something here, please share
> your experiences.
>
>
> Thanks and Regards,
> Apurva
>


Save DataFrame to HBase

2016-04-21 Thread Benjamin Kim
Has anyone found an easy way to save a DataFrame into HBase?

Thanks,
Ben


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



how to change akka.remote.startup-timeout value in spark

2016-04-21 Thread yuemeng (A)
When I run a spark application ,sometimes I will get follow error:
16/04/21 09:26:45 ERROR SparkContext: Error initializing SparkContext.
java.util.concurrent.TimeoutException: Futures timed out after [1 
milliseconds]
 at 
scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 at scala.concurrent.Await$.result(package.scala:107)
 at akka.remote.Remoting.start(Remoting.scala:180)
 at 
akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
 at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618)
 at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615)
 at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615)
 at akka.actor.ActorSystemImpl.start(ActorSystem.scala:632)
 at akka.actor.ActorSystem$.apply(ActorSystem.scala:141)
 at akka.actor.ActorSystem$.apply(ActorSystem.scala:118)
 at 
org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:122)
 at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
 at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
 at 
org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1995)
 at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
 at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1986)
 at 
org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56)
 at 
org.apache.spark.rpc.akka.AkkaRpcEnvFactory.create(AkkaRpcEnv.scala:245)

And,I find maybe we can increase akka.remote.startup-timeout vaule to solve 
this problem after I track the code,but I can't find any way to change this 
value
Can we have some methods to change akka config in spark,or some good way to 
solve this problem
Any advise will be thankful
Thanks a lot




RDD generated from Dataframes

2016-04-21 Thread Apurva Nandan
Hello everyone,

Generally speaking, I guess it's well known that dataframes are much faster
than RDD when it comes to performance.
My question is how do you go around when it comes to transforming a
dataframe using map.
I mean then the dataframe gets converted into RDD, hence now do you again
convert this RDD to a new dataframe for better performance?
Further, if you have a process which involves series of transformations
i.e. from one RDD to another, do you keep on converting each RDD to a
dataframe first, all the time?

It's also possible that I might be missing something here, please share
your experiences.


Thanks and Regards,
Apurva


Re: StructField Translation Error with Spark SQL

2016-04-21 Thread Ted Yu
You meant for fields which are nullable.

Can you pastebin the complete stack trace ?

Try 1.6.1 when you have a chance.

Thanks

On Wed, Apr 20, 2016 at 10:20 PM, Charles Nnamdi Akalugwu <
cprenzb...@gmail.com> wrote:

> I get the same error for fields which are not null unfortunately.
>
> Can't translate null value for field
> StructField(density,DecimalType(4,2),true)
> On Apr 21, 2016 1:37 AM, "Ted Yu"  wrote:
>
>> The weight field is not nullable.
>>
>> Looks like your source table had null value for this field.
>>
>> On Wed, Apr 20, 2016 at 4:11 PM, Charles Nnamdi Akalugwu <
>> cprenzb...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am using spark 1.4.1 and trying to copy all rows from a table in one
>>> MySQL Database to a Amazon RDS table using spark SQL.
>>>
>>> Some columns in the source table are defined as DECIMAL type and are
>>> nullable. Others are not.  When I run my spark job,
>>>
>>> val writeData = sqlContext.read.format("jdbc").option("url",
>>> sourceUrl).option("driver", "com.mysql.jdbc.Driver").option("dbtable",
>>> table).option("user", sourceUsername).option("password",
>>> sourcePassword).load()
>>>
>>> writeData.write.format("com.databricks.spark.redshift").option("url",
>>> String.format(targetUrl, targetUsername, targetPassword)).option("dbtable",
>>> table).option("tempdir", redshiftTempDir+table).mode("append").save()
>>>
>>> it fails with the following exception
>>>
>>> Can't translate null value for field
>>> StructField(weight,DecimalType(5,2),false)
>>>
>>>
>>> Any insights about this exception would be very helpful.
>>>
>>
>>


Re: Spark SQL Transaction

2016-04-21 Thread Michael Segel
Hi, 

Sometimes terms get muddled over time.

If you’re not using transactions, then each database statement is atomic and is 
itself a transaction. 
So unless you have some explicit ‘Begin Work’ at the start…. your statements 
should be atomic and there will be no ‘redo’ or ‘commit’ or ‘rollback’. 

I don’t see anything in Spark’s documentation about transactions, so the 
statements should be atomic.  (I’m not a guru here so I could be missing 
something in Spark) 

If you’re seeing the connection drop unexpectedly and then a rollback, could 
this be a setting or configuration of the database? 


> On Apr 19, 2016, at 1:18 PM, Andrés Ivaldi  wrote:
> 
> Hello, is possible to execute a SQL write without Transaction? we dont need 
> transactions to save our data and this adds an overhead to the SQLServer.
> 
> Regards.
> 
> -- 
> Ing. Ivaldi Andres


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.6.1 DataFrame write to JDBC

2016-04-21 Thread Michael Segel
How many partitions are in your data set?

Per the Spark DataFrameWriter Java Doc:
“
Saves the content of the DataFrame to an external database table via JDBC. In
the case the table already exists in the external database, behavior of this
function depends on the save mode, specified by the mode function (default to
throwing an exception).
Don't create too many partitions in parallel on a large cluster; otherwise
Spark might crash your external database systems.
”

This implies one connection per partition writing in parallel. So you could be 
swamping your database. 
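
If that is the case, a minimal sketch of capping the write parallelism
(partition count, URL, credentials and table name are placeholders):

import java.util.Properties

val props = new Properties()
props.setProperty("user", "dbuser")      // placeholder credentials
props.setProperty("password", "dbpass")

df.coalesce(8)                           // at most 8 partitions => at most 8 concurrent inserts
  .write
  .jdbc("jdbc:postgresql://dbhost:5432/db", "target_table", props)   // placeholder URL/table
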
Which database are you using? 

Also, how many hops? 
Network latency could also impact performance too… 

> On Apr 19, 2016, at 3:14 PM, Jonathan Gray  wrote:
> 
> Hi,
> 
> I'm trying to write ~60 million rows from a DataFrame to a database using 
> JDBC using Spark 1.6.1, something similar to df.write().jdbc(...)
> 
> The write seems to not be performing well.  Profiling the application with a 
> master of local[*] it appears there is not much socket write activity and 
> also not much CPU.
> 
> I would expect there to be an almost continuous block of socket write 
> activity showing up somewhere in the profile.
> 
> I can see that the top hot method involves 
> apache.spark.unsafe.platform.CopyMemory all from calls within 
> JdbcUtils.savePartition(...).  However, the CPU doesn't seem particularly 
> stressed so I'm guessing this isn't the cause of the problem.
> 
> Is there any best practices or has anyone come across a case like this before 
> where a write to a database seems to perform poorly?
> 
> Thanks,
> Jon



Word2VecModel limited to spark.akka.frameSize ?!

2016-04-21 Thread Stefan Falk

Hello!

I am experiencing an issue [1] with Word2VecModel#save. It appears to 
exceed spark.akka.frameSize (see stack trace [3]).


Setting the frameSize is not really an option because that would just 
limit me to 2GB so I wonder if there is anything I can do to make this 
work even if the model is larger.


Am I doing something wrong here or is this some kind of bug?

BR; Stefan

[1] 
http://stackoverflow.com/questions/36692386/exceeding-spark-akka-framesize-when-saving-word2vecmodel
[2] 
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala#L646

[3] http://pastebin.com/1gM63ynm

--
Stefan R. Falk

Know-Center Graz
Inffeldgasse 13 / 6. Stock
8010 Graz, Austria
Email : sf...@know-center.at
Tel: +43 316 873 30869
http://www.know-center.at


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to know whether I'm in the first batch of spark streaming

2016-04-21 Thread Praveen Devarao
Thanks Yu for sharing the use case.

>>If our system have some problem, such as hdfs issue, and the "first 
batch" and "second batch" were both queued. When the issue gone, these two 
batch will start together. Then, will onBatchStarted be called 
concurrently for these two batches?<<

Not sure... I have not dug into that detail or faced such a situation. I see 
a method onBatchSubmitted in the listener, and the comment for the method 
reads "/** Called when a batch of jobs has been submitted for processing. */". 
Given that we have an event for batch submitted too, I think the case you 
mention is a possible scenario, so you can probably use this method in 
combination with the other two. As all three methods take BatchInfo as their 
argument, and the BatchInfo class has the needed batchTime details, you should 
be able to achieve your task.
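
A bare-bones sketch of that listener approach, just to show the shape of it 
(the class name and the "init" hook are made up; the real work would replace 
the println):

import java.util.concurrent.atomic.AtomicBoolean
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchStarted}

class FirstBatchListener extends StreamingListener {
  private val firstSeen = new AtomicBoolean(false)

  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = {
    // compareAndSet keeps this safe even if two queued batches start together
    if (firstSeen.compareAndSet(false, true)) {
      val batchTime = batchStarted.batchInfo.batchTime
      println(s"first batch started at $batchTime - run init here")
    }
  }
}

// registered on an existing StreamingContext, e.g. ssc.addStreamingListener(new FirstBatchListener)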


Thanking You
-
Praveen Devarao
Spark Technology Centre
IBM India Software Labs
-
"Courage doesn't always roar. Sometimes courage is the quiet voice at the 
end of the day saying I will try again"



From:   Yu Xie 
To: Praveen Devarao/India/IBM@IBMIN
Cc: user@spark.apache.org
Date:   21/04/2016 01:40 pm
Subject:Re: How to know whether I'm in the first batch of spark 
streaming



Thank you Praveen

  in our spark streaming, we write down the data to a HDFS directory, and 
use the MMDDHHHmm00 format of batch time as the directory name.
  So, when we stop the streaming and start the streaming again (we do not 
use checkpoint), in the init of the first batch, we will write down the 
empty directory between the stop and start.
  If the second batch runs faster than the first batch, and it will have 
the chance to run the "init". In this case, the directory that the "first 
batch" will output to will be set to an empty directory by the "second 
batch", it will make the data mess.

  I have a question about the StreamingListener.
  If our system have some problem, such as hdfs issue, and the "first 
batch" and "second batch" were both queued. When the issue gone, these two 
batch will start together. Then, will onBatchStarted be called 
concurrently for these two batches?

Thank you


On Thu, Apr 21, 2016 at 3:11 PM, Praveen Devarao  
wrote:
Hi Yu,

Could you provide more details on what and how are you trying to 
initialize.are you having this initialization as part of the code 
block in action of the DStream? Say if the second batch finishes before 
first batch wouldn't your results be affected as init would have not taken 
place (since you want it on first batch itself)?

One way we could think of knowing the first batch is by 
implementing the StreamingListenertrait which has a method onBatchStarted 
and onBatchCompleted...These methods should help you determine the first 
batch (definitely first batch will start first though order of ending is 
not guaranteed with concurrentJobs set to more than 1)...

Would be interesting to know your use case...could you share, if 
possible?

Thanking You
-
Praveen Devarao
Spark Technology Centre
IBM India Software Labs
-
"Courage doesn't always roar. Sometimes courage is the quiet voice at the 
end of the day saying I will try again"



From:Yu Xie 
To:user@spark.apache.org
Date:19/04/2016 01:24 pm
Subject:How to know whether I'm in the first batch of spark 
streaming




hi spark users

I'm running a spark streaming application, with concurrentJobs > 1, so 
maybe more than one batches could run together.

Now I would like to do some init work in the first batch based on the 
"time" of the first batch. So even the second batch runs faster than the 
first batch, I still need to init in the literal "first batch"

Then is there a way that I can know that?
Thank you








Re: Impala can't read partitioned Parquet files saved from DF.partitionBy

2016-04-21 Thread Petr Novak
I have to ask my colleague if there is any specific error but I think it
just doesn't see files.

Petr

On Thu, Apr 21, 2016 at 11:54 AM, Petr Novak  wrote:

> Hello,
> Impala (v2.1.0, Spark 1.6.0) can't read partitioned Parquet files saved
> from DF.partitionBy (using Python). Is there any known reason, some config?
> Or it should generally work hence it is likely to be something wrong solely
> on our side?
>
> Many thanks,
> Petr
>


Impala can't read partitioned Parquet files saved from DF.partitionBy

2016-04-21 Thread Petr Novak
Hello,
Impala (v2.1.0, Spark 1.6.0) can't read partitioned Parquet files saved
from DF.partitionBy (using Python). Is there any known reason, or some config?
Or should it generally work, in which case it is likely to be something wrong
solely on our side?

Many thanks,
Petr


Long (20+ seconds) startup delay for jobs when running Spark on YARN

2016-04-21 Thread Akmal Abbasov

Hi, I'm running Spark(1.6.1) on YARN(2.5.1), cluster mode.
It's taking 20+ seconds for application to move from ACCEPTED to RUNNING state, 
here's logs
16/04/21 09:06:56 INFO impl.YarnClientImpl: Submitted application 
application_1461229289298_0001
16/04/21 09:06:57 INFO yarn.Client: Application report for 
application_1461229289298_0001 (state: ACCEPTED)
16/04/21 09:06:57 INFO yarn.Client:
client token: N/A
diagnostics: N/A
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: default
start time: 1461229616315
final status: UNDEFINED
tracking URL: http://test-spark-m2:8088/proxy/application_1461229289298_0001/
user: hadoop
16/04/21 09:06:58 INFO yarn.Client: Application report for 
application_1461229289298_0001 (state: ACCEPTED)
...
16/04/21 09:07:31 INFO yarn.Client: Application report for 
application_1461229289298_0001 (state: ACCEPTED)
16/04/21 09:07:32 INFO yarn.Client: Application report for 
application_1461229289298_0001 (state: RUNNING)
16/04/21 09:07:32 INFO yarn.Client:
client token: N/A
diagnostics: N/A
ApplicationMaster host: 10.0.8.51
ApplicationMaster RPC port: 0
queue: default
start time: 1461229616315
final status: UNDEFINED
tracking URL: http://test-spark-m2:8088/proxy/application_1461229289298_0001/
user: hadoop
16/04/21 09:07:33 INFO yarn.Client: Application report for 
application_1461229289298_0001 (state: RUNNING)

And here is the ApplicationMaster logs
16/04/21 09:07:19 INFO yarn.ApplicationMaster: Registered signal handlers for 
[TERM, HUP, INT]
16/04/21 09:07:22 INFO yarn.ApplicationMaster: ApplicationAttemptId: 
appattempt_1461229289298_0001_01
16/04/21 09:07:24 INFO spark.SecurityManager: Changing view acls to: hadoop
16/04/21 09:07:24 INFO spark.SecurityManager: Changing modify acls to: hadoop
16/04/21 09:07:24 INFO spark.SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(hadoop); users 
with modify permissions: Set(hadoop)
16/04/21 09:07:25 INFO yarn.ApplicationMaster: Starting the user application in 
a separate Thread
16/04/21 09:07:25 INFO yarn.ApplicationMaster: Waiting for spark context 
initialization
16/04/21 09:07:25 INFO yarn.ApplicationMaster: Waiting for spark context 
initialization ...
16/04/21 09:07:26 INFO spark.SparkContext: Running Spark version 1.6.1
16/04/21 09:07:26 INFO spark.SecurityManager: Changing view acls to: hadoop
16/04/21 09:07:26 INFO spark.SecurityManager: Changing modify acls to: hadoop
16/04/21 09:07:26 INFO spark.SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(hadoop); users 
with modify permissions: Set(hadoop)
16/04/21 09:07:27 INFO util.Utils: Successfully started service 'sparkDriver' 
on port 57808.
16/04/21 09:07:28 INFO slf4j.Slf4jLogger: Slf4jLogger started
16/04/21 09:07:28 INFO Remoting: Starting remoting
16/04/21 09:07:28 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://sparkDriverActorSystem@10.0.8.51:41651]
16/04/21 09:07:28 INFO util.Utils: Successfully started service 
'sparkDriverActorSystem' on port 41651.
16/04/21 09:07:29 INFO spark.SparkEnv: Registering MapOutputTracker
16/04/21 09:07:29 INFO spark.SparkEnv: Registering BlockManagerMaster
16/04/21 09:07:29 INFO storage.DiskBlockManager: Created local directory at 
/tmp/hadoop/nm-local-dir/usercache/hadoop/appcache/application_1461229289298_0001/blockmgr-84f8f2d5-e749-46d8-b605-2f615193421c
16/04/21 09:07:29 INFO storage.MemoryStore: MemoryStore started with capacity 
1140.4 MB
16/04/21 09:07:29 INFO spark.SparkEnv: Registering OutputCommitCoordinator
16/04/21 09:07:29 INFO ui.JettyUtils: Adding filter: 
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
16/04/21 09:07:29 INFO server.Server: jetty-8.y.z-SNAPSHOT
16/04/21 09:07:30 INFO server.AbstractConnector: Started 
SelectChannelConnector@0.0.0.0:56199
16/04/21 09:07:30 INFO util.Utils: Successfully started service 'SparkUI' on 
port 56199.
16/04/21 09:07:30 INFO ui.SparkUI: Started SparkUI at http://10.0.8.51:56199
16/04/21 09:07:30 INFO cluster.YarnClusterScheduler: Created 
YarnClusterScheduler
16/04/21 09:07:30 INFO util.Utils: Successfully started service 
'org.apache.spark.network.netty.NettyBlockTransferService' on port 51641.
16/04/21 09:07:30 INFO netty.NettyBlockTransferService: Server created on 51641
16/04/21 09:07:30 INFO storage.BlockManagerMaster: Trying to register 
BlockManager
16/04/21 09:07:30 INFO storage.BlockManagerMasterEndpoint: Registering block 
manager 10.0.8.51:51641 with 1140.4 MB RAM, BlockManagerId(driver, 10.0.8.51, 
51641)
16/04/21 09:07:30 INFO storage.BlockManagerMaster: Registered BlockManager
16/04/21 09:07:31 INFO scheduler.EventLoggingListener: Logging events to 
hdfs:///spark-events/application_1461229289298_0001_1
16/04/21 09:07:31 INFO cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: 
ApplicationMaster registered as 
NettyRpcEndpointRef(spark://YarnAM@10.0.8.51:57808)
16/04/21 09:07:32 INFO 

Re: How to know whether I'm in the first batch of spark streaming

2016-04-21 Thread Yu Xie
Thank you Praveen

  in our spark streaming, we write down the data to a HDFS directory, and
use the MMDDHHHmm00 format of batch time as the directory name.
  So, when we stop the streaming and start the streaming again (we do not
use checkpoint), in the init of the first batch, we will write down the
empty directory between the stop and start.
  If the second batch runs faster than the first batch, it will have the
chance to run the "init". In this case, the directory that the "first
batch" will output to will be reset to an empty directory by the "second
batch", which will make a mess of the data.

  I have a question about the StreamingListener.
  If our system has some problem, such as an hdfs issue, and the "first
batch" and "second batch" are both queued, then when the issue is gone these two
batches will start together. Then, will onBatchStarted be called concurrently
for these two batches?

Thank you


On Thu, Apr 21, 2016 at 3:11 PM, Praveen Devarao 
wrote:

> Hi Yu,
>
> Could you provide more details on what and how are you trying to
> initialize.are you having this initialization as part of the code block
> in action of the DStream? Say if the second batch finishes before first
> batch wouldn't your results be affected as init would have not taken place
> (since you want it on first batch itself)?
>
> One way we could think of knowing the first batch is by
> implementing the *StreamingListener*trait which has a method *onBatchStarted
> *and *onBatchCompleted*...These methods should help you determine the
> first batch (definitely first batch will start first though order of ending
> is not guaranteed with concurrentJobs set to more than 1)...
>
> Would be interesting to know your use case...could you share, if
> possible?
>
> Thanking You
>
> -
> Praveen Devarao
> Spark Technology Centre
> IBM India Software Labs
>
> -
> "Courage doesn't always roar. Sometimes courage is the quiet voice at the
> end of the day saying I will try again"
>
>
>
> From:Yu Xie 
> To:user@spark.apache.org
> Date:19/04/2016 01:24 pm
> Subject:How to know whether I'm in the first batch of spark
> streaming
> --
>
>
>
> hi spark users
>
> I'm running a spark streaming application, with concurrentJobs > 1, so
> maybe more than one batches could run together.
>
> Now I would like to do some init work in the first batch based on the
> "time" of the first batch. So even the second batch runs faster than the
> first batch, I still need to init in the literal "first batch"
>
> Then is there a way that I can know that?
> Thank you
>
>
>


Re: How to know whether I'm in the first batch of spark streaming

2016-04-21 Thread Praveen Devarao
Hi Yu,

Could you provide more details on what and how you are trying to 
initialize... Are you having this initialization as part of the code 
block in an action of the DStream? Say the second batch finishes before the 
first batch: wouldn't your results be affected, as init would not have taken 
place (since you want it on the first batch itself)?

One way we could think of knowing the first batch is by 
implementing the StreamingListener trait which has a method onBatchStarted 
and onBatchCompleted...These methods should help you determine the first 
batch (definitely first batch will start first though order of ending is 
not guaranteed with concurrentJobs set to more than 1)...

Would be interesting to know your use case...could you share, if 
possible?

Thanking You
-
Praveen Devarao
Spark Technology Centre
IBM India Software Labs
-
"Courage doesn't always roar. Sometimes courage is the quiet voice at the 
end of the day saying I will try again"



From:   Yu Xie 
To: user@spark.apache.org
Date:   19/04/2016 01:24 pm
Subject:How to know whether I'm in the first batch of spark 
streaming



hi spark users

I'm running a spark streaming application, with concurrentJobs > 1, so 
maybe more than one batches could run together.

Now I would like to do some init work in the first batch based on the 
"time" of the first batch. So even the second batch runs faster than the 
first batch, I still need to init in the literal "first batch"

Then is there a way that I can know that?
Thank you





Re: Spark 1.6.1 DataFrame write to JDBC

2016-04-21 Thread Mich Talebzadeh
What is the end database? Have you checked the performance of your query at
the target?

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 19 April 2016 at 23:14, Jonathan Gray  wrote:

> Hi,
>
> I'm trying to write ~60 million rows from a DataFrame to a database using
> JDBC using Spark 1.6.1, something similar to df.write().jdbc(...)
>
> The write seems to not be performing well.  Profiling the application with
> a master of local[*] it appears there is not much socket write activity and
> also not much CPU.
>
> I would expect there to be an almost continuous block of socket write
> activity showing up somewhere in the profile.
>
> I can see that the top hot method involves
> apache.spark.unsafe.platform.CopyMemory all from calls within
> JdbcUtils.savePartition(...).  However, the CPU doesn't seem particularly
> stressed so I'm guessing this isn't the cause of the problem.
>
> Is there any best practices or has anyone come across a case like this
> before where a write to a database seems to perform poorly?
>
> Thanks,
> Jon
>


Re: Choosing an Algorithm in Spark MLlib

2016-04-21 Thread Prashant Sharma
As far as I can understand, your requirements are pretty straightforward
and doable with just simple SQL queries. Take a look at Spark SQL in the Spark
documentation.
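
A minimal sketch of that, assuming an "activations" DataFrame with one row
per activation and columns unit_id (string) and failed (boolean):

activations.registerTempTable("activations")

val ranked = sqlContext.sql(
  """SELECT unit_id,
    |       COUNT(*) AS total_activations,
    |       SUM(CASE WHEN failed THEN 1 ELSE 0 END) AS failed_activations
    |FROM activations
    |GROUP BY unit_id""".stripMargin)

// order however the ranking criteria dictate, e.g. most-activated first
ranked.orderBy(org.apache.spark.sql.functions.desc("total_activations")).show()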

Prashant Sharma



On Tue, Apr 12, 2016 at 8:13 PM, Joe San  wrote:

>
> I'm working on a problem where in I have some data sets about some power
> generating units. Each of these units have been activated to run in the
> past and while activation, some units went into some issues. I now have all
> these data and I would like to come up with some sort of Ranking for these
> generating units. The criteria for ranking would be pretty simple to start
> with. They are:
>
>1. Maximum number of times a particular generating unit was activated
>2. How many times did the generating unit ran into problems during
>activation
>
> Later on I would expand on this ranking algorithm by adding more criteria.
> I will be using Apache Spark MLIB library and I can already see that there
> are quite a few algorithms already in place.
>
> http://spark.apache.org/docs/latest/mllib-guide.html
>
> I'm just not sure which algorithm would fit my purpose. Any suggestions?
>