Re: Using saveAsNewAPIHadoopDataset for Saving custom classes to Hbase

2016-04-22 Thread Ted Yu
Which hbase release are you using ?

Below is the write method from hbase 1.1 :

public void write(KEY key, Mutation value)
    throws IOException {
  if (!(value instanceof Put) && !(value instanceof Delete)) {
    throw new IOException("Pass a Delete or a Put");
  }
  mutator.mutate(value);
}

Mutation is an hbase class:

public abstract class Mutation extends OperationWithAttributes implements
    Row, CellScannable, HeapSize {

If you can show the skeleton of CustomClass, that would give us more clues.

From the exception, it looks like CustomClass doesn't extend Mutation.

A Mutation object can modify multiple columns.
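
A minimal sketch of one way to satisfy that check: convert each record to a Put (which extends Mutation) before saving. The fields of CustomClass, the column family "cf" and the column names are assumptions made for illustration, since the real class is not shown in this thread. Put.addColumn is the HBase 1.x method name.

```scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD

// Hypothetical stand-in for the poster's class; the real fields are unknown.
case class CustomClass(id: String, name: String, score: Int)

object SaveToHBase {
  // Assumed column family; it must already exist in the target table.
  private val family = Bytes.toBytes("cf")

  // Build one Put per record: one row key, one cell per field.
  def toPut(c: CustomClass): (ImmutableBytesWritable, Put) = {
    val rowKey = Bytes.toBytes(c.id)
    val put = new Put(rowKey)
    put.addColumn(family, Bytes.toBytes("name"), Bytes.toBytes(c.name))
    put.addColumn(family, Bytes.toBytes("score"), Bytes.toBytes(c.score))
    (new ImmutableBytesWritable(rowKey), put)
  }

  def save(rdd: RDD[CustomClass], tableName: String): Unit = {
    val job = Job.getInstance(HBaseConfiguration.create())
    job.getConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    // The values handed to the record writer are now Puts, so the cast succeeds.
    rdd.map(toPut).saveAsNewAPIHadoopDataset(job.getConfiguration)
  }
}
```

Under these assumptions each record becomes one row with two columns, so what ends up in HBase is decided by the addColumn calls rather than by CustomClass itself.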

Cheers

On Fri, Apr 22, 2016 at 8:14 PM, Nkechi Achara 
wrote:

> Hi All,
>
> I am having a few issues saving my data to Hbase.
>
> I have created a pairRDD for my custom class using the following:
>
> val rdd1 =rdd.map{it=>
>   (getRowKey(it),
>   it)
> }
>
> val job = Job.getInstance(hConf)
> val jobConf = job.getConfiguration
> jobConf.set(TableOutputFormat.OUTPUT_TABLE, "tableName")
> job.setOutputFormatClass(classOf[TableOutputFormat[CustomClass]])
>
> rdd1.saveAsNewAPIHadoopDataset(jobConf)
>
> When I run it, I receive the error:
>
> java.lang.ClassCastException: com.test.CustomClass cannot be cast to
> org.apache.hadoop.hbase.client.Mutation
> at
> org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:85)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply$mcV$sp(PairRDDFunctions.scala:1036)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1034)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1034)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1205)
>
> Has anyone got a concrete example of how to use this function?
> Also, does anyone know what it will actually save to Hbase, will it just
> be a single column for the CustomClass?
>
> Thanks,
>
> Keech
>


Using saveAsNewAPIHadoopDataset for Saving custom classes to Hbase

2016-04-22 Thread Nkechi Achara
Hi All,

I am having a few issues saving my data to Hbase.

I have created a pairRDD for my custom class using the following:

val rdd1 =rdd.map{it=>
  (getRowKey(it),
  it)
}

val job = Job.getInstance(hConf)
val jobConf = job.getConfiguration
jobConf.set(TableOutputFormat.OUTPUT_TABLE, "tableName")
job.setOutputFormatClass(classOf[TableOutputFormat[CustomClass]])

rdd1.saveAsNewAPIHadoopDataset(jobConf)

When I run it, I receive the error:

java.lang.ClassCastException: com.test.CustomClass cannot be cast to
org.apache.hadoop.hbase.client.Mutation
at
org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:85)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply$mcV$sp(PairRDDFunctions.scala:1036)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1034)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1034)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1205)

Has anyone got a concrete example of how to use this function?
Also, does anyone know what it will actually save to Hbase, will it just be
a single column for the CustomClass?

Thanks,

Keech


Re: executor delay in Spark

2016-04-22 Thread Raghava Mutharaju
Thank you. For now we plan to use spark-shell to submit jobs.

Regards,
Raghava.


On Fri, Apr 22, 2016 at 7:40 PM, Mike Hynes <91m...@gmail.com> wrote:

> Glad to hear that the problem was solvable! I have not seen delays of this
> type for later stages in jobs run by spark-submit, but I do not think it
> impossible if your stage has no lineage dependence on other RDDs.
>
> I'm CC'ing the dev list to report that other users are observing load
> imbalance caused by unusual initial task scheduling. I don't know of ways
> to avoid this other than creating a dummy task to synchronize the
> executors, but hopefully someone from there can suggest other possibilities.
>
> Mike
> On Apr 23, 2016 5:53 AM, "Raghava Mutharaju" 
> wrote:
>
>> Mike,
>>
>> It turns out the executor delay, as you mentioned, is the cause. After we
>> introduced a dummy stage, partitioning was working fine. Does this delay
>> happen during later stages as well? We noticed the same behavior
>> (partitioning happens on spark-shell but not through spark-submit) at a
>> later stage also.
>>
>> Apart from introducing a dummy stage or running it from spark-shell, is
>> there any other option to fix this?
>>
>> Regards,
>> Raghava.
>>
>>
>> On Mon, Apr 18, 2016 at 12:17 AM, Mike Hynes <91m...@gmail.com> wrote:
>>
>>> When submitting a job with spark-submit, I've observed delays (up to
>>> 1--2 seconds) for the executors to respond to the driver in order to
>>> receive tasks in the first stage. The delay does not persist once the
>>> executors have been synchronized.
>>>
>>> When the tasks are very short, as may be your case (relatively small
>>> data and a simple map task like you have described), the 8 tasks in
>>> your stage may be allocated to only 1 executor in 2 waves of 4, since
>>> the second executor won't have responded to the master before the
>>> first 4 tasks on the first executor have completed.
>>>
>>> To see if this is the cause in your particular case, you could try the
>>> following to confirm:
>>> 1. Examine the starting times of the tasks alongside their executor.
>>> 2. Make a "dummy" stage execute before your real stages to synchronize
>>>    the executors by creating and materializing any random RDD.
>>> 3. Make the tasks longer, i.e. with some silly computational work.
>>>
>>> Mike
>>>
>>>
>>> On 4/17/16, Raghava Mutharaju  wrote:
>>> > Yes, it's the same data.
>>> >
>>> > 1) The number of partitions is the same (8, which is an argument to the
>>> > HashPartitioner). In the first case, these partitions are spread across
>>> > both the worker nodes. In the second case, all the partitions are on the
>>> > same node.
>>> > 2) What resources would be of interest here? Scala shell takes the default
>>> > parameters since we use "bin/spark-shell --master " to run the
>>> > scala-shell. For the scala program, we do set some configuration options
>>> > such as driver memory (12GB), parallelism is set to 8 and we use Kryo
>>> > serializer.
>>> >
>>> > We are running this on Azure D3-v2 machines which have 4 cores and 14GB
>>> > RAM. 1 executor runs on each worker node. The following configuration
>>> > options are set for the scala program -- perhaps we should move them to
>>> > the spark config file.
>>> >
>>> > Driver memory and executor memory are set to 12GB
>>> > parallelism is set to 8
>>> > Kryo serializer is used
>>> > Number of retainedJobs and retainedStages has been increased to check them
>>> > in the UI.
>>> >
>>> > What information regarding Spark Context would be of interest here?
>>> >
>>> > Regards,
>>> > Raghava.
>>> >
>>> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar 
>>> wrote:
>>> >
>>> >> If the data file is same then it should have similar distribution of
>>> >> keys.
>>> >> Few queries-
>>> >>
>>> >> 1. Did you compare the number of partitions in both the cases?
>>> >> 2. Did you compare the resource allocation for Spark Shell vs Scala
>>> >> Program being submitted?
>>> >>
>>> >> Also, can you please share the details of Spark Context, Environment
>>> and
>>> >> Executors when you run via Scala program?
>>> >>
>>> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
>>> >> m.vijayaragh...@gmail.com> wrote:
>>> >>
>>> >>> Hello All,
>>> >>>
>>> >>> We are using HashPartitioner in the following way on a 3 node
>>> cluster (1
>>> >>> master and 2 worker nodes).
>>> >>>
>>> >>> val u = sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt")
>>> >>>   .map[(Int, Int)](line => { line.split("\\|") match { case Array(x, y) => (y.toInt, x.toInt) } })
>>> >>>   .partitionBy(new HashPartitioner(8)).setName("u").persist()
>>> >>>
>>> >>> u.count()
>>> >>>
>>> >>> If we run this from the spark shell, the data (52 MB) is split across
>>> >>> the
>>> >>> two worker nodes. But if we put this in a scala program and run it,
>>> then
>>> >>> all the data goes to only one node. We have run it 

RE: Java exception when showing join

2016-04-22 Thread Yong Zhang
use "dispute_df.join(comments_df, dispute_df.COMMENTID === comments_df.COMMENTID).first()" instead.
Yong

Date: Fri, 22 Apr 2016 17:42:26 -0400
From: webe...@aim.com
To: user@spark.apache.org
Subject: Java exception when showing join

I am using pyspark with netezza.  I am getting a java exception when trying to
show the first row of a join.  I can show the first row of each of the two
dataframes separately but not the result of a join.  I get the same error for
any action I take (first, collect, show).  Am I doing something wrong?



from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

dispute_df = sqlContext.read.format('com.ibm.spark.netezza').options(
    url='jdbc:netezza://***:5480/db', user='***', password='***',
    dbtable='table1', driver='com.ibm.spark.netezza').load()
dispute_df.printSchema()

comments_df = sqlContext.read.format('com.ibm.spark.netezza').options(
    url='jdbc:netezza://***:5480/db', user='***', password='***',
    dbtable='table2', driver='com.ibm.spark.netezza').load()
comments_df.printSchema()

dispute_df.join(comments_df, dispute_df.COMMENTID == comments_df.COMMENTID).first()





root
 |-- COMMENTID: string (nullable = true)
 |-- EXPORTDATETIME: timestamp (nullable = true)
 |-- ARTAGS: string (nullable = true)
 |-- POTAGS: string (nullable = true)
 |-- INVTAG: string (nullable = true)
 |-- ACTIONTAG: string (nullable = true)
 |-- DISPUTEFLAG: string (nullable = true)
 |-- ACTIONFLAG: string (nullable = true)
 |-- CUSTOMFLAG1: string (nullable = true)
 |-- CUSTOMFLAG2: string (nullable = true)

root
 |-- COUNTRY: string (nullable = true)
 |-- CUSTOMER: string (nullable = true)
 |-- INVNUMBER: string (nullable = true)
 |-- INVSEQNUMBER: string (nullable = true)
 |-- LEDGERCODE: string (nullable = true)
 |-- COMMENTTEXT: string (nullable = true)
 |-- COMMENTTIMESTAMP: timestamp (nullable = true)
 |-- COMMENTLENGTH: long (nullable = true)
 |-- FREEINDEX: long (nullable = true)
 |-- COMPLETEDFLAG: long (nullable = true)
 |-- ACTIONFLAG: long (nullable = true)
 |-- FREETEXT: string (nullable = true)
 |-- USERNAME: string (nullable = true)
 |-- ACTION: string (nullable = true)
 |-- COMMENTID: string (nullable = true)



---
Py4JJavaError Traceback (most recent call last)
 in ()
      5 comments_df = sqlContext.read.format('com.ibm.spark.netezza').options(url='jdbc:netezza://***:5480/db', user='***', password='***', dbtable='table2', driver='com.ibm.spark.netezza').load()
      6 comments_df.printSchema()
----> 7 dispute_df.join(comments_df, dispute_df.COMMENTID == comments_df.COMMENTID).first()

/usr/local/src/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc in first(self)
    802 Row(age=2, name=u'Alice')
    803 """
--> 804 return self.head()
    805
    806 @ignore_unicode_prefix

/usr/local/src/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc in head(self, n)
    790 """
    791 if n is None:
--> 792 rs = self.head(1)
    793 return rs[0] if rs else None
    794 return self.take(n)

/usr/local/src/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc in head(self, n)
    792 rs = self.head(1)
    793 return rs[0] if rs else None
--> 794 return self.take(n)
    795
    796 @ignore_unicode_prefix

/usr/local/src/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc in take(self, num)
    304 with SCCallSiteSync(self._sc) as css:
    305 port = self._sc._jvm.org.apache.spark.sql.execution.EvaluatePython.takeAndServe(
--> 306 self._jdf, num)
    307 return list(_load_from_socket(port, BatchedSerializer(PickleSerializer(
    308

/usr/local/src/spark/spark-1.6.1-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
    811 answer = self.gateway_client.send_command(command)
    812 return_value = get_return_value(
--> 813 answer, self.gateway_client, self.target_id, self.name)
    814
    815 for temp_arg in temp_args:

/usr/local/src/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     43 def deco(*a, **kw):
     44 try:
---> 45 return f(*a, **kw)
     46 except py4j.protocol.Py4JJavaError as e:
     47 s = e.java_exception.toString()

/usr/local/src/spark/spark-1.6.1-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    306 raise Py4JJavaError(
    307 "An error occurred while calling {0}{1}{2}.\n".
--> 308 format(target_id, ".", name), value)


Re: executor delay in Spark

2016-04-22 Thread Mike Hynes
Glad to hear that the problem was solvable! I have not seen delays of this
type for later stages in jobs run by spark-submit, but I do not think it
impossible if your stage has no lineage dependence on other RDDs.
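
The imbalance discussed in this thread can be illustrated with a toy model. The sketch below involves no Spark at all and is not how Spark's scheduler is implemented: it only assumes that each task goes greedily to the core that becomes free first, and that an executor's cores become available once it has registered. The names WaveModel, Executor and assign are made up for this illustration.

```scala
// Toy model only: no Spark involved, all names and numbers are illustrative.
object WaveModel {
  // An executor offers `cores` slots and registers with the driver at `readyAt` (ms).
  final case class Executor(id: Int, cores: Int, readyAt: Long)

  /** Greedily place `numTasks` tasks of length `taskMs`; returns task count per executor id. */
  def assign(executors: Seq[Executor], numTasks: Int, taskMs: Long): Map[Int, Int] = {
    // One entry per core: (time at which the core is free, owning executor id).
    var slots = executors.flatMap(e => Seq.fill(e.cores)((e.readyAt, e.id))).toVector
    val counts = scala.collection.mutable.Map(executors.map(_.id -> 0): _*)
    for (_ <- 1 to numTasks) {
      // Pick the core that is free earliest; ties go to the lower executor id.
      val i = slots.indices.minBy(j => slots(j))
      val (freeAt, id) = slots(i)
      counts(id) += 1
      slots = slots.updated(i, (freeAt + taskMs, id))
    }
    counts.toMap
  }
}
```

With two 4-core executors where the second registers 1500 ms late, assign(..., 8, 100L) places all 8 tasks on the first executor (two waves of 4), while assign(..., 8, 2000L) gives each executor 4 tasks, which matches the suggestion to make the tasks longer.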

I'm CC'ing the dev list to report that other users are observing load
imbalance caused by unusual initial task scheduling. I don't know of ways
to avoid this other than creating a dummy task to synchronize the
executors, but hopefully someone from there can suggest other possibilities.
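
For reference, a dummy stage of the kind mentioned above might look like the following sketch. The object name WarmUp, the default of 8 tasks and the 2000 ms sleep are assumptions chosen to outlast the 1--2 second delay reported in this thread; nothing here guarantees that every executor has registered by the time it returns.

```scala
import org.apache.spark.SparkContext

object WarmUp {
  /** Runs a throwaway job whose tasks are long enough for late executors to join. */
  def warmUp(sc: SparkContext, numTasks: Int = 8, sleepMs: Long = 2000L): Long =
    sc.parallelize(1 to numTasks, numTasks)   // one element per partition, so one per task
      .map { i => Thread.sleep(sleepMs); i }  // keep each task busy for sleepMs
      .count()                                // action that forces the stage to run
}
```

Calling WarmUp.warmUp(sc) once, right after the SparkContext is created and before the real stages, is the intended usage.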

Mike
On Apr 23, 2016 5:53 AM, "Raghava Mutharaju" 
wrote:

> Mike,
>
> It turns out the executor delay, as you mentioned, is the cause. After we
> introduced a dummy stage, partitioning was working fine. Does this delay
> happen during later stages as well? We noticed the same behavior
> (partitioning happens on spark-shell but not through spark-submit) at a
> later stage also.
>
> Apart from introducing a dummy stage or running it from spark-shell, is
> there any other option to fix this?
>
> Regards,
> Raghava.
>
>
> On Mon, Apr 18, 2016 at 12:17 AM, Mike Hynes <91m...@gmail.com> wrote:
>
>> When submitting a job with spark-submit, I've observed delays (up to
>> 1--2 seconds) for the executors to respond to the driver in order to
>> receive tasks in the first stage. The delay does not persist once the
>> executors have been synchronized.
>>
>> When the tasks are very short, as may be your case (relatively small
>> data and a simple map task like you have described), the 8 tasks in
>> your stage may be allocated to only 1 executor in 2 waves of 4, since
>> the second executor won't have responded to the master before the
>> first 4 tasks on the first executor have completed.
>>
>> To see if this is the cause in your particular case, you could try the
>> following to confirm:
>> 1. Examine the starting times of the tasks alongside their executor.
>> 2. Make a "dummy" stage execute before your real stages to synchronize
>>    the executors by creating and materializing any random RDD.
>> 3. Make the tasks longer, i.e. with some silly computational work.
>>
>> Mike
>>
>>
>> On 4/17/16, Raghava Mutharaju  wrote:
>> > Yes, it's the same data.
>> >
>> > 1) The number of partitions is the same (8, which is an argument to the
>> > HashPartitioner). In the first case, these partitions are spread across
>> > both the worker nodes. In the second case, all the partitions are on the
>> > same node.
>> > 2) What resources would be of interest here? Scala shell takes the default
>> > parameters since we use "bin/spark-shell --master " to run the
>> > scala-shell. For the scala program, we do set some configuration options
>> > such as driver memory (12GB), parallelism is set to 8 and we use Kryo
>> > serializer.
>> >
>> > We are running this on Azure D3-v2 machines which have 4 cores and 14GB
>> > RAM. 1 executor runs on each worker node. The following configuration
>> > options are set for the scala program -- perhaps we should move them to
>> > the spark config file.
>> >
>> > Driver memory and executor memory are set to 12GB
>> > parallelism is set to 8
>> > Kryo serializer is used
>> > Number of retainedJobs and retainedStages has been increased to check them
>> > in the UI.
>> >
>> > What information regarding Spark Context would be of interest here?
>> >
>> > Regards,
>> > Raghava.
>> >
>> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar 
>> wrote:
>> >
>> >> If the data file is same then it should have similar distribution of
>> >> keys.
>> >> Few queries-
>> >>
>> >> 1. Did you compare the number of partitions in both the cases?
>> >> 2. Did you compare the resource allocation for Spark Shell vs Scala
>> >> Program being submitted?
>> >>
>> >> Also, can you please share the details of Spark Context, Environment
>> and
>> >> Executors when you run via Scala program?
>> >>
>> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
>> >> m.vijayaragh...@gmail.com> wrote:
>> >>
>> >>> Hello All,
>> >>>
>> >>> We are using HashPartitioner in the following way on a 3 node cluster
>> (1
>> >>> master and 2 worker nodes).
>> >>>
>> >>> val u = sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt")
>> >>>   .map[(Int, Int)](line => { line.split("\\|") match { case Array(x, y) => (y.toInt, x.toInt) } })
>> >>>   .partitionBy(new HashPartitioner(8)).setName("u").persist()
>> >>>
>> >>> u.count()
>> >>>
>> >>> If we run this from the spark shell, the data (52 MB) is split across
>> >>> the
>> >>> two worker nodes. But if we put this in a scala program and run it,
>> then
>> >>> all the data goes to only one node. We have run it multiple times, but
>> >>> this
>> >>> behavior does not change. This seems strange.
>> >>>
>> >>> Is there some problem with the way we use HashPartitioner?
>> >>>
>> >>> Thanks in advance.
>> >>>
>> >>> Regards,
>> >>> Raghava.
>> >>>
>> >>
>> >>
>> >
>> >
>> > --
>> > Regards,
>> > Raghava
>> > 

Re: Which jar file has import org.apache.spark.internal.Logging

2016-04-22 Thread Mich Talebzadeh
So is there any way of creating an RDD without using offsetRanges? Sorry for
the lack of clarity here.


val rdd = KafkaUtils.createRDD[String, String, StringDecoder,
StringDecoder](sc, kafkaParams, offsetRanges)



Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com



On 23 April 2016 at 00:13, Mich Talebzadeh 
wrote:

> So there is really no point in using it :(
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 23 April 2016 at 00:11, Ted Yu  wrote:
>
>> The class is private :
>>
>> final class OffsetRange private(
>>
>> On Fri, Apr 22, 2016 at 4:08 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Ok I decided to forgo that approach and use an existing program of mine
>>> with slight modification. The code is this
>>>
>>> import org.apache.spark.SparkContext
>>> import org.apache.spark.SparkConf
>>> import org.apache.spark.sql.Row
>>> import org.apache.spark.sql.hive.HiveContext
>>> import org.apache.spark.sql.types._
>>> import org.apache.spark.sql.SQLContext
>>> import org.apache.spark.sql.functions._
>>> import _root_.kafka.serializer.StringDecoder
>>> import org.apache.spark.streaming._
>>> import org.apache.spark.streaming.kafka.KafkaUtils
>>> import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}
>>> //
>>> object CEP_assembly {
>>>   def main(args: Array[String]) {
>>>   val conf = new SparkConf().
>>>setAppName("CEP_assembly").
>>>setMaster("local[2]").
>>>set("spark.driver.allowMultipleContexts", "true").
>>>set("spark.hadoop.validateOutputSpecs", "false")
>>>   val sc = new SparkContext(conf)
>>>   // Create sqlContext based on HiveContext
>>>   val sqlContext = new HiveContext(sc)
>>>   import sqlContext.implicits._
>>>   val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>   println ("\nStarted at"); sqlContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>> val ssc = new StreamingContext(conf, Seconds(1))
>>> ssc.checkpoint("checkpoint")
>>> val kafkaParams = Map[String, String]("bootstrap.servers" ->
>>> "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081",
>>> "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest" )
>>> val topics = Set("newtopic", "newtopic")
>>> val dstream = KafkaUtils.createDirectStream[String, String,
>>> StringDecoder, StringDecoder](ssc, kafkaParams, topics)
>>> dstream.cache()
>>> val lines = dstream.map(_._2)
>>> val showResults = lines.filter(_.contains("statement
>>> cache")).flatMap(line => line.split("\n,")).map(word => (word,
>>> 1)).reduceByKey(_ + _)
>>> // Define the offset ranges to read in the batch job
>>> val offsetRanges = new OffsetRange("newtopic", 0, 110, 220)
>>> // Create the RDD based on the offset ranges
>>> val rdd = KafkaUtils.createRDD[String, String, StringDecoder,
>>> StringDecoder](sc, kafkaParams, offsetRanges)
>>> ssc.start()
>>> ssc.awaitTermination()
>>> //ssc.stop()
>>>   println ("\nFinished at"); sqlContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>>   }
>>> }
>>>
>>>
>>> With sbt
>>>
>>> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1" %
>>> "provided"
>>> libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.5.1"  %
>>> "provided"
>>> libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.5.1" %
>>> "provided"
>>> libraryDependencies += "junit" % "junit" % "4.12"
>>> libraryDependencies += "org.scala-sbt" % "test-interface" % "1.0"
>>> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.1"
>>> % "provided"
>>> libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" %
>>> "1.6.1"
>>> libraryDependencies += "org.scalactic" %% "scalactic" % "2.2.6"
>>> libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6"
>>> libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.5.1"
>>> libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.5.1"
>>> % "test"
>>> libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" %
>>> "1.6.1"
>>>
>>>
>>> However, I am getting the following error
>>>
>>> [info] Loading project definition from
>>> /data6/hduser/scala/CEP_assembly/project
>>> [info] Set current project to CEP_assembly (in build
>>> file:/data6/hduser/scala/CEP_assembly/)
>>> [info] Compiling 1 Scala source to
>>> /data6/hduser/scala/CEP_assembly/target/scala-2.10/classes...
>>> [error]
>>> 

Re: Which jar file has import org.apache.spark.internal.Logging

2016-04-22 Thread Mich Talebzadeh
So there is really no point in using it :(

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com



On 23 April 2016 at 00:11, Ted Yu  wrote:

> The class is private :
>
> final class OffsetRange private(
>
> On Fri, Apr 22, 2016 at 4:08 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Ok I decided to forgo that approach and use an existing program of mine
>> with slight modification. The code is this
>>
>> import org.apache.spark.SparkContext
>> import org.apache.spark.SparkConf
>> import org.apache.spark.sql.Row
>> import org.apache.spark.sql.hive.HiveContext
>> import org.apache.spark.sql.types._
>> import org.apache.spark.sql.SQLContext
>> import org.apache.spark.sql.functions._
>> import _root_.kafka.serializer.StringDecoder
>> import org.apache.spark.streaming._
>> import org.apache.spark.streaming.kafka.KafkaUtils
>> import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}
>> //
>> object CEP_assembly {
>>   def main(args: Array[String]) {
>>   val conf = new SparkConf().
>>setAppName("CEP_assembly").
>>setMaster("local[2]").
>>set("spark.driver.allowMultipleContexts", "true").
>>set("spark.hadoop.validateOutputSpecs", "false")
>>   val sc = new SparkContext(conf)
>>   // Create sqlContext based on HiveContext
>>   val sqlContext = new HiveContext(sc)
>>   import sqlContext.implicits._
>>   val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>   println ("\nStarted at"); sqlContext.sql("SELECT
>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>> ").collect.foreach(println)
>> val ssc = new StreamingContext(conf, Seconds(1))
>> ssc.checkpoint("checkpoint")
>> val kafkaParams = Map[String, String]("bootstrap.servers" ->
>> "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081",
>> "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest" )
>> val topics = Set("newtopic", "newtopic")
>> val dstream = KafkaUtils.createDirectStream[String, String,
>> StringDecoder, StringDecoder](ssc, kafkaParams, topics)
>> dstream.cache()
>> val lines = dstream.map(_._2)
>> val showResults = lines.filter(_.contains("statement
>> cache")).flatMap(line => line.split("\n,")).map(word => (word,
>> 1)).reduceByKey(_ + _)
>> // Define the offset ranges to read in the batch job
>> val offsetRanges = new OffsetRange("newtopic", 0, 110, 220)
>> // Create the RDD based on the offset ranges
>> val rdd = KafkaUtils.createRDD[String, String, StringDecoder,
>> StringDecoder](sc, kafkaParams, offsetRanges)
>> ssc.start()
>> ssc.awaitTermination()
>> //ssc.stop()
>>   println ("\nFinished at"); sqlContext.sql("SELECT
>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>> ").collect.foreach(println)
>>   }
>> }
>>
>>
>> With sbt
>>
>> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1" %
>> "provided"
>> libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.5.1"  %
>> "provided"
>> libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.5.1" %
>> "provided"
>> libraryDependencies += "junit" % "junit" % "4.12"
>> libraryDependencies += "org.scala-sbt" % "test-interface" % "1.0"
>> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.1"
>> % "provided"
>> libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" %
>> "1.6.1"
>> libraryDependencies += "org.scalactic" %% "scalactic" % "2.2.6"
>> libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6"
>> libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.5.1"
>> libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.5.1" %
>> "test"
>> libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" %
>> "1.6.1"
>>
>>
>> However, I am getting the following error
>>
>> [info] Loading project definition from
>> /data6/hduser/scala/CEP_assembly/project
>> [info] Set current project to CEP_assembly (in build
>> file:/data6/hduser/scala/CEP_assembly/)
>> [info] Compiling 1 Scala source to
>> /data6/hduser/scala/CEP_assembly/target/scala-2.10/classes...
>> [error]
>> /data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:37:
>> constructor OffsetRange in class OffsetRange cannot be accessed in object
>> CEP_assembly
>> [error] val offsetRanges = new OffsetRange("newtopic", 0, 110, 220)
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 22 April 2016 at 18:41, Marcelo Vanzin  wrote:
>>
>>> On Fri, Apr 22, 2016 at 10:38 AM, Mich Talebzadeh
>>>  wrote:
>>> > I am trying 

Re: Which jar file has import org.apache.spark.internal.Logging

2016-04-22 Thread Ted Yu
The constructor of the class is private:

final class OffsetRange private(
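
Since only the constructor is private, the companion object's factory methods remain usable. A minimal sketch, assuming the spark-streaming-kafka 1.x API used in the quoted program; the object name BatchRead is made up, and the topic, partition and offsets are simply the values from the quoted code. Note that createRDD expects an Array of OffsetRange rather than a single instance.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

object BatchRead {
  def read(sc: SparkContext, kafkaParams: Map[String, String]): RDD[(String, String)] = {
    // Arguments: topic, partition, fromOffset (inclusive), untilOffset (exclusive).
    val offsetRanges = Array(OffsetRange.create("newtopic", 0, 110L, 220L))
    KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, offsetRanges)
  }
}
```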

On Fri, Apr 22, 2016 at 4:08 PM, Mich Talebzadeh 
wrote:

> Ok I decided to forgo that approach and use an existing program of mine
> with slight modification. The code is this
>
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.hive.HiveContext
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.sql.functions._
> import _root_.kafka.serializer.StringDecoder
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.kafka.KafkaUtils
> import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}
> //
> object CEP_assembly {
>   def main(args: Array[String]) {
>   val conf = new SparkConf().
>setAppName("CEP_assembly").
>setMaster("local[2]").
>set("spark.driver.allowMultipleContexts", "true").
>set("spark.hadoop.validateOutputSpecs", "false")
>   val sc = new SparkContext(conf)
>   // Create sqlContext based on HiveContext
>   val sqlContext = new HiveContext(sc)
>   import sqlContext.implicits._
>   val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>   println ("\nStarted at"); sqlContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> val ssc = new StreamingContext(conf, Seconds(1))
> ssc.checkpoint("checkpoint")
> val kafkaParams = Map[String, String]("bootstrap.servers" ->
> "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081",
> "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest" )
> val topics = Set("newtopic", "newtopic")
> val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder,
> StringDecoder](ssc, kafkaParams, topics)
> dstream.cache()
> val lines = dstream.map(_._2)
> val showResults = lines.filter(_.contains("statement cache")).flatMap(line
> => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
> // Define the offset ranges to read in the batch job
> val offsetRanges = new OffsetRange("newtopic", 0, 110, 220)
> // Create the RDD based on the offset ranges
> val rdd = KafkaUtils.createRDD[String, String, StringDecoder,
> StringDecoder](sc, kafkaParams, offsetRanges)
> ssc.start()
> ssc.awaitTermination()
> //ssc.stop()
>   println ("\nFinished at"); sqlContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
>   }
> }
>
>
> With sbt
>
> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1" %
> "provided"
> libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.5.1"  %
> "provided"
> libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.5.1" %
> "provided"
> libraryDependencies += "junit" % "junit" % "4.12"
> libraryDependencies += "org.scala-sbt" % "test-interface" % "1.0"
> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.1" %
> "provided"
> libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" %
> "1.6.1"
> libraryDependencies += "org.scalactic" %% "scalactic" % "2.2.6"
> libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6"
> libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.5.1"
> libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.5.1" %
> "test"
> libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" %
> "1.6.1"
>
>
> However, I am getting the following error
>
> [info] Loading project definition from
> /data6/hduser/scala/CEP_assembly/project
> [info] Set current project to CEP_assembly (in build
> file:/data6/hduser/scala/CEP_assembly/)
> [info] Compiling 1 Scala source to
> /data6/hduser/scala/CEP_assembly/target/scala-2.10/classes...
> [error]
> /data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:37:
> constructor OffsetRange in class OffsetRange cannot be accessed in object
> CEP_assembly
> [error] val offsetRanges = new OffsetRange("newtopic", 0, 110, 220)

Re: Which jar file has import org.apache.spark.internal.Logging

2016-04-22 Thread Mich Talebzadeh
OK, I decided to forgo that approach and use an existing program of mine
with slight modifications. The code is:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}
//
object CEP_assembly {
  def main(args: Array[String]) {
    val conf = new SparkConf().
      setAppName("CEP_assembly").
      setMaster("local[2]").
      set("spark.driver.allowMultipleContexts", "true").
      set("spark.hadoop.validateOutputSpecs", "false")
    val sc = new SparkContext(conf)
    // Create sqlContext based on HiveContext
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._
    val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    println ("\nStarted at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')").collect.foreach(println)
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint("checkpoint")
    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "rhes564:9092",
      "schema.registry.url" -> "http://rhes564:8081",
      "zookeeper.connect" -> "rhes564:2181",
      "group.id" -> "StreamTest")
    val topics = Set("newtopic", "newtopic")
    val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    dstream.cache()
    val lines = dstream.map(_._2)
    val showResults = lines.filter(_.contains("statement cache")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
    // Define the offset ranges to read in the batch job
    val offsetRanges = new OffsetRange("newtopic", 0, 110, 220)
    // Create the RDD based on the offset ranges
    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](sc, kafkaParams, offsetRanges)
    ssc.start()
    ssc.awaitTermination()
    //ssc.stop()
    println ("\nFinished at"); sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')").collect.foreach(println)
  }
}


With sbt

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.5.1" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.5.1" % "provided"
libraryDependencies += "junit" % "junit" % "4.12"
libraryDependencies += "org.scala-sbt" % "test-interface" % "1.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.1" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.1"
libraryDependencies += "org.scalactic" %% "scalactic" % "2.2.6"
libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6"
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.5.1"
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.5.1" % "test"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.1"
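
As an aside on the build definition above: it mixes Spark 1.5.1 and 1.6.1 artifacts and lists spark-core more than once. Nothing in this thread establishes that this causes the error reported here, but keeping every Spark module on one version is the usual arrangement. A minimal sketch, assuming 1.6.1 is the intended version and sbt 0.13 syntax; the sparkVersion value name is illustrative and the non-Spark entries other than scalatest are left out for brevity:

```scala
// build.sbt (sketch): one Spark version shared by every Spark artifact
val sparkVersion = "1.6.1" // assumption: 1.6.1 is the version actually installed

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql"             % sparkVersion % "provided",
  "org.apache.spark" %% "spark-hive"            % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming"       % sparkVersion % "provided",
  // not marked "provided": the Kafka integration is not part of the Spark distribution
  "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion,
  "org.scalatest"    %% "scalatest"             % "2.2.6" % "test"
)
```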


However, I am getting the following error:

[info] Loading project definition from /data6/hduser/scala/CEP_assembly/project
[info] Set current project to CEP_assembly (in build file:/data6/hduser/scala/CEP_assembly/)
[info] Compiling 1 Scala source to /data6/hduser/scala/CEP_assembly/target/scala-2.10/classes...
[error] /data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:37: constructor OffsetRange in class OffsetRange cannot be accessed in object CEP_assembly
[error] val offsetRanges = new OffsetRange("newtopic", 0, 110, 220)
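
The compiler message says the constructor of OffsetRange is not accessible from user code, so "new OffsetRange(...)" cannot compile. A minimal sketch of one way around it, assuming the Spark 1.x spark-streaming-kafka artifact from the build above: build the range through the companion object (OffsetRange(...) or OffsetRange.create(...)) and wrap it in an Array, since KafkaUtils.createRDD expects Array[OffsetRange]. The object name OffsetRangeSketch and the method name batchRdd are illustrative; the topic, partition and offsets are the ones from the post.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

object OffsetRangeSketch {
  // Build the range through the companion object; the class constructor is private.
  // Arguments: topic, partition, fromOffset (inclusive), untilOffset (exclusive).
  val offsetRanges: Array[OffsetRange] = Array(
    OffsetRange("newtopic", 0, 110L, 220L)
    // equivalently: OffsetRange.create("newtopic", 0, 110L, 220L)
  )

  // createRDD takes an Array[OffsetRange], not a single OffsetRange.
  def batchRdd(sc: SparkContext, kafkaParams: Map[String, String]): RDD[(String, String)] =
    KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, offsetRanges)
}
```

In the program above, batchRdd(sc, kafkaParams) would take the place of the createRDD call.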


Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

On 22 April 2016 at 18:41, Marcelo Vanzin  wrote:

> On Fri, Apr 22, 2016 at 10:38 AM, Mich Talebzadeh
>  wrote:
> > I am trying to test Spark with CEP and I have been shown a sample here
> >
> https://github.com/agsachin/spark/blob/CEP/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala#L532
>
> I'm not familiar with CEP, but that's a Spark unit test, so if you're
> trying to run it outside of the context of Spark unit tests (as it
> seems you're trying to do), you're going to run into a world of
> trouble. I'd suggest a different approach where whatever you're trying
> to do is done through the Spark build, not outside of it.
>
> --
> Marcelo
>


Re: Converting from KafkaUtils.createDirectStream to KafkaUtils.createStream

2016-04-22 Thread Mich Talebzadeh
Hi Cody,

This is my first attempt at using offset ranges (they may not mean much in
my context at the moment):

val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("checkpoint")
val kafkaParams = Map[String, String](
  "bootstrap.servers" -> "rhes564:9092",
  "schema.registry.url" -> "http://rhes564:8081",
  "zookeeper.connect" -> "rhes564:2181",
  "group.id" -> "StreamTest")
val topics = Set("newtopic", "newtopic")
val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
dstream.cache()
val lines = dstream.map(_._2)
val showResults = lines.filter(_.contains("statement cache")).flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _)
// Define the offset ranges to read in the batch job. Just one offset range
val offsetRanges = Array(
  OffsetRange("newtopic", 0, 110, 220)
)
// Create the RDD based on the offset ranges
val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](sc, kafkaParams, offsetRanges)


This comes back with the error:

[info] Compiling 1 Scala source to /data6/hduser/scala/CEP_assembly/target/scala-2.10/classes...
[error] /data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:37: not found: value OffsetRange
[error]   OffsetRange("newtopic", 0, 110, 220),
[error]   ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed

Any ideas would be appreciated.
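
"not found: value OffsetRange" means the compiler sees no OffsetRange object in scope. The program posted at the start of this thread imports only KafkaUtils from org.apache.spark.streaming.kafka, so one plausible reading is that OffsetRange has simply not been imported. This is an assumption, since the imports of the file being compiled are not shown here. A minimal sketch with the import in place; the object name is illustrative:

```scala
// Import OffsetRange alongside KafkaUtils so the companion object is visible.
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

object OffsetRangeImportSketch {
  // With the import above, OffsetRange(...) resolves to the companion's apply method.
  val offsetRanges: Array[OffsetRange] = Array(
    OffsetRange("newtopic", 0, 110L, 220L)
  )
}
```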


On 22 April 2016 at 22:04, Cody Koeninger  wrote:

> Spark streaming as it exists today is always microbatch.
>
> You can certainly filter messages using spark streaming.

Java exception when showing join

2016-04-22 Thread webe3vt
I am using PySpark with Netezza. I am getting a Java exception when trying to
show the first row of a join. I can show the first row of each of the two
DataFrames separately, but not the result of the join. I get the same error for
any action I take (first, collect, show). Am I doing something wrong?

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
dispute_df = sqlContext.read.format('com.ibm.spark.netezza').options(
    url='jdbc:netezza://***:5480/db', user='***', password='***',
    dbtable='table1', driver='com.ibm.spark.netezza').load()
dispute_df.printSchema()
comments_df = sqlContext.read.format('com.ibm.spark.netezza').options(
    url='jdbc:netezza://***:5480/db', user='***', password='***',
    dbtable='table2', driver='com.ibm.spark.netezza').load()
comments_df.printSchema()
dispute_df.join(comments_df, dispute_df.COMMENTID == comments_df.COMMENTID).first()


root
 |-- COMMENTID: string (nullable = true)
 |-- EXPORTDATETIME: timestamp (nullable = true)
 |-- ARTAGS: string (nullable = true)
 |-- POTAGS: string (nullable = true)
 |-- INVTAG: string (nullable = true)
 |-- ACTIONTAG: string (nullable = true)
 |-- DISPUTEFLAG: string (nullable = true)
 |-- ACTIONFLAG: string (nullable = true)
 |-- CUSTOMFLAG1: string (nullable = true)
 |-- CUSTOMFLAG2: string (nullable = true)

root
 |-- COUNTRY: string (nullable = true)
 |-- CUSTOMER: string (nullable = true)
 |-- INVNUMBER: string (nullable = true)
 |-- INVSEQNUMBER: string (nullable = true)
 |-- LEDGERCODE: string (nullable = true)
 |-- COMMENTTEXT: string (nullable = true)
 |-- COMMENTTIMESTAMP: timestamp (nullable = true)
 |-- COMMENTLENGTH: long (nullable = true)
 |-- FREEINDEX: long (nullable = true)
 |-- COMPLETEDFLAG: long (nullable = true)
 |-- ACTIONFLAG: long (nullable = true)
 |-- FREETEXT: string (nullable = true)
 |-- USERNAME: string (nullable = true)
 |-- ACTION: string (nullable = true)
 |-- COMMENTID: string (nullable = true)

---
Py4JJavaError Traceback (most recent call last)
 in ()
  5 comments_df = 
sqlContext.read.format('com.ibm.spark.netezza').options(url='jdbc:netezza://***:5480/db',
 user='***', password='***', dbtable='table2', 
driver='com.ibm.spark.netezza').load()
  6 comments_df.printSchema()
> 7 dispute_df.join(comments_df, dispute_df.COMMENTID == 
comments_df.COMMENTID).first()

/usr/local/src/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc 
in first(self)
802 Row(age=2, name=u'Alice')
803 """
--> 804 return self.head()
805 
806 @ignore_unicode_prefix

/usr/local/src/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc 
in head(self, n)
790 """
791 if n is None:
--> 792 rs = self.head(1)
793 return rs[0] if rs else None
794 return self.take(n)

/usr/local/src/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc 
in head(self, n)
792 rs = self.head(1)
793 return rs[0] if rs else None
--> 794 return self.take(n)
795 
796 @ignore_unicode_prefix

/usr/local/src/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/dataframe.pyc 
in take(self, num)
304 with SCCallSiteSync(self._sc) as css:
305 port = 
self._sc._jvm.org.apache.spark.sql.execution.EvaluatePython.takeAndServe(
--> 306 self._jdf, num)
307 return list(_load_from_socket(port, 
BatchedSerializer(PickleSerializer(
308 

/usr/local/src/spark/spark-1.6.1-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py
 in __call__(self, *args)
811 answer = self.gateway_client.send_command(command)
812 return_value = get_return_value(
--> 813 answer, self.gateway_client, self.target_id, self.name)
814 
815 for temp_arg in temp_args:

/usr/local/src/spark/spark-1.6.1-bin-hadoop2.6/python/pyspark/sql/utils.pyc in 
deco(*a, **kw)
 43 def deco(*a, **kw):
 44 try:
---> 45 return f(*a, **kw)
 46 except py4j.protocol.Py4JJavaError as e:
 47 s = e.java_exception.toString()

/usr/local/src/spark/spark-1.6.1-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/protocol.py
 in get_return_value(answer, gateway_client, target_id, name)
306 raise Py4JJavaError(
307 "An error occurred while calling {0}{1}{2}.\n".
--> 308 format(target_id, ".", name), value)
309 else:
310 raise Py4JError(

Py4JJavaError: An error occurred while calling 
z:org.apache.spark.sql.execution.EvaluatePython.takeAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in 
stage 59.0 failed 1 times, most recent failure: Lost task 2.0 in stage 

Re: Converting from KafkaUtils.createDirectStream to KafkaUtils.createStream

2016-04-22 Thread Cody Koeninger
Spark Streaming as it exists today is always micro-batch.

You can certainly filter messages using Spark Streaming.
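
A minimal sketch of such filtering, assuming the message values have already been extracted into a DStream[String] (as with dstream.map(_._2) in the program under discussion). The predicate isSignal and its "BUY" keyword are hypothetical stand-ins for the pre-built criteria mentioned in the question; the object and method names are illustrative.

```scala
import org.apache.spark.streaming.dstream.DStream

object SignalFilterSketch {
  // Hypothetical criterion: treat any message mentioning "BUY" as a signal.
  def isSignal(message: String): Boolean = message.toUpperCase.contains("BUY")

  // Keep only the matching messages of each micro-batch; the rest are dropped.
  def keepSignals(lines: DStream[String]): DStream[String] =
    lines.filter(isSignal _)
}
```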


On Fri, Apr 22, 2016 at 4:02 PM, Mich Talebzadeh
 wrote:
> yep actually using createDirectStream sounds a better way of doing it. Am I
> correct that createDirectStream was introduced to overcome micro-batching
> limitations?
>
> In a nutshell I want to pickup all the messages and keep signal according to
> pre-built criteria (say indicating a buy signal) and ignore the pedestals
>
> Thanks

Re: Converting from KafkaUtils.createDirectStream to KafkaUtils.createStream

2016-04-22 Thread Mich Talebzadeh
Yep, actually using createDirectStream sounds like a better way of doing it.
Am I correct that createDirectStream was introduced to overcome micro-batching
limitations?

In a nutshell, I want to pick up all the messages, keep the signals that match
pre-built criteria (say, those indicating a buy signal) and ignore the
pedestals.

Thanks

On 22 April 2016 at 21:56, Cody Koeninger  wrote:

> You can still do sliding windows with createDirectStream, just do your
> map / extraction of fields before the window.


Re: Converting from KafkaUtils.createDirectStream to KafkaUtils.createStream

2016-04-22 Thread Cody Koeninger
You can still do sliding windows with createDirectStream, just do your
map / extraction of fields before the window.
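
A minimal sketch of that ordering, assuming the (key, value) pairs produced by createDirectStream with String decoders, as in the program posted at the start of this thread. The object and method names, and the 30-second window sliding every 10 seconds, are illustrative choices and not values from this thread.

```scala
import org.apache.spark.streaming.{Duration, Seconds}
import org.apache.spark.streaming.dstream.DStream

object WindowSketch {
  // Illustrative durations; both must be multiples of the batch interval.
  val windowLength: Duration = Seconds(30)
  val slideInterval: Duration = Seconds(10)

  // kafkaStream stands for the (key, value) stream returned by createDirectStream.
  def windowedCounts(kafkaStream: DStream[(String, String)]): DStream[(String, Int)] = {
    // 1. Extract the plain message value first ...
    val lines = kafkaStream.map(_._2)
    // 2. ... then apply the sliding window to the extracted values.
    val windowed = lines.window(windowLength, slideInterval)
    // 3. Filter and count within each window, as the original program does per batch.
    windowed
      .filter(_.contains("statement cache"))
      .map(line => (line, 1))
      .reduceByKey(_ + _)
  }
}
```

With the batch intervals used in this thread (1 or 10 seconds), both durations are whole multiples of the batch interval.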

On Fri, Apr 22, 2016 at 3:53 PM, Mich Talebzadeh
 wrote:
> Hi Cody,
>
> I want to use sliding windows for Complex Event Processing micro-batching




Re: Converting from KafkaUtils.createDirectStream to KafkaUtils.createStream

2016-04-22 Thread Mich Talebzadeh
Hi Cody,

I want to use sliding windows for Complex Event Processing micro-batching

On 22 April 2016 at 21:51, Cody Koeninger  wrote:

> Why are you wanting to convert?
>
> As far as doing the conversion, createStream doesn't take the same
> arguments, look at the docs.


Re: Converting from KafkaUtils.createDirectStream to KafkaUtils.createStream

2016-04-22 Thread Cody Koeninger
Why do you want to convert?

As for the conversion itself, createStream doesn't take the same
arguments; look at the docs.
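
A rough sketch of the argument difference, assuming the Spark 1.x spark-streaming-kafka API: createDirectStream takes the topics as a Set[String], while the receiver-based createStream takes a Map[String, Int] (topic to number of consumer threads) plus a StorageLevel. The helper name TopicArgs.toReceiverTopics is hypothetical and only illustrates the conversion; the Spark calls are left as comments so the snippet runs without Spark on the classpath.

```scala
// Hypothetical helper, not part of Spark: converts the Set[String] of topics
// accepted by createDirectStream into the Map[String, Int]
// (topic -> number of receiver threads) accepted by createStream.
object TopicArgs {
  def toReceiverTopics(topics: Set[String], threadsPerTopic: Int = 1): Map[String, Int] = {
    require(threadsPerTopic > 0, "threadsPerTopic must be positive")
    topics.map(topic => topic -> threadsPerTopic).toMap
  }
}

// With Spark 1.x and spark-streaming-kafka available, the two calls would
// then look roughly like this (kept as comments, and an assumption about
// the poster's setup):
//
//   KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
//     ssc, kafkaParams, Set("newtopic"))
//
//   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
//     ssc, kafkaParams, TopicArgs.toReceiverTopics(Set("newtopic")),
//     StorageLevel.MEMORY_AND_DISK_SER_2)
```

Note that a sliding window does not by itself require the receiver-based stream; window() is defined on DStream in general, so it may be worth trying it on the direct stream first.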

On Fri, Apr 22, 2016 at 3:44 PM, Mich Talebzadeh
 wrote:
> Hi,
>
> What is the best way of converting this program, which uses
> KafkaUtils.createDirectStream, to a sliding window, changing
>
> val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder,
> StringDecoder](ssc, kafkaParams, topic)
>
> to
>
> val dstream = KafkaUtils.createStream[String, String, StringDecoder,
> StringDecoder](ssc, kafkaParams, topic)
>
>
> The program below works
>
>
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.hive.HiveContext
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.sql.functions._
> import _root_.kafka.serializer.StringDecoder
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.kafka.KafkaUtils
> //
> object CEP_assembly {
>   def main(args: Array[String]) {
>   val conf = new SparkConf().
>setAppName("CEP_assembly").
>setMaster("local[2]").
>set("spark.driver.allowMultipleContexts", "true").
>set("spark.hadoop.validateOutputSpecs", "false")
>   val sc = new SparkContext(conf)
>   // Create sqlContext based on HiveContext
>   val sqlContext = new HiveContext(sc)
>   import sqlContext.implicits._
>   val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>   println ("\nStarted at"); sqlContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> val ssc = new StreamingContext(conf, Seconds(1))
> ssc.checkpoint("checkpoint")
> val kafkaParams = Map[String, String]("bootstrap.servers" -> "rhes564:9092",
> "schema.registry.url" -> "http://rhes564:8081", "zookeeper.connect" ->
> "rhes564:2181", "group.id" -> "StreamTest" )
> val topic = Set("newtopic")
> //val dstream = KafkaUtils.createStream[String, String, StringDecoder,
> StringDecoder](ssc, kafkaParams, topic)
> val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder,
> StringDecoder](ssc, kafkaParams, topic)
> dstream.cache()
> //val windowed_dstream = dstream.window(new Duration(sliding_window_length),
> new Duration(sliding_window_interval))
> dstream.print(1000)
> val lines = dstream.map(_._2)
> // Check for message
> val showResults = lines.filter(_.contains("Sending dstream")).flatMap(line
> => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print(1000)
> // Check for statement cache
> val showResults2 = lines.filter(_.contains("statement cache")).flatMap(line
> => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print(1000)
> ssc.start()
> ssc.awaitTermination()
> //ssc.stop()
>   println ("\nFinished at"); sqlContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
>   }
> }
>
> Thanks
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Converting from KafkaUtils.createDirectStream to KafkaUtils.createStream

2016-04-22 Thread Mich Talebzadeh
Hi,

What is the best way of converting this program, which uses
KafkaUtils.createDirectStream, to a sliding window, changing

val dstream = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaParams, topic)

to

val dstream = KafkaUtils.createStream[String, String, StringDecoder,
StringDecoder](ssc, kafkaParams, topic)


The program below works


import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils
//
object CEP_assembly {
  def main(args: Array[String]) {
    val conf = new SparkConf().
      setAppName("CEP_assembly").
      setMaster("local[2]").
      set("spark.driver.allowMultipleContexts", "true").
      set("spark.hadoop.validateOutputSpecs", "false")
    val sc = new SparkContext(conf)
    // Create sqlContext based on HiveContext
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._
    val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    println("\nStarted at")
    sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')").collect.foreach(println)
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint("checkpoint")
    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "rhes564:9092",
      "schema.registry.url" -> "http://rhes564:8081",
      "zookeeper.connect" -> "rhes564:2181",
      "group.id" -> "StreamTest")
    val topic = Set("newtopic")
    //val dstream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
    val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic)
    dstream.cache()
    //val windowed_dstream = dstream.window(new Duration(sliding_window_length), new Duration(sliding_window_interval))
    dstream.print(1000)
    val lines = dstream.map(_._2)
    // Check for message
    val showResults = lines.filter(_.contains("Sending dstream")).
      flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print(1000)
    // Check for statement cache
    val showResults2 = lines.filter(_.contains("statement cache")).
      flatMap(line => line.split("\n,")).map(word => (word, 1)).reduceByKey(_ + _).print(1000)
    ssc.start()
    ssc.awaitTermination()
    //ssc.stop()
    println("\nFinished at")
    sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')").collect.foreach(println)
  }
}

Thanks


Dr Mich Talebzadeh



LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


Executor still on the UI even if the worker is dead

2016-04-22 Thread kundan kumar
Hi Guys,

Has anyone faced this issue with Spark?

In Spark Streaming, why are executors still shown on the UI even after
their worker has been killed and is no longer in the cluster?

This severely impacts my running jobs: they take much longer, and stages
fail with the exception

java.io.IOException: Failed to connect to --- (dead worker)

Is this a bug in Spark?

Version is 1.4.0


Thanks,
Kundan


executor delay in Spark

2016-04-22 Thread Raghava Mutharaju
Mike,

It turns out the executor delay, as you mentioned, is the cause. After we
introduced a dummy stage, partitioning was working fine. Does this delay
happen during later stages as well? We noticed the same behavior
(partitioning happens on spark-shell but not through spark-submit) at a
later stage also.

Apart from introducing a dummy stage or running it from spark-shell, is
there any other option to fix this?
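
For illustration only, the effect Mike describes in the quoted message can be reproduced with a toy scheduling model in plain Scala. This is a sketch under simplifying assumptions (equal-length tasks, greedy assignment to whichever core is free first), not Spark's actual scheduler; WaveSimulation and its parameters are hypothetical names.

```scala
// Toy model (an assumption, not Spark's scheduler): each executor offers
// `coresPerExecutor` slots from the moment it registers, and every task goes
// to the slot that becomes free first.
object WaveSimulation {
  def tasksPerExecutor(numTasks: Int, taskMs: Long, coresPerExecutor: Int,
                       registerTimesMs: Seq[Long]): Seq[Int] = {
    // One slot per core: the executor it belongs to and when it is next free.
    val slotExecutor: Array[Int] =
      registerTimesMs.indices.flatMap(e => Seq.fill(coresPerExecutor)(e)).toArray
    val slotFreeAt: Array[Long] = slotExecutor.map(e => registerTimesMs(e))
    val counts = Array.fill(registerTimesMs.size)(0)
    for (_ <- 0 until numTasks) {
      // Earliest-free slot wins; ties go to the lower slot index.
      val slot = slotFreeAt.indices.minBy(i => (slotFreeAt(i), i))
      counts(slotExecutor(slot)) += 1
      slotFreeAt(slot) += taskMs
    }
    counts.toSeq
  }
}

// Second executor registers 1.5 s late; 8 tasks, 4 cores per executor.
val shortTasks = WaveSimulation.tasksPerExecutor(8, 100L, 4, Seq(0L, 1500L))
val longTasks  = WaveSimulation.tasksPerExecutor(8, 5000L, 4, Seq(0L, 1500L))
println(s"short tasks: $shortTasks, long tasks: $longTasks")
```

In this model, short tasks all land on the first executor in two waves of four, while longer tasks are split evenly, which matches the behaviour described. A dummy stage (for example materializing a small throwaway RDD before the real work) simply gives the late executor time to register.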

Regards,
Raghava.


On Mon, Apr 18, 2016 at 12:17 AM, Mike Hynes <91m...@gmail.com> wrote:

> When submitting a job with spark-submit, I've observed delays (up to
> 1--2 seconds) for the executors to respond to the driver in order to
> receive tasks in the first stage. The delay does not persist once the
> executors have been synchronized.
>
> When the tasks are very short, as may be your case (relatively small
> data and a simple map task like you have described), the 8 tasks in
> your stage may be allocated to only 1 executor in 2 waves of 4, since
> the second executor won't have responded to the master before the
> first 4 tasks on the first executor have completed.
>
> To see if this is the cause in your particular case, you could try the
> following to confirm:
> 1. Examine the starting times of the tasks alongside their executor
> 2. Make a "dummy" stage execute before your real stages to
> synchronize the executors by creating and materializing any random RDD
> 3. Make the tasks longer, i.e. with some silly computational work.
>
> Mike
>
>
> On 4/17/16, Raghava Mutharaju  wrote:
> > Yes its the same data.
> >
> > 1) The number of partitions are the same (8, which is an argument to the
> > HashPartitioner). In the first case, these partitions are spread across
> > both the worker nodes. In the second case, all the partitions are on the
> > same node.
> > 2) What resources would be of interest here? Scala shell takes the default
> > parameters since we use "bin/spark-shell --master " to run the
> > scala-shell. For the scala program, we do set some configuration options
> > such as driver memory (12GB), parallelism is set to 8 and we use Kryo
> > serializer.
> >
> > We are running this on Azure D3-v2 machines which have 4 cores and 14GB
> > RAM. 1 executor runs on each worker node. Following configuration options
> > are set for the scala program -- perhaps we should move it to the spark
> > config file.
> >
> > Driver memory and executor memory are set to 12GB
> > parallelism is set to 8
> > Kryo serializer is used
> > Number of retainedJobs and retainedStages has been increased to check them
> > in the UI.
> >
> > What information regarding Spark Context would be of interest here?
> >
> > Regards,
> > Raghava.
> >
> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar  wrote:
> >
> >> If the data file is same then it should have similar distribution of
> >> keys.
> >> Few queries-
> >>
> >> 1. Did you compare the number of partitions in both the cases?
> >> 2. Did you compare the resource allocation for Spark Shell vs Scala
> >> Program being submitted?
> >>
> >> Also, can you please share the details of Spark Context, Environment and
> >> Executors when you run via Scala program?
> >>
> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
> >> m.vijayaragh...@gmail.com> wrote:
> >>
> >>> Hello All,
> >>>
> >>> We are using HashPartitioner in the following way on a 3 node cluster (1
> >>> master and 2 worker nodes).
> >>>
> >>> val u = sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt")
> >>>   .map[(Int, Int)](line => { line.split("\\|") match { case Array(x, y) => (y.toInt, x.toInt) } })
> >>>   .partitionBy(new HashPartitioner(8)).setName("u").persist()
> >>>
> >>> u.count()
> >>>
> >>> If we run this from the spark shell, the data (52 MB) is split across the
> >>> two worker nodes. But if we put this in a scala program and run it, then
> >>> all the data goes to only one node. We have run it multiple times, but this
> >>> behavior does not change. This seems strange.
> >>>
> >>> Is there some problem with the way we use HashPartitioner?
> >>>
> >>> Thanks in advance.
> >>>
> >>> Regards,
> >>> Raghava.
> >>>
> >>
> >>
> >
> >
> > --
> > Regards,
> > Raghava
> > http://raghavam.github.io
> >
>
>
> --
> Thanks,
> Mike
>



-- 
Regards,
Raghava
http://raghavam.github.io


Re: Spark SQL insert overwrite table not showing all the partition.

2016-04-22 Thread Bijay Kumar Pathak
Hi Zhan,

I tried with the IF NOT EXISTS clause and I still cannot see the first
partition; only the partition from the last insert overwrite is present in
the table.

Thanks,
Bijay

On Thu, Apr 21, 2016 at 11:18 PM, Zhan Zhang  wrote:

> INSERT OVERWRITE will overwrite any existing data in the table or partition
>
>- unless IF NOT EXISTS is provided for a partition (as of Hive 0.9.0).
>
>
>
> Thanks.
>
> Zhan Zhang
>
> On Apr 21, 2016, at 3:20 PM, Bijay Kumar Pathak  wrote:
>
> Hi,
>
> I have a job which writes to a Hive table with dynamic partitions. Inside
> the job, I write into the table twice, but I only see the partition from
> the last write, although I can see in the Spark UI that it is processing
> data from both partitions.
>
> Below is the query I am using to write to the table.
>
> hive_c.sql("""INSERT OVERWRITE TABLE base_table PARTITION (date='{1}', date_2)
>   SELECT * from temp_table
> """.format(date_val)
>  )
>
>
>
> Thanks,
> Bijay
>
>
>


Re: How this unit test passed on master trunk?

2016-04-22 Thread Ted Yu
This was added by Xiao through:

[SPARK-13320][SQL] Support Star in CreateStruct/CreateArray and Error
Handling when DataFrame/DataSet Functions using Star

I tried in spark-shell and got:

scala> val first =
structDf.groupBy($"a").agg(min(struct($"record.*"))).first()
first: org.apache.spark.sql.Row = [1,[1,1]]

BTW
https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-maven-hadoop-2.7/715/consoleFull
shows this test passing.
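
As a plain-Scala illustration of what min over a struct means here, the sketch below uses tuples as a stand-in for struct values, on the assumption that structs compare field by field from left to right. The object name MinStructSketch is hypothetical; the rows are the same six (a, b) pairs shown in the quoted message.

```scala
object MinStructSketch {
  // Same six (a, b) rows as testData2 in the thread.
  val rows: Seq[(Int, Int)] = Seq((1, 1), (1, 2), (2, 1), (2, 2), (3, 1), (3, 2))

  // Per-group minimum of the (a, b) pair. Scala tuples compare field by
  // field, left to right, which stands in for struct comparison here.
  val minPerGroup: Map[Int, (Int, Int)] =
    rows.groupBy(_._1).map { case (a, group) => a -> group.min }
}

MinStructSketch.minPerGroup.toSeq.sortBy(_._1).foreach(println)
```

Each group a has its own minimum (a, 1), so both [1,[1,1]] and [3,[3,1]] are valid rows of the aggregated result. Which one first() returns depends on the row order of the aggregate, which is not guaranteed; differing shuffle settings between the test harness and spark-shell would be one plausible reason for the difference, though that is an assumption rather than something confirmed in this thread.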

On Fri, Apr 22, 2016 at 11:23 AM, Yong Zhang  wrote:

> Hi,
>
> I was trying to find out why this unit test can pass in Spark code.
>
> in
>
> https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
>
> for this unit test:
>
>   test("Star Expansion - CreateStruct and CreateArray") {
> val structDf = testData2.select("a", "b").as("record")
> // CreateStruct and CreateArray in aggregateExpressions
> assert(structDf.groupBy($"a").agg(min(struct($"record.*"))).first() == 
> Row(3, Row(3, 1)))
> assert(structDf.groupBy($"a").agg(min(array($"record.*"))).first() == 
> Row(3, Seq(3, 1)))
>
> // CreateStruct and CreateArray in project list (unresolved alias)
> assert(structDf.select(struct($"record.*")).first() == Row(Row(1, 1)))
> assert(structDf.select(array($"record.*")).first().getAs[Seq[Int]](0) === 
> Seq(1, 1))
>
> // CreateStruct and CreateArray in project list (alias)
> assert(structDf.select(struct($"record.*").as("a")).first() == Row(Row(1, 
> 1)))
> 
> assert(structDf.select(array($"record.*").as("a")).first().getAs[Seq[Int]](0) 
> === Seq(1, 1))
>   }
>
> From my understanding, the data return in this case should be Row(1, Row(1, 
> 1]), as that will be min of struct.
>
> In fact, if I run the spark-shell on my laptop, and I got the result I 
> expected:
>
>
> ./bin/spark-shell
> Welcome to
>     __
>  / __/__  ___ _/ /__
> _\ \/ _ \/ _ `/ __/  '_/
>/___/ .__/\_,_/_/ /_/\_\   version 2.0.0-SNAPSHOT
>   /_/
>
> Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_91)
> Type in expressions to have them evaluated.
> Type :help for more information.
>
> scala> case class TestData2(a: Int, b: Int)
> defined class TestData2
>
> scala> val testData2DF = sqlContext.sparkContext.parallelize(TestData2(1,1) 
> :: TestData2(1,2) :: TestData2(2,1) :: TestData2(2,2) :: TestData2(3,1) :: 
> TestData2(3,2) :: Nil, 2).toDF()
>
> scala> val structDF = testData2DF.select("a","b").as("record")
>
> scala> structDF.groupBy($"a").agg(min(struct($"record.*"))).first()
> res0: org.apache.spark.sql.Row = [1,[1,1]]
>
> scala> structDF.show
> +---+---+
> |  a|  b|
> +---+---+
> |  1|  1|
> |  1|  2|
> |  2|  1|
> |  2|  2|
> |  3|  1|
> |  3|  2|
> +---+---+
>
> So from my spark, which I built on the master, I cannot get Row[3,[1,1]] back 
> in this case. Why the unit test asserts that Row[3,[1,1]] should be the 
> first, and it will pass? But I cannot reproduce that in my spark-shell? I am 
> trying to understand how to interpret the meaning of 
> "agg(min(struct($"record.*")))"
>
>
> Thanks
>
> Yong
>
>


MLLib PySpark RandomForest too many features per tree

2016-04-22 Thread flewloon
When I choose featureSubsetStrategy for the RandomForestModel I set it to
sqrt, which I expected to limit each decision tree to about the square root
of the total number of features. I have 900 features, so I thought each tree
would use ~30 features or fewer. When I look at the 100 trees I built, many
of them turn out to use 60-90 features. I have tried looking through the
MLLib code to see why this is happening, but haven't found an explanation.
Does anyone have any idea why this would be?
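
One possible explanation, offered as an assumption rather than something confirmed in this thread: the MLlib documentation describes featureSubsetStrategy as the number of features to consider for splits at each node, not per tree. If a fresh subset is drawn at every node, a tree with many nodes can touch far more than sqrt(900) = 30 distinct features. The plain-Scala simulation below sketches that effect; FeatureSubsetSketch and its parameters are hypothetical, and the "split on the first candidate" rule is an arbitrary stand-in for a real split criterion.

```scala
import scala.util.Random

object FeatureSubsetSketch {
  // Simulates one tree with `numSplitNodes` internal nodes. Every node draws
  // a fresh candidate subset of ceil(sqrt(numFeatures)) features and
  // (arbitrarily) splits on the first candidate. Returns the distinct
  // features used across the whole tree.
  def featuresUsedByTree(numFeatures: Int, numSplitNodes: Int, rng: Random): Set[Int] = {
    val perNode = math.ceil(math.sqrt(numFeatures.toDouble)).toInt
    (0 until numSplitNodes).map { _ =>
      val candidates = rng.shuffle((0 until numFeatures).toList).take(perNode)
      candidates.head
    }.toSet
  }
}

val used = FeatureSubsetSketch.featuresUsedByTree(900, 100, new Random(42L))
println(s"distinct features used by a 100-node tree: ${used.size}")
```

Under this model the number of distinct features per tree is bounded by the number of split nodes, not by the per-node subset size.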






How this unit test passed on master trunk?

2016-04-22 Thread Yong Zhang
Hi,

I was trying to find out why this unit test can pass in Spark code.

in

https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

for this unit test:

  test("Star Expansion - CreateStruct and CreateArray") {
    val structDf = testData2.select("a", "b").as("record")
    // CreateStruct and CreateArray in aggregateExpressions
    assert(structDf.groupBy($"a").agg(min(struct($"record.*"))).first() == Row(3, Row(3, 1)))
    assert(structDf.groupBy($"a").agg(min(array($"record.*"))).first() == Row(3, Seq(3, 1)))

    // CreateStruct and CreateArray in project list (unresolved alias)
    assert(structDf.select(struct($"record.*")).first() == Row(Row(1, 1)))
    assert(structDf.select(array($"record.*")).first().getAs[Seq[Int]](0) === Seq(1, 1))

    // CreateStruct and CreateArray in project list (alias)
    assert(structDf.select(struct($"record.*").as("a")).first() == Row(Row(1, 1)))
    assert(structDf.select(array($"record.*").as("a")).first().getAs[Seq[Int]](0) === Seq(1, 1))
  }

From my understanding, the data returned in this case should be Row(1, Row(1,
1)), as that will be the min of struct.

In fact, if I run spark-shell on my laptop, I get the result I expected:

./bin/spark-shell
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.0-SNAPSHOT
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_91)
Type in expressions to have them evaluated.
Type :help for more information.

scala> case class TestData2(a: Int, b: Int)
defined class TestData2

scala> val testData2DF = sqlContext.sparkContext.parallelize(TestData2(1,1) :: TestData2(1,2) :: TestData2(2,1) :: TestData2(2,2) :: TestData2(3,1) :: TestData2(3,2) :: Nil, 2).toDF()

scala> val structDF = testData2DF.select("a","b").as("record")

scala> structDF.groupBy($"a").agg(min(struct($"record.*"))).first()
res0: org.apache.spark.sql.Row = [1,[1,1]]

scala> structDF.show
+---+---+
|  a|  b|
+---+---+
|  1|  1|
|  1|  2|
|  2|  1|
|  2|  2|
|  3|  1|
|  3|  2|
+---+---+

So with my Spark, which I built from master, I cannot get Row[3,[1,1]] back
in this case. Why does the unit test assert that Row[3,[1,1]] should be the
first row, and why does it pass, when I cannot reproduce that in my
spark-shell? I am trying to understand how to interpret the meaning of
"agg(min(struct($"record.*")))"

Thanks

Yong

Re: Which jar file has import org.apache.spark.internal.Logging

2016-04-22 Thread Marcelo Vanzin
On Fri, Apr 22, 2016 at 10:38 AM, Mich Talebzadeh
 wrote:
> I am trying to test Spark with CEP and I have been shown a sample here
> https://github.com/agsachin/spark/blob/CEP/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala#L532

I'm not familiar with CEP, but that's a Spark unit test, so if you're
trying to run it outside of the context of Spark unit tests (as it
seems you're trying to do), you're going to run into a world of
trouble. I'd suggest a different approach where whatever you're trying
to do is done through the Spark build, not outside of it.

-- 
Marcelo




Re: Which jar file has import org.apache.spark.internal.Logging

2016-04-22 Thread Mich Talebzadeh
Thanks Ted for the info

Dr Mich Talebzadeh



LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 22 April 2016 at 18:38, Ted Yu  wrote:

> Marcelo:
> From yesterday's thread, Mich revealed that he was looking at:
>
>
> https://github.com/agsachin/spark/blob/CEP/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
>
> which references SparkFunSuite.
>
> In an earlier thread, Mich was asking about CEP.
>
> Just to give you some background.
>
> On Fri, Apr 22, 2016 at 10:31 AM, Marcelo Vanzin 
> wrote:
>
>> Sorry, I've been looking at this thread and the related ones and one
>> thing I still don't understand is: why are you trying to use internal
>> Spark classes like Logging and SparkFunSuite in your code?
>>
>> Unless you're writing code that lives inside Spark, you really
>> shouldn't be trying to reference them. First reason being that they're
>> "private[spark]" and even if they're available, the compiler won't let
>> you.
>>
>> On Fri, Apr 22, 2016 at 12:21 AM, Mich Talebzadeh
>>  wrote:
>> >
>> > Hi,
>> >
>> > Anyone know which jar file has  import
>> org.apache.spark.internal.Logging?
>> >
>> > I tried spark-core_2.10-1.5.1.jar
>> >
>> > but does not seem to work
>> >
>> > scala> import org.apache.spark.internal.Logging
>> >
>> > :57: error: object internal is not a member of package
>> > org.apache.spark
>> >  import org.apache.spark.internal.Logging
>> >
>> > Thanks
>> >
>> > Dr Mich Talebzadeh
>> >
>> >
>> >
>> > LinkedIn
>> >
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> >
>> >
>> >
>> > http://talebzadehmich.wordpress.com
>> >
>> >
>>
>>
>>
>> --
>> Marcelo
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Which jar file has import org.apache.spark.internal.Logging

2016-04-22 Thread Mich Talebzadeh
Hi Marcelo

Thanks for your input.

I am trying to test Spark with CEP and I have been shown a sample here


https://github.com/agsachin/spark/blob/CEP/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala#L532

It is a long piece of code:

package org.apache.spark.streaming.kafka

import java.io.File
import java.util.Arrays
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.language.postfixOps

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
import org.scalatest.concurrent.Eventually

import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
import org.apache.spark.streaming.scheduler._
import org.apache.spark.streaming.scheduler.rate.RateEstimator
import org.apache.spark.util.Utils

class DirectKafkaStreamSuite
  extends SparkFunSuite
  with BeforeAndAfter
  with BeforeAndAfterAll
  with Eventually
  with Logging {
  val sparkConf = new SparkConf()
.setMaster("local[4]")
.setAppName(this.getClass.getSimpleName)

  private var sc: SparkContext = _
  private var ssc: StreamingContext = _
  private var testDir: File = _

  private var kafkaTestUtils: KafkaTestUtils = _

  override def beforeAll {
kafkaTestUtils = new KafkaTestUtils
kafkaTestUtils.setup()
  }

  override def afterAll {
if (kafkaTestUtils != null) {
  kafkaTestUtils.teardown()
  kafkaTestUtils = null
}
  }

  after {
if (ssc != null) {
  ssc.stop()
  sc = null
}
if (sc != null) {
  sc.stop()
}
if (testDir != null) {
  Utils.deleteRecursively(testDir)
}
  }


  test("basic stream receiving with multiple topics and smallest starting offset") {
val topics = Set("basic1", "basic2", "basic3")
val data = Map("a" -> 7, "b" -> 9)
topics.foreach { t =>
  kafkaTestUtils.createTopic(t)
  kafkaTestUtils.sendMessages(t, data)
}
val totalSent = data.values.sum * topics.size
val kafkaParams = Map(
  "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
  "auto.offset.reset" -> "smallest"
)

ssc = new StreamingContext(sparkConf, Milliseconds(200))
val stream = withClue("Error creating direct stream") {
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)
}

val allReceived = new ConcurrentLinkedQueue[(String, String)]()

// hold a reference to the current offset ranges, so it can be used downstream
var offsetRanges = Array[OffsetRange]()

stream.transform { rdd =>
  // Get the offset ranges in the RDD
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.foreachRDD { rdd =>
  for (o <- offsetRanges) {
logInfo(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
  val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
  // For each partition, get size of the range in the partition,
  // and the number of items in the partition
val off = offsetRanges(i)
val all = iter.toSeq
val partSize = all.size
val rangeSize = off.untilOffset - off.fromOffset
Iterator((partSize, rangeSize))
  }.collect

  // Verify whether number of elements in each partition
  // matches with the corresponding offset range
  collected.foreach { case (partSize, rangeSize) =>
assert(partSize === rangeSize, "offset ranges are wrong")
  }
}
stream.foreachRDD { rdd => allReceived.addAll(Arrays.asList(rdd.collect(): _*)) }
ssc.start()
eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
  assert(allReceived.size === totalSent,
"didn't get expected number of messages, messages:\n" +
  allReceived.asScala.mkString("\n"))
}
ssc.stop()
  }

  test("receiving from largest starting offset") {
val topic = "largest"
val topicPartition = TopicAndPartition(topic, 0)
val data = Map("a" -> 10)
kafkaTestUtils.createTopic(topic)
val kafkaParams = Map(
  "metadata.broker.list" -> kafkaTestUtils.brokerAddress,
  "auto.offset.reset" -> "largest"
)
val kc = new KafkaCluster(kafkaParams)
def getLatestOffset(): Long = {
  kc.getLatestLeaderOffsets(Set(topicPartition)).right.get(topicPartition).offset
}

// Send some initial messages before starting context
kafkaTestUtils.sendMessages(topic, data)
eventually(timeout(10 seconds), interval(20 milliseconds)) {
  assert(getLatestOffset() > 3)
}
 

Re: Which jar file has import org.apache.spark.internal.Logging

2016-04-22 Thread Ted Yu
Marcelo:
From yesterday's thread, Mich revealed that he was looking at:

https://github.com/agsachin/spark/blob/CEP/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala

which references SparkFunSuite.

In an earlier thread, Mich was asking about CEP.

Just to give you some background.

On Fri, Apr 22, 2016 at 10:31 AM, Marcelo Vanzin 
wrote:

> Sorry, I've been looking at this thread and the related ones and one
> thing I still don't understand is: why are you trying to use internal
> Spark classes like Logging and SparkFunSuite in your code?
>
> Unless you're writing code that lives inside Spark, you really
> shouldn't be trying to reference them. First reason being that they're
> "private[spark]" and even if they're available, the compiler won't let
> you.
>
> On Fri, Apr 22, 2016 at 12:21 AM, Mich Talebzadeh
>  wrote:
> >
> > Hi,
> >
> > Anyone know which jar file has  import org.apache.spark.internal.Logging?
> >
> > I tried spark-core_2.10-1.5.1.jar
> >
> > but does not seem to work
> >
> > scala> import org.apache.spark.internal.Logging
> >
> > :57: error: object internal is not a member of package
> > org.apache.spark
> >  import org.apache.spark.internal.Logging
> >
> > Thanks
> >
> > Dr Mich Talebzadeh
> >
> >
> >
> > LinkedIn
> >
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >
> >
> >
> > http://talebzadehmich.wordpress.com
> >
> >
>
>
>
> --
> Marcelo
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Which jar file has import org.apache.spark.internal.Logging

2016-04-22 Thread Marcelo Vanzin
Sorry, I've been looking at this thread and the related ones and one
thing I still don't understand is: why are you trying to use internal
Spark classes like Logging and SparkFunSuite in your code?

Unless you're writing code that lives inside Spark, you really
shouldn't be trying to reference them. First reason being that they're
"private[spark]" and even if they're available, the compiler won't let
you.
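
Following that advice, one option is for application code to carry its own small logging trait instead of reaching for Spark internals. The sketch below uses java.util.logging from the JDK purely to stay dependency-free; AppLogging and CepJob are hypothetical names, and a real project might prefer whichever logging library it already uses.

```scala
import java.util.logging.Logger

// Hypothetical application-side replacement for Spark's internal Logging
// trait; nothing here comes from Spark.
trait AppLogging {
  // Logger named after the concrete class that mixes in the trait.
  protected lazy val log: Logger = Logger.getLogger(getClass.getName)

  // By-name parameter so the message is only built when it is logged.
  protected def logInfo(msg: => String): Unit = log.info(msg)
}

object CepJob extends AppLogging {
  def loggerName: String = log.getName
  def run(): Unit = logInfo("starting CEP job")
}

CepJob.run()
```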

On Fri, Apr 22, 2016 at 12:21 AM, Mich Talebzadeh
 wrote:
>
> Hi,
>
> Anyone know which jar file has  import org.apache.spark.internal.Logging?
>
> I tried spark-core_2.10-1.5.1.jar
>
> but does not seem to work
>
> scala> import org.apache.spark.internal.Logging
>
> :57: error: object internal is not a member of package
> org.apache.spark
>  import org.apache.spark.internal.Logging
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>



-- 
Marcelo




Re: Which jar file has import org.apache.spark.internal.Logging

2016-04-22 Thread Mich Talebzadeh
java.lang.IllegalArgumentException: Cannot add dependency
'org.apache.spark#spark-core_2.10;1.5.1' to configuration 'tests' of module
cep_assembly#cep_assembly_2.10;1.0 because this configuration doesn't exist!
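
The error suggests sbt read "tests" as an Ivy configuration name, which the project does not define. In sbt a test jar is usually selected with a classifier instead. A sketch, assuming a "tests" classifier artifact is published for this spark-core version and that the suite is moved under src/test/scala:

```scala
// build.sbt fragment (a sketch, not verified against this project):
// "test" is the sbt configuration, "tests" is the artifact classifier.
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1" % "test" classifier "tests"
```

Note also that the dependency list quoted below mixes Spark 1.5.1 and 1.6.1 artifacts, which may cause separate problems.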

Dr Mich Talebzadeh



LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 22 April 2016 at 16:37, Ted Yu  wrote:

> For SparkFunSuite , add the following:
>
> libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.5.1" %
> "tests"
>
> On Fri, Apr 22, 2016 at 7:20 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>

Re: Which jar file has import org.apache.spark.internal.Logging

2016-04-22 Thread Ted Yu
For SparkFunSuite , add the following:

libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.5.1" classifier "tests"

On Fri, Apr 22, 2016 at 7:20 AM, Mich Talebzadeh 
wrote:

> Trying to build with sbt with the following dependencies
>
> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1" %
> "provided"
> libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.5.1"  %
> "provided"
> libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.5.1" %
> "provided"
> libraryDependencies += "junit" % "junit" % "4.12"
> libraryDependencies += "org.scala-sbt" % "test-interface" % "1.0"
> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.1" %
> "provided"
> libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" %
> "1.6.1"
> libraryDependencies += "org.scalactic" %% "scalactic" % "2.2.6"
> libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6"
> libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.5.1"
> libraryDependencies += "org.apache.spark" %
> "spark-streaming-kafka-assembly_2.10" % "1.6.1"
>
>
> [error]
> /data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:35:
> object SparkFunSuite is not a member of package org.apache.spark
> [error] import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
> [error]^
> [error]
> /data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:47:
> not found: type SparkFunSuite
> [error]   extends SparkFunSuite
> [error]   ^

modifying a DataFrame column that is a MapType

2016-04-22 Thread williamd1618
Hi, 

I have a use case where I've loaded data into DataFrame and would like to
split the DataFrame into two on some predicate, modify one split DataFrame
using withColumn so as to prevent the need to reprocess the data, union the
data, and write out to the filesystem.  An example of the schema (purely for
testing's sake) is as follows:

val schema = StructType(
  Seq(
    StructField("a",
      MapType(StringType,
        StructType(
          Seq(
            StructField("b", StringType, true),
            StructField("c", MapType(StringType, StringType), true)
          )
        )
      )
    ),
    StructField("other", StringType, true)
  )
)

In this I will split the DataFrame on the value of "other", update one where
the value of the "a" MapType key is some value and set its "c" to null.  I
can see how this can be done using DataFrame.withColumn using structs and
lits, but I've not been able to find a way to manipulate it if the "a"'s
dataType is not another StructType.  

Example code:

import org.apache.spark.sql.{Row, functions}
import org.apache.spark.sql.types._

val schema = StructType(
  Seq(
    StructField("a",
      MapType(StringType,
        StructType(
          Seq(
            StructField("b", StringType, true),
            StructField("c", MapType(StringType, StringType), true)
          )
        )
      )
    ),
    StructField("other", StringType, true)
  )
)


val rdd = sc.parallelize(Seq(
  Row(Map("foo" ->("bar", Map("foo" -> "bar"))), "foo"),
  Row(Map("foo" ->("cat", Map("bar" -> "foo"))), "bar")
))

val df = sqlCtx.createDataFrame(rdd, schema)

// I wish there were a functions.map;
// here I'd like to create a map with expressions for what goes into the map,
// using functions.col("a").getItem(only key I want).getField(field I want)
val col = functions.struct(
  functions.col("a").getItem("foo").getField("b"),
  functions.lit(null).cast(MapType(StringType, StringType)).alias("c")
)

val rename = df.withColumn("a", col)

val mutated = sqlCtx.createDataFrame(rename.rdd, df.schema)

mutated.unionAll(df).show()


Any help would be greatly appreciated!

Thanks.

dan



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/modifying-a-DataFrame-column-that-is-a-MapType-tp26820.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.6.1. How to prevent serialization of KafkaProducer

2016-04-22 Thread Alexander Gallego
Thanks Bryan.

This is basically what I have as well; I posted pretty much the same gist
in my first email:

  .foreachRDD(rdd => {
    rdd.foreachPartition(part => {
      // Create the producer on the executor, once per partition, so it is never serialized.
      val producer: Producer[String, String] = KafkaWriter.createProducer(brokers)
      part.foreach(item => producer.send(item))
      producer.close()
    })
  })


I had a bug w/ implicits from spark. Not really sure why, but I had built a
spark context on a different configuration file and I'm not sure what was
passing the wrong spark context.

Effectively, everything worked when I tested merging my funcs into one file.
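
For readers following the thread, the reason the per-partition pattern avoids "Task not serializable" can be sketched outside Spark. The example below is in Python rather than Scala so it runs without a cluster, and `FakeProducer`, `can_pickle` and `write_partition` are hypothetical names made up for illustration, not Kafka or Spark APIs. It only shows the general idea: an object holding a live resource cannot be shipped to workers, while the plain settings needed to build it can.

```python
import pickle
import threading


class FakeProducer:
    """Hypothetical stand-in for a Kafka producer; it holds a lock, so it cannot be pickled."""

    def __init__(self, brokers):
        self.brokers = brokers
        self._lock = threading.Lock()
        self.sent = []

    def send(self, item):
        with self._lock:
            self.sent.append(item)

    def close(self):
        pass


def can_pickle(obj):
    """Return True if obj survives serialization, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False


def write_partition(brokers, part):
    """Build the producer where the data lives, use it, then close it."""
    producer = FakeProducer(brokers)
    for item in part:
        producer.send(item)
    producer.close()
    return producer.sent


producer_ok = can_pickle(FakeProducer("host:9092"))  # the live object
brokers_ok = can_pickle("host:9092")                 # only the settings
sent = write_partition("host:9092", iter(["a", "b"]))
print(producer_ok, brokers_ok, sent)
```

The trade-off is one producer per partition per batch; whether that cost matters depends on batch size and is not something this sketch measures.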

Thanks again.

On Thu, Apr 21, 2016 at 2:58 PM, Bryan Jeffrey 
wrote:

> Here is what we're doing:
>
>
> import java.util.Properties
>
> import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
> import net.liftweb.json.Extraction._
> import net.liftweb.json._
> import org.apache.spark.streaming.dstream.DStream
>
> class KafkaWriter(brokers: Array[String], topic: String, numPartitions:
> Int) {
>   def write[T](data: DStream[T]): Unit = {
> KafkaWriter.write(data, topic, brokers, numPartitions)
>   }
> }
>
> object KafkaWriter {
>   def write[T](data: DStream[T], topic: String, brokers: Array[String],
> numPartitions: Int): Unit = {
> val dataToWrite =
>   if (numPartitions > 0) {
> data.repartition(numPartitions)
>   } else {
> data
>   }
>
> dataToWrite
>   .map(x => new KeyedMessage[String, String](topic,
> KafkaWriter.toJson(x)))
>   .foreachRDD(rdd => {
>   rdd.foreachPartition(part => {
> val producer: Producer[String, String] =
> KafkaWriter.createProducer(brokers)
> part.foreach(item => producer.send(item))
> producer.close()
>   })
> })
>   }
>
>   def apply(brokers: Option[Array[String]], topic: String, numPartitions:
> Int): KafkaWriter = {
> val brokersToUse =
>   brokers match {
> case Some(x) => x
> case None => throw new IllegalArgumentException("Must specify
> brokers!")
>   }
>
> new KafkaWriter(brokersToUse, topic, numPartitions)
>   }
>
>   def toJson[T](data: T): String = {
> implicit val formats = DefaultFormats ++
> net.liftweb.json.ext.JodaTimeSerializers.all
> compactRender(decompose(data))
>   }
>
>   def createProducer(brokers: Array[String]): Producer[String, String] = {
> val properties = new Properties()
> properties.put("metadata.broker.list", brokers.mkString(","))
> properties.put("serializer.class", "kafka.serializer.StringEncoder")
>
> val kafkaConfig = new ProducerConfig(properties)
> new Producer[String, String](kafkaConfig)
>   }
> }
>
>
> Then just call:
>
> val kafkaWriter: KafkaWriter =
>   KafkaWriter(KafkaStreamFactory.getBrokersFromConfig(config),
>     config.getString(Parameters.topicName), numPartitions = kafkaWritePartitions)
> kafkaWriter.write(dataToWriteToKafka)
>
>
> Hope that helps!
>
> Bryan Jeffrey
>
> On Thu, Apr 21, 2016 at 2:08 PM, Alexander Gallego 
> wrote:
>
>> Thanks Ted.
>>
>>  KafkaWordCount (producer) does not operate on a DStream[T]
>>
>> ```scala
>>
>>
>> object KafkaWordCountProducer {
>>
>>   def main(args: Array[String]) {
>> if (args.length < 4) {
>>   System.err.println("Usage: KafkaWordCountProducer
>>   " +
>> " ")
>>   System.exit(1)
>> }
>>
>> val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
>>
>> // Zookeeper connection properties
>> val props = new HashMap[String, Object]()
>> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
>> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>>   "org.apache.kafka.common.serialization.StringSerializer")
>> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>>   "org.apache.kafka.common.serialization.StringSerializer")
>>
>> val producer = new KafkaProducer[String, String](props)
>>
>> // Send some messages
>> while(true) {
>>   (1 to messagesPerSec.toInt).foreach { messageNum =>
>> val str = (1 to wordsPerMessage.toInt).map(x =>
>> scala.util.Random.nextInt(10).toString)
>>   .mkString(" ")
>>
>> val message = new ProducerRecord[String, String](topic, null, str)
>> producer.send(message)
>>   }
>>
>>   Thread.sleep(1000)
>> }
>>   }
>>
>> }
>>
>> ```
>>
>>
>> Also, doing:
>>
>>
>> ```
>> object KafkaSink {
>>  def send(brokers: String, sc: SparkContext, topic: String, key:
>> String, value: String) =
>> getInstance(brokers, sc).value.send(new ProducerRecord(topic,
>> key, value))
>> }
>>
>> KafkaSink.send(brokers, sparkContext)(outputTopic, record._1, record._2)
>>
>> ```
>>
>>
>> Doesn't work either, the result is:
>>
>> Exception in thread "main" org.apache.spark.SparkException: Task not
>> serializable
>>
>>
>> Thanks!
>>
>>
>>
>>
>> On Thu, Apr 21, 2016 at 1:08 PM, Ted Yu  wrote:
>> >
>> > In 

Re: Custom Log4j layout on YARN = ClassNotFoundException

2016-04-22 Thread andrew.rowson
Apologies, Outlook for Mac is ridiculous. A copy and paste of the original is below:

-

I’m running into a strange issue with trying to use a custom Log4j layout for 
Spark (1.6.1) on YARN (CDH). The layout is: 
https://github.com/michaeltandy/log4j-json

If I use a log4j.properties file (supplied with --files) with:

log4j.appender.consolejson=org.apache.log4j.ConsoleAppender
log4j.appender.consolejson.target=System.err
log4j.appender.consolejson.layout=uk.me.mjt.log4jjson.SimpleJsonLayout


And supply the log4j-json.1.0.jar with ‘--jars’ to spark-submit, the driver and 
executors throw an exception right at the start of the log file:

log4j:ERROR Could not instantiate class [uk.me.mjt.log4jjson.SimpleJsonLayout].
java.lang.ClassNotFoundException: uk.me.mjt.log4jjson.SimpleJsonLayout

However, a simple spark job that does something like:

sc.parallelize(List(1,2,3)).foreach(i => 
{Class.forName("uk.me.mjt.log4jjson.SimpleJsonLayout")})

Doesn’t throw an error. So the class is being loaded, but just not in time for 
Log4j to use it.

I've tried a few different options trying to get it to work (including it in 
the YARN application classpath, spark executor classpaths etc) and they all 
produce the same results. The only thing that seems to work is building a 
custom spark-assembly with the maven dependency included in core/pom.xml. This 
way, the layout is included in the spark assembly jar, and I get the JSON log 
output desired.

Is there a classloading issue on Log4j when using --jars? I can't imagine why 
it works with bundling in spark-assembly, but doesn't work with --jars.


This e-mail is for the sole use of the intended recipient and contains 
information that may be privileged and/or confidential. If you are not an 
intended recipient, please notify the sender by return e-mail and delete this 
e-mail and any attachments. Certain required legal entity disclosures can be 
accessed on our website.



Re: Which jar file has import org.apache.spark.internal.Logging

2016-04-22 Thread Mich Talebzadeh
Trying to build with sbt with the following dependencies

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.5.1" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.5.1" % "provided"
libraryDependencies += "junit" % "junit" % "4.12"
libraryDependencies += "org.scala-sbt" % "test-interface" % "1.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.1" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.1"
libraryDependencies += "org.scalactic" %% "scalactic" % "2.2.6"
libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6"
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.5.1"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-assembly_2.10" % "1.6.1"


[error]
/data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:35:
object SparkFunSuite is not a member of package org.apache.spark
[error] import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
[error]^
[error]
/data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:47:
not found: type SparkFunSuite
[error]   extends SparkFunSuite
[error]   ^
[error]
/data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:88:
package test is not a value
[error]   test("basic stream receiving with multiple topics and smallest
starting offset") {
[error]   ^
[error]
/data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:146:
package test is not a value
[error]   test("receiving from largest starting offset") {
[error]   ^
[error]
/data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:191:
package test is not a value
[error]   test("creating stream by offset") {
[error]   ^
[error]
/data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:237:
package test is not a value
[error]   test("offset recovery") {
[error]   ^
[error]
/data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:319:
package test is not a value
[error]   test("Direct Kafka stream report input information") {
[error]   ^
[error]
/data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:358:
package test is not a value
[error]   test("maxMessagesPerPartition with backpressure disabled") {
[error]   ^
[error]
/data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:367:
package test is not a value
[error]   test("maxMessagesPerPartition with no lag") {
[error]   ^
[error]
/data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:376:
package test is not a value
[error]   test("maxMessagesPerPartition respects max rate") {
[error]   ^
[error]
/data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:386:
package test is not a value
[error]   test("using rate controller") {
[error]   ^
[error]
/data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:531:
object WindowState is not a member of package
org.apache.spark.streaming.dstream
[error] import org.apache.spark.streaming.dstream.WindowState
[error]^
[error]
/data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:578:
not found: type WindowState
[error] def rise(in: Tick, ew: WindowState): Boolean = {
[error]^
[error]
/data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:582:
not found: type WindowState
[error] def drop(in: Tick, ew: WindowState): Boolean = {
[error]^
[error]
/data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:586:
not found: type WindowState
[error] def deep(in: Tick, ew: WindowState): Boolean = {
[error]^
[error]
/data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:592:
not found: type WindowState
[error] val predicateMapping: Map[String, (Tick, WindowState) =>
Boolean] =
[error]  ^
[error]
/data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:595:
value patternMatchByKeyAndWindow is not a member of
org.apache.spark.streaming.dstream.DStream[(String, Tick)]
[error] val matches = kvTicks.patternMatchByKeyAndWindow("rise drop
[rise ]+ deep".r,
[error]   ^
[error]
/data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:646:
not found: type WindowState
[error] def rise(in: Tick, ew: WindowState): Boolean = {
[error]^
[error]
/data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:650:
not found: type WindowState
[error] def drop(in: Tick, ew: WindowState): Boolean = {
[error]^
[error]
/data6/hduser/scala/CEP_assembly/src/main/scala/myPackage/CEP_assemly.scala:654:
not found: type WindowState

Re: Custom Log4j layout on YARN = ClassNotFoundException

2016-04-22 Thread Ted Yu
There is not much in the body of email.

Can you elaborate what issue you encountered ?

Thanks

On Fri, Apr 22, 2016 at 2:27 AM, Rowson, Andrew G. (TR Technology & Ops) <
andrew.row...@thomsonreuters.com> wrote:



Best practices repartition key

2016-04-22 Thread nihed mbarek
Hi,
I'm looking for documentation or best practices on choosing a key (or
keys) when repartitioning a DataFrame or RDD.

Thank you
MBAREK nihed

-- 

M'BAREK Med Nihed,
Fedora Ambassador, TUNISIA, Northern Africa
http://www.nihed.com




Re: Which jar file has import org.apache.spark.internal.Logging

2016-04-22 Thread Ted Yu
Normally Logging would be included in spark-shell session since spark-core
jar is imported by default:

scala> import org.apache.spark.internal.Logging
import org.apache.spark.internal.Logging

See this JIRA:

[SPARK-13928] Move org.apache.spark.Logging into
org.apache.spark.internal.Logging

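In the 1.6.x releases and earlier (including the 1.5.1 jar you tried), Logging
was at org.apache.spark.Logging, so the org.apache.spark.internal package is
not in that jar.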

FYI

On Fri, Apr 22, 2016 at 12:21 AM, Mich Talebzadeh  wrote:

>
> Hi,
>
> Anyone know which jar file has  import org.apache.spark.internal.Logging?
>
> I tried *spark-core_2.10-1.5.1.jar *
>
> but does not seem to work
>
> scala> import org.apache.spark.internal.Logging
>
> :57: error: object internal is not a member of package
> org.apache.spark
>  import org.apache.spark.internal.Logging
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>


Run Apache Spark on EMR

2016-04-22 Thread Jinan Alhajjaj






Hi All,

I would like to ask two things, and I would really appreciate an answer ASAP:

1. How do I implement parallelism in an Apache Spark Java application?
2. How do I run a Spark application on Amazon EMR?

  

Re: question about Reynold's talk: " The Future of Real Time"

2016-04-22 Thread Petr Novak
Hi,
As I understand it, they will provide a lower-latency interface, probably
over JDBC, so that third-party BI tools can integrate and query streams as
if they were static datasets. If the BI tool repeats the query, it gets
updated results. I don't know whether BI tools are already moving toward a
push-based interface such as Observables, so that Spark could push updates
instead of the BI tool pulling them (long polling).

Regards,
Petr

On Fri, Apr 22, 2016 at 11:30 AM, charles li 
wrote:

> hi, there, the talk  *The Future of Real Time in Spark* here
> https://www.youtube.com/watch?v=oXkxXDG0gNk  tells that there will be "BI
> app integration" on 24:28 of the video.
>
> what does he mean the *BI app integration* in that talk? does that mean
> that they will develop a BI tool like zeppelin, hue or yhat that integrates
> in Spark 2.1?
>
> thanks,
>
>
> --
> *--*
> a spark lover, a quant, a developer and a good man.
>
> http://github.com/litaotao
>


question about Reynold's talk: " The Future of Real Time"

2016-04-22 Thread charles li
Hi there, the talk *The Future of Real Time in Spark* here
https://www.youtube.com/watch?v=oXkxXDG0gNk  says that there will be "BI
app integration", at 24:28 in the video.

What does he mean by *BI app integration* in that talk? Does it mean
that they will develop a BI tool like Zeppelin, Hue or yhat that is integrated
into Spark 2.1?

thanks,


-- 
*--*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


Custom Log4j layout on YARN = ClassNotFoundException

2016-04-22 Thread Rowson, Andrew G. (TR Technology & Ops)



--- Begin Message ---
<>--- End Message ---

Re: pyspark EOFError after calling map

2016-04-22 Thread Pete Werner
Oh great, thank you for clearing that up.

On Fri, Apr 22, 2016 at 5:15 PM, Davies Liu  wrote:

> This exception is already handled well, just noisy, should be muted.
>
>


-- 

Pete Werner
Data Scientist
Freelancer.com

Level 20
680 George Street
Sydney NSW 2000

e: pwer...@freelancer.com
p:  +61 2 8599 2700
w: http://www.freelancer.com


Which jar file has import org.apache.spark.internal.Logging

2016-04-22 Thread Mich Talebzadeh
Hi,

Anyone know which jar file has  import org.apache.spark.internal.Logging?

I tried *spark-core_2.10-1.5.1.jar *

but does not seem to work

scala> import org.apache.spark.internal.Logging

:57: error: object internal is not a member of package
org.apache.spark
 import org.apache.spark.internal.Logging

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


Re: pyspark EOFError after calling map

2016-04-22 Thread Davies Liu
This exception is already handled well, just noisy, should be muted.
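
For context, the traceback in the quoted message ends in `read_int`, which reads an integer from the worker's input stream. A minimal sketch of that kind of reader, assuming a 4-byte big-endian integer format (the function below is a simplified stand-in written for illustration, not a copy of the PySpark source), shows how a stream that ends early surfaces as EOFError:

```python
import io
import struct


def read_int(stream):
    """Read one 4-byte big-endian signed integer; raise EOFError if the stream has ended."""
    data = stream.read(4)
    if not data:
        # Nothing left to read: the other side closed the stream.
        raise EOFError
    return struct.unpack("!i", data)[0]


# A stream holding a single integer, then nothing.
stream = io.BytesIO(struct.pack("!i", 42))
first = read_int(stream)

try:
    read_int(stream)
    hit_eof = False
except EOFError:
    hit_eof = True

print(first, hit_eof)
```

Why the stream ends early in the poster's session is not stated in the thread; the sketch only illustrates where the exception type comes from.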

On Wed, Apr 13, 2016 at 4:52 PM, Pete Werner  wrote:

> Hi
>
> I am new to spark & pyspark.
>
> I am reading a small csv file (~40k rows) into a dataframe.
>
> # Row and Vectors imports are not shown in the original post; assumed here.
> from pyspark.sql import Row
> from pyspark.sql import functions as F
> from pyspark.mllib.linalg import Vectors
> df = sqlContext.read.format('com.databricks.spark.csv').options(
>     header='true', inferschema='true').load('/tmp/sm.csv')
> df = df.withColumn('verified', F.when(df['verified'] == 'Y', 1).otherwise(0))
> df2 = df.map(lambda x: Row(label=float(x[0]),
>                            features=Vectors.dense(x[1:]))).toDF()
>
> I get some weird error that does not occur every single time, but does
> happen pretty regularly
>
> >>> df2.show(1)
> +--------------------+-----+
> |            features|label|
> +--------------------+-----+
> |[0.0,0.0,0.0,0.0,...|  0.0|
> +--------------------+-----+
> only showing top 1 row
>
> >>> df2.count()
> 41999
>
> >>> df2.show(1)
> +--------------------+-----+
> |            features|label|
> +--------------------+-----+
> |[0.0,0.0,0.0,0.0,...|  0.0|
> +--------------------+-----+
> only showing top 1 row
>
> >>> df2.count()
> 41999
>
> >>> df2.show(1)
> Traceback (most recent call last):
>   File "spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 157,
> in manager
>   File "spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 61, in
> worker
>   File "spark-1.6.1/python/lib/pyspark.zip/pyspark/worker.py", line 136,
> in main
> if read_int(infile) == SpecialLengths.END_OF_STREAM:
>   File "spark-1.6.1/python/lib/pyspark.zip/pyspark/serializers.py", line
> 545, in read_int
> raise EOFError
> EOFError
> +--------------------+---------+
> |            features|    label|
> +--------------------+---------+
> |[0.0,0.0,0.0,0.0,...|4700734.0|
> +--------------------+---------+
> only showing top 1 row
>
> Once that EOFError has been raised, I will not see it again until I do
> something that requires interacting with the spark server
>
> When I call df2.count() it shows that [Stage xxx] prompt which is what I
> mean by it going to the spark server.
>
> Anything that triggers that seems to eventually end up giving the EOFError
> again when I do something with df2.
>
> It does not seem to happen with df (vs. df2) so seems like it must be
> something happening with the df.map() line.
>
> --
>
> Pete Werner
> Data Scientist
> Freelancer.com
>
> Level 20
> 680 George Street
> Sydney NSW 2000
>
> e: pwer...@freelancer.com
> p:  +61 2 8599 2700
> w: http://www.freelancer.com
>
>


Re: Save DataFrame to HBase

2016-04-22 Thread Zhan Zhang
You can try this

https://github.com/hortonworks/shc.git

or here
http://spark-packages.org/package/zhzhan/shc

It is currently in the process of being merged into HBase.

Thanks.

Zhan Zhang

On Apr 21, 2016, at 8:44 AM, Benjamin Kim wrote:

Hi Ted,

Can this module be used with an older version of HBase, such as 1.0 or 1.1? 
Where can I get the module from?

Thanks,
Ben

On Apr 21, 2016, at 6:56 AM, Ted Yu wrote:

The hbase-spark module in Apache HBase (coming with the HBase 2.0 release)
can do this.

On Thu, Apr 21, 2016 at 6:52 AM, Benjamin Kim wrote:
Has anyone found an easy way to save a DataFrame into HBase?

Thanks,
Ben


Re: Spark SQL insert overwrite table not showing all the partition.

2016-04-22 Thread Zhan Zhang
INSERT OVERWRITE will overwrite any existing data in the table or partition,
unless IF NOT EXISTS is provided for a partition (as of Hive 0.9.0).


Thanks.

Zhan Zhang

On Apr 21, 2016, at 3:20 PM, Bijay Kumar Pathak wrote:

Hi,

I have a job which writes to a Hive table with dynamic partitioning. Inside the
job, I write into the table twice, but I only see the partition from the last
write, although I can see in the Spark UI that it is processing data for both
partitions.

Below is the query I am using to write to the table.

# date_val fills the static partition value; date_2 is the dynamic partition.
hive_c.sql("""INSERT OVERWRITE TABLE base_table PARTITION (date='{0}', date_2)
  SELECT * from temp_table
""".format(date_val))
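
A minimal sketch of building such a statement with named placeholders, which may be easier to get right than positional ones: str.format raises IndexError when a positional placeholder such as {1} has no matching argument. The helper name build_insert_overwrite and the date value "2016-04-21" are assumptions made for illustration; the table and column names come from the query above.

```python
def build_insert_overwrite(table, source, date_val):
    """Hypothetical helper: return an INSERT OVERWRITE statement with a
    static `date` partition and a dynamic `date_2` partition."""
    template = (
        "INSERT OVERWRITE TABLE {table} PARTITION (date='{date_val}', date_2)\n"
        "  SELECT * FROM {source}"
    )
    # Named fields are filled by keyword, so no positional index can be off.
    return template.format(table=table, date_val=date_val, source=source)


# "2016-04-21" is an illustrative value, not taken from the original job.
query = build_insert_overwrite("base_table", "temp_table", "2016-04-21")
print(query)
```

The resulting string would then be passed to hive_c.sql(query) as in the job above.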


Thanks,
Bijay



Re: Spark DataFrame sum of multiple columns

2016-04-22 Thread Divya Gehlot
Easy way of doing it

newdf = df.withColumn('total', sum(df[col] for col in df.columns))
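
The one-liner relies on Python's built-in sum, which starts from the integer 0 and applies + to each element in turn. A PySpark Column accepts + with a plain number on either side, so the result is a single Column expression rather than a number. A minimal sketch of that folding, using a hypothetical stand-in class Expr (not a PySpark class) so that it runs without a Spark installation:

```python
class Expr:
    """Hypothetical stand-in for a column expression.

    It only records the text of the expression being built.
    """

    def __init__(self, text):
        self.text = text

    def __add__(self, other):
        # Expr + something
        return Expr("(%s + %s)" % (self.text, _text(other)))

    def __radd__(self, other):
        # something + Expr; sum() begins with 0 + first element.
        return Expr("(%s + %s)" % (_text(other), self.text))


def _text(value):
    """Render either an Expr or a plain number as text."""
    return value.text if isinstance(value, Expr) else str(value)


columns = ["A", "B", "C"]
total = sum(Expr(name) for name in columns)
print(total.text)  # (((0 + A) + B) + C)
```

With a real DataFrame the same fold can be written as functools.reduce(operator.add, [df[c] for c in df.columns]), which avoids the leading 0. Note that `from pyspark.sql.functions import *` replaces the built-in sum with the aggregate function of the same name, in which case the one-liner no longer does this.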


On 22 April 2016 at 11:51, Naveen Kumar Pokala wrote:

> Hi,
>
> Do we have any way to perform row-level operations in Spark DataFrames?
>
> For example, I have a dataframe with columns A, B, C, ..., Z. I want to add
> one more column, New Column, with the sum of all column values.
>
> A   B   C   D   ...   Z    New Column
> 1   2   4   3   ...   26   351
>
> Can somebody help me on this?
>
> Thanks,
> Naveen
>


Re: Spark DataFrame sum of multiple columns

2016-04-22 Thread Zhan Zhang
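You can define your own UDF. The following is one example:

// Assumes sqlContext.implicits._ is imported and testData is a DataFrame
// with columns key: Int and value: String.
import org.apache.spark.sql.functions.udf

val foo = udf((a: Int, b: String) => a.toString + b)

// SELECT *, foo(key, value) FROM testData
testData.select($"*", foo('key, 'value)).limit(3)

Thanks

Zhan Zhang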



On Apr 21, 2016, at 8:51 PM, Naveen Kumar Pokala wrote:

Hi,

Do we have any way to perform row-level operations in Spark DataFrames?

For example, I have a dataframe with columns A, B, C, ..., Z. I want to add one
more column, New Column, with the sum of all column values.

A   B   C   D   ...   Z    New Column
1   2   4   3   ...   26   351

Can somebody help me on this?

Thanks,
Naveen