Re: [Spark 2.0] Why MutableInt cannot be cast to MutableLong?

2016-07-31 Thread Chanh Le
If I have a column stored in a parquet file as INT and I create a table with
the same column but change the type from int to bigint, then Spark 2.0 shows
this error:

Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 
0 in stage 259.0 failed 4 times, most recent failure: Lost task 0.3 in stage 
259.0 (TID 22958, slave2): java.lang.UnsupportedOperationException: 
org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainLongDictionary
at org.apache.parquet.column.Dictionary.decodeToInt(Dictionary.java:48)
at 
org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getInt(OnHeapColumnVector.java:233)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


So I think this error still happens in Spark 2.0.
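
For reference, here is a minimal, untested sketch of the mismatch described above
(paths and the column name are made up); depending on the Spark/Parquet version,
reading it back can fail with an error much like the stack trace:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StructField, StructType}

val spark = SparkSession.builder().appName("int-vs-bigint").master("local[*]").getOrCreate()
import spark.implicits._

// Write a parquet file whose "time" column is stored as INT (placeholder path).
Seq(1, 2, 3).toDF("time").write.mode("overwrite").parquet("/tmp/int_col_parquet")

// Read it back while declaring the column as BIGINT (long); this is the mismatch
// that can trigger the dictionary cast error shown in the stack trace above.
val declared = StructType(StructField("time", LongType) :: Nil)
spark.read.schema(declared).parquet("/tmp/int_col_parquet").show()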




> On Aug 1, 2016, at 9:21 AM, Chanh Le  wrote:
> 
> Sorry my bad, I ran in Spark 1.6.1 but what about this error?
> Why Int cannot be cast to Long?
> 
> 
> Thanks.
> 
> 
>> On Aug 1, 2016, at 2:44 AM, Michael Armbrust > > wrote:
>> 
>> Are you sure you are running Spark 2.0?
>> 
>> In your stack trace I see SqlNewHadoopRDD, which was removed in #12354 
>> .
>> 
>> On Sun, Jul 31, 2016 at 2:12 AM, Chanh Le > > wrote:
>> Hi everyone,
>> Why MutableInt cannot be cast to MutableLong?
>> It’s really weird and seems Spark 2.0 has a lot of error with parquet about 
>> format.
>> 
>> org.apache.spark.sql.catalyst.expressions.MutableInt cannot be cast to 
>> org.apache.spark.sql.catalyst.expressions.MutableLong
>> 
>> Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read 
>> value at 0 in block 0 in file 
>> file:/data/etl-report/parquet/AD_COOKIE_REPORT/time=2016-07-
>> 25-16/network_id=31713/part-r-0-9adbef89-f2f4-4836-a50c-a2e7b381d558.snappy.parquet
>> at 
>> org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228)
>> at 
>> org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
>> at 
>> org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.hasNext(SqlNewHadoopRDD.scala:194)
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> at 
>> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:88)
>> at 
>> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
>> at 
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>> at 
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>> at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>> at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>> at 
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>> at 
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>> at 
>> 

Re: Writing custom Transformers and Estimators like Tokenizer in spark ML

2016-07-31 Thread janardhan shetty
I am developing in Scala, but any help on the difference between UnaryTransformer
(is this still experimental?) and the Transformer class is appreciated.

Right now I am encountering an error for code which extends UnaryTransformer:

override protected def outputDataType: DataType = new StringType

Error:(26, 53) constructor StringType in class StringType cannot be
accessed in class Capitalizer
  override protected def outputDataType: DataType = new StringType
^
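
For what it is worth, that compile error usually goes away by returning the
StringType singleton object instead of calling its private constructor. A
minimal, untested sketch (only the Capitalizer name comes from the error above,
everything else is assumed):

import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types.{DataType, StringType}

// Hypothetical example transformer: upper-cases the input string column.
class Capitalizer(override val uid: String)
    extends UnaryTransformer[String, String, Capitalizer] {

  def this() = this(Identifiable.randomUID("capitalizer"))

  override protected def createTransformFunc: String => String = _.toUpperCase

  // Use the StringType singleton; `new StringType` is not accessible.
  override protected def outputDataType: DataType = StringType

  override def copy(extra: ParamMap): Capitalizer = defaultCopy(extra)
}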



On Thu, Jul 28, 2016 at 8:20 PM, Phuong LE-HONG  wrote:

> Hi,
>
> I've developed a simple ML estimator (in Java) that implements
> conditional Markov model for sequence labelling in Vitk toolkit. You
> can check it out here:
>
>
> https://github.com/phuonglh/vn.vitk/blob/master/src/main/java/vn/vitk/tag/CMM.java
>
> Phuong Le-Hong
>
> On Fri, Jul 29, 2016 at 9:01 AM, janardhan shetty
>  wrote:
> > Thanks Steve.
> >
> > Any pointers to custom estimators development as well ?
> >
> > On Wed, Jul 27, 2016 at 11:35 AM, Steve Rowe  wrote:
> >>
> >> You can see the source for my transformer configurable bridge to Lucene
> >> analysis components here, in my company Lucidworks’ spark-solr project:
> >> <
> https://github.com/lucidworks/spark-solr/blob/master/src/main/scala/com/lucidworks/spark/ml/feature/LuceneTextAnalyzerTransformer.scala
> >.
> >>
> >> Here’s a blog I wrote about using this transformer, as well as
> >> non-ML-context use in Spark of the underlying analysis component, here:
> >>  >.
> >>
> >> --
> >> Steve
> >> www.lucidworks.com
> >>
> >> > On Jul 27, 2016, at 1:31 PM, janardhan shetty  >
> >> > wrote:
> >> >
> >> > 1.  Any links or blogs to develop custom transformers ? ex: Tokenizer
> >> >
> >> > 2. Any links or blogs to develop custom estimators ? ex: any ml
> >> > algorithm
> >>
> >
>


Does Hadoop need to be installed?

2016-07-31 Thread ayan guha
Hi

I am trying to run spark 2.0 prebuilt with hadoop 2.7 on windows. I do not
have hadoop installed as I wanted to test spark alone.

When I run pyspark it does start up, but reading any file using the dataframe
APIs fails. I recall this was doable in earlier versions of spark, but is it
no longer possible?

[image: Inline image 1]
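
As an aside that may or may not apply here: Spark on Windows usually does not
need a full Hadoop install, but its file APIs do look for winutils.exe via
HADOOP_HOME or the hadoop.home.dir property. An untested sketch, assuming
winutils.exe sits under C:\hadoop\bin (with pyspark the equivalent is setting
the HADOOP_HOME environment variable to the same directory):

// Point Hadoop's shell utilities at a directory containing bin\winutils.exe
// before creating the session. The paths below are placeholders.
System.setProperty("hadoop.home.dir", "C:\\hadoop")

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
spark.read.text("C:\\data\\sample.txt").show()   // placeholder file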


-- 
Best Regards,
Ayan Guha


Re: [Spark 2.0] Why MutableInt cannot be cast to MutableLong?

2016-07-31 Thread Chanh Le
Sorry, my bad, I ran it in Spark 1.6.1, but what about this error?
Why can't Int be cast to Long?


Thanks.


> On Aug 1, 2016, at 2:44 AM, Michael Armbrust  wrote:
> 
> Are you sure you are running Spark 2.0?
> 
> In your stack trace I see SqlNewHadoopRDD, which was removed in #12354 
> .
> 
> On Sun, Jul 31, 2016 at 2:12 AM, Chanh Le  > wrote:
> Hi everyone,
> Why MutableInt cannot be cast to MutableLong?
> It’s really weird and seems Spark 2.0 has a lot of error with parquet about 
> format.
> 
> org.apache.spark.sql.catalyst.expressions.MutableInt cannot be cast to 
> org.apache.spark.sql.catalyst.expressions.MutableLong
> 
> Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value 
> at 0 in block 0 in file 
> file:/data/etl-report/parquet/AD_COOKIE_REPORT/time=2016-07-
> 25-16/network_id=31713/part-r-0-9adbef89-f2f4-4836-a50c-a2e7b381d558.snappy.parquet
> at 
> org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228)
> at 
> org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
> at 
> org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.hasNext(SqlNewHadoopRDD.scala:194)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:88)
> at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
> at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
> at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 
> Caused by: java.lang.ClassCastException: 
> org.apache.spark.sql.catalyst.expressions.MutableInt cannot be cast to 
> org.apache.spark.sql.catalyst.expressions.MutableLong
> at 
> org.apache.spark.sql.catalyst.expressions.SpecificMutableRow.setLong(SpecificMutableRow.scala:295)
> at 
> org.apache.spark.sql.execution.datasources.parquet.CatalystRowConverter$RowUpdater.setLong(CatalystRowConverter.scala:161)
> at 
> org.apache.spark.sql.execution.datasources.parquet.CatalystPrimitiveConverter.addLong(CatalystRowConverter.scala:85)
> at 
> org.apache.parquet.column.impl.ColumnReaderImpl$2$4.writeValue(ColumnReaderImpl.java:269)
> at 
> org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:365)
> at 
> org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:405)
> at 
> org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:209)
> ... 20 more
> 



Re: spark java - convert string to date

2016-07-31 Thread Hyukjin Kwon
I haven't used this myself, but I guess those functions should work.

unix_timestamp()


See
https://github.com/apache/spark/blob/480c870644595a71102be6597146d80b1c0816e4/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L2513-L2530
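For illustration, an untested Scala sketch (the Java functions API mirrors it);
the column name and date format below are assumptions:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{to_date, unix_timestamp}

val spark = SparkSession.builder().appName("string-to-date").master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical DataFrame with a date-time string column.
val df = Seq("2016-07-31 22:57:00").toDF("dt")

df.select(
  unix_timestamp($"dt", "yyyy-MM-dd HH:mm:ss").as("epoch_seconds"),               // string -> seconds
  to_date(unix_timestamp($"dt", "yyyy-MM-dd HH:mm:ss").cast("timestamp")).as("as_date")
).show()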



2016-07-31 22:57 GMT+09:00 Tony Lane :

> Any built in function in java with spark to convert string to date more
> efficiently
> or do we just use the standard java techniques
>
> -Tony
>


Re: Tuning level of Parallelism: Increase or decrease?

2016-07-31 Thread Jestin Ma
It seems that the number of tasks being this large does not matter. Each task
defaults to the HDFS block size of 128 MB, which I've heard is fine. I've tried
tuning the block (task) size to be larger and smaller, to no avail.

I tried coalescing to 50 but that introduced large data skew and slowed
down my job a lot.
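
As an aside on the coalesce attempt (df and the path below are placeholders,
assuming an existing SparkSession named spark): coalesce(n) only merges existing
partitions without a shuffle, so input skew carries over, while repartition(n)
does a full shuffle and spreads rows roughly evenly at the cost of that shuffle:

val df = spark.read.parquet("/path/to/input")   // placeholder input

val narrowed   = df.coalesce(50)      // no shuffle; merges partitions, keeps skew
val rebalanced = df.repartition(50)   // full shuffle; roughly even partition sizes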

On Sun, Jul 31, 2016 at 5:27 PM, Andrew Ehrlich  wrote:

> 15000 seems like a lot of tasks for that size. Test it out with a
> .coalesce(50) placed right after loading the data. It will probably either
> run faster or crash with out of memory errors.
>
> On Jul 29, 2016, at 9:02 AM, Jestin Ma  wrote:
>
> I am processing ~2 TB of hdfs data using DataFrames. The size of a task is
> equal to the block size specified by hdfs, which happens to be 128 MB,
> leading to about 15000 tasks.
>
> I'm using 5 worker nodes with 16 cores each and ~25 GB RAM.
> I'm performing groupBy, count, and an outer-join with another DataFrame of
> ~200 MB size (~80 MB cached but I don't need to cache it), then saving to
> disk.
>
> Right now it takes about 55 minutes, and I've been trying to tune it.
>
> I read on the Spark Tuning guide that:
> *In general, we recommend 2-3 tasks per CPU core in your cluster.*
>
> This means that I should have about 30-50 tasks instead of 15000, and each
> task would be much bigger in size. Is my understanding correct, and is this
> suggested? I've read from difference sources to decrease or increase
> parallelism, or even keep it default.
>
> Thank you for your help,
> Jestin
>
>
>


Windows - Spark 2 - Standalone - Worker not able to connect to Master

2016-07-31 Thread ayan guha
Hi

I just downloaded Spark 2.0 on my Windows 7 machine to check it out. However, I
am not able to set up a standalone cluster:


Step 1: master set up (Successful)

bin/spark-class org.apache.spark.deploy.master.Master

It did throw an error about not being able to find winutils, but it started
successfully.

Step II: Set up Worker (Failed)

bin/spark-class org.apache.spark.deploy.worker.Worker spark://localhost:7077

This step fails with following error:

16/08/01 11:21:27 INFO Worker: Connecting to master localhost:7077...
16/08/01 11:21:28 WARN Worker: Failed to connect to master localhost:7077
org.apache.spark.SparkException: Exception thrown in awaitResult
at
org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.sca
la:77)
at
org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.sca
la:75)
at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.s
cala:36)
at
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyO
rElse(RpcTimeout.scala:59)
at
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyO
rElse(RpcTimeout.scala:59)
at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
at
org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:88)
at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:96)
at
org.apache.spark.deploy.worker.Worker$$anonfun$org$apache$spark$deplo
y$worker$Worker$$tryRegisterAllMasters$1$$anon$1.run(Worker.scala:216)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown
Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.io.IOException: Failed to connect to localhost/
127.0.0.1:7077
at
org.apache.spark.network.client.TransportClientFactory.createClient(T
ransportClientFactory.java:228)
at
org.apache.spark.network.client.TransportClientFactory.createClient(T
ransportClientFactory.java:179)
at
org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala
:197)
at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:191)
at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:187)
... 4 more
Caused by: java.net.ConnectException: Connection refused: no further
information
: localhost/127.0.0.1:7077
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
at
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocke
tChannel.java:224)
at
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConne
ct(AbstractNioChannel.java:289)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.jav
a:528)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEve
ntLoop.java:468)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.ja
va:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThread
EventExecutor.java:111)
... 1 more

Am I doing something wrong?


-- 
Best Regards,
Ayan Guha


Re: Spark recovery takes long

2016-07-31 Thread NB
Well, we haven't really enabled recovery after running into this issue in
Spark 1.2. I do intend to try this again soon with Spark 1.6.1 and see if it
works out this time.

NB




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-recovery-takes-long-tp22876p27441.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Tuning level of Parallelism: Increase or decrease?

2016-07-31 Thread Andrew Ehrlich
15000 seems like a lot of tasks for that size. Test it out with a .coalesce(50) 
placed right after loading the data. It will probably either run faster or 
crash with out of memory errors.

> On Jul 29, 2016, at 9:02 AM, Jestin Ma  wrote:
> 
> I am processing ~2 TB of hdfs data using DataFrames. The size of a task is 
> equal to the block size specified by hdfs, which happens to be 128 MB, 
> leading to about 15000 tasks.
> 
> I'm using 5 worker nodes with 16 cores each and ~25 GB RAM.
> I'm performing groupBy, count, and an outer-join with another DataFrame of 
> ~200 MB size (~80 MB cached but I don't need to cache it), then saving to 
> disk.
> 
> Right now it takes about 55 minutes, and I've been trying to tune it.
> 
> I read on the Spark Tuning guide that:
> In general, we recommend 2-3 tasks per CPU core in your cluster.
> 
> This means that I should have about 30-50 tasks instead of 15000, and each 
> task would be much bigger in size. Is my understanding correct, and is this 
> suggested? I've read from difference sources to decrease or increase 
> parallelism, or even keep it default.
> 
> Thank you for your help,
> Jestin



Re: How to write contents of RDD to HDFS as separate file for each item in RDD (PySpark)

2016-07-31 Thread Andrew Ehrlich
You could write each image to a different directory instead of a different 
file. That can be done by filtering the RDD into one RDD per image and then 
saving each. That might not be what you're after though, in terms of space and 
speed efficiency. Another way would be to save multiple outputs into one 
parquet (or text) file. There might be information on the image you can 
partition on (probably by some timestamp) to make lookups faster.
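
For illustration, an untested Scala sketch of the one-file-per-input idea via
the Hadoop FileSystem API (the thread uses PySpark, and all paths here are
placeholders); note that, as pointed out below, many tiny HDFS files are usually
a bad idea:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// One (filename, bytes-length) record per input image.
val lengths = sc.binaryFiles("/root/sift_images_test/*.jpg")
  .map { case (name, stream) => (name, stream.toArray.length) }

// Write a small "<basename>-success.txt" file per record; opening one
// FileSystem per partition keeps the overhead down.
lengths.foreachPartition { iter =>
  val fs = FileSystem.get(new Configuration())
  iter.foreach { case (name, len) =>
    val base = name.split("/").last.stripSuffix(".jpg")
    val out = fs.create(new Path(s"/tmp/somefile/$base-success.txt"))
    out.writeBytes(len.toString)
    out.close()
  }
}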

> On Jul 30, 2016, at 8:01 PM, Bhaarat Sharma  wrote:
> 
> I am just trying to do this as a proof of concept. The actual content of the 
> files will be quite bit. 
> 
> I'm having problem using foreach or something similar on an RDD. 
> sc.binaryFiles("/root/sift_images_test/*.jpg")
> returns
> ("filename1", bytes)
> ("filname2",bytes)
> I'm wondering if there is a do processing one each of these (process in this 
> case is just getting the bytes length but will be something else in real 
> world) and then write the contents to separate HDFS files. 
> If this doesn't make sense, would it make more sense to have all contents in 
> a single HDFS file?
> 
> On Sat, Jul 30, 2016 at 10:19 PM, ayan guha  > wrote:
> This sounds a bad idea, given hdfs does not work well with small files.
> 
> On Sun, Jul 31, 2016 at 8:57 AM, Bhaarat Sharma  > wrote:
> I am reading bunch of files in PySpark using binaryFiles. Then I want to get 
> the number of bytes for each file and write this number to an HDFS file with 
> the corresponding name. 
> 
> Example:
> 
> if directory /myimages has one.jpg, two.jpg, and three.jpg then I want three 
> files one-success.jpg, two-success.jpg, and three-success.jpg in HDFS with a 
> number in each. The number will specify the length of bytes. 
> 
> Here is what I've done thus far:
> 
> from pyspark import SparkContext
> import numpy as np
> 
> sc = SparkContext("local", "test")
> 
> def bytes_length(rawdata):
> length = len(np.asarray(bytearray(rawdata),dtype=np.uint8))
> return length
> 
> images = sc.binaryFiles("/root/sift_images_test/*.jpg")
> images.map(lambda(filename, contents): 
> bytes_length(contents)).saveAsTextFile("hdfs://localhost:9000/tmp/somfile")
> 
> However, doing this creates a single file in HDFS:
> $ hadoop fs -cat /tmp/somfile/part-0
> 113212
> 144926
> 178923
> Instead I want /tmp/somefile in HDFS to have three files:
> one-success.txt with value 113212
> two-success.txt with value 144926
> three-success.txt with value 178923
> 
> Is it possible to achieve what I'm after? I don't want to write files to 
> local file system and them put them in HDFS. Instead, I want to use the 
> saveAsTextFile method on the RDD directly.
> 
> 
> 
> 
> -- 
> Best Regards,
> Ayan Guha
> 



Re: error while running filter on dataframe

2016-07-31 Thread ayan guha
It would help to share the Spark version, environment details and a code
snippet. There are many very knowledgeable people here who will then be able to
help.
On 1 Aug 2016 02:15, "Tony Lane"  wrote:

> Can someone help me understand this error which occurs while running a
> filter on a dataframe
>
> 2016-07-31 21:01:57 ERROR CodeGenerator:91 - failed to compile:
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line
> 117, Column 58: Expression "mapelements_isNull" is not an rvalue
> /* 001 */ public Object generate(Object[] references) {
> /* 002 */   return new GeneratedIterator(references);
> /* 003 */ }
> /* 004 */
> /* 005 */ /** Codegened pipeline for:
> /* 006 */ * TungstenAggregate(key=[],
> functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#127L])
> /* 007 */ +- Project
> /* 008 */ +- Filter (is...
> /* 009 */   */
> /* 010 */ final class GeneratedIterator extends
> org.apache.spark.sql.execution.BufferedRowIterator {
> /* 011 */   private Object[] references;
> /* 012 */   private boolean agg_initAgg;
> /* 013 */   private boolean agg_bufIsNull;
> /* 014 */   private long agg_bufValue;
> /* 015 */   private scala.collection.Iterator inputadapter_input;
> /* 016 */   private Object[] deserializetoobject_values;
> /* 017 */   private org.apache.spark.sql.types.StructType
> deserializetoobject_schema;
> /* 018 */   private UnsafeRow deserializetoobject_result;
> /* 019 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder
> deserializetoobject_holder;
> /* 020 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
> deserializetoobject_rowWriter;
> /* 021 */   private UnsafeRow mapelements_result;
> /* 022 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder
> mapelements_holder;
> /* 023 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
> mapelements_rowWriter;
> /* 024 */   private Object[] serializefromobject_values;
> /* 025 */   private UnsafeRow serializefromobject_result;
> /* 026 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder
> serializefromobject_holder;
> /* 027 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
> serializefromobject_rowWriter;
> /* 028 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
> serializefromobject_rowWriter1;
> /* 029 */   private org.apache.spark.sql.execution.metric.SQLMetric
> filter_numOutputRows;
> /* 030 */   private UnsafeRow filter_result;
> /* 031 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder
> filter_holder;
> /* 032 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
> filter_rowWriter;
> /* 033 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
> filter_rowWriter1;
> /* 034 */   private org.apache.spark.sql.execution.metric.SQLMetric
> agg_numOutputRows;
> /* 035 */   private org.apache.spark.sql.execution.metric.SQLMetric
> agg_aggTime;
> /* 036 */   private UnsafeRow agg_result;
> /* 037 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
> /* 038 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
> agg_rowWriter;
> /* 039 */
> /* 040 */   public GeneratedIterator(Object[] references) {
> /* 041 */ this.references = references;
> /* 042 */   }
> /* 043 */
>
>


Re: How to filter based on a constant value

2016-07-31 Thread ayan guha
Hi

Interesting problem :) And this is where my knowledge is limited. But as I
understand it, this is a clustering problem over names. You may want to find
the names that belong to the same group by computing, say, a distance between
them. Spark supports a few clustering algorithms under MLlib.

I would love to know other opinions about getting this done.

After all, we are all asking the same question, aren't we :)
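
Before reaching for clustering, one simpler, untested sketch (table and column
names follow Mich's message below; the regex and the spark session are
assumptions) is to normalise the description into a crude payee key and
aggregate on that:

import org.apache.spark.sql.functions.{lower, regexp_replace, sum, trim}
import spark.implicits._   // assumes an existing SparkSession named spark

val txns = spark.table("accounts.ll_18740868").filter($"transactiontype" === "DEB")

// Strip the trailing "CD nnnn" card suffix to get a rough payee key,
// then total the spend per key.
val byPayee = txns
  .withColumn("payee",
    trim(lower(regexp_replace($"transactiondescription", "\\s*CD\\s+\\d+.*$", ""))))
  .groupBy("payee")
  .agg(sum($"debitamount").as("spent"))
  .orderBy($"spent".desc)

byPayee.show(false)

Near-duplicate keys such as SAINSBURY'S S/MKT and SAINSBURYS would still need
fuzzier matching, which is where the clustering idea comes in.
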
On 1 Aug 2016 00:36, "Mich Talebzadeh"  wrote:

> Many thanks
>
> This is a challenge of some sort.
>
> I did this for my own work.
>
> I downloaded my bank account for the past few years as a CSV format and
> loaded into Hive ORC table with databricks stuff.
>
> All tables are transactional and bucketed.
>
> A typical row looks like this (they vary from bank to bank)
>
> hive> desc ll_18740868;
> transactiondate date
> transactiontype string
> sortcodestring
> accountnumber   string
> transactiondescription  string
> debitamount double
> creditamountdouble
> balance double
>
> The columns I am interested are
>
> transactiondate date
> transactiontype string
> transactiondescription  string
> debitamount double
>
> Basically paying by Debit Card means I need to look at transactiontype =
> 'DEB'. Actually I downloaded all their codes (those three-letter
> abbreviations), like
>
> 1   BGC Bank Giro Credit
> 2   BNS Bonus
> 12  DEB Debit Card
>
> etc
>
> My biggest challenge has been to decode transactiondescription column as
> for each payee it uses different description. For example
>
> SELECT DISTINCT transactiondescription FROM  ll_18740868 ORDER BY
> substring(transactiondescription,1,INSTR(transactiondescription,'CD')-2);
>
> SAINSBURY'S S/MKT CD 5710
> SAINSBURY'S S/MKT CD 4610
> SAINSBURY'S S/MKT CD 5916
> SAINSBURY'S S/MKT CD 4628
> SAINSBURY'S S/MKT CD 4636
> SAINSBURY'S SMKT CD 5710
> SAINSBURY'S SMKT CD 4628
> SAINSBURYS CD 5916
> SAINSBURYS CD 5710
> SAINSBURYS S/MKT CD 5710
> SAINSBURYS S/MKTS CD 5916
> SAINSBURYS S/MKTS CD 4628
> SAINSBURYS S/MKTS CD 4636
> SAINSBURYS SMKT CD 5710
> SALISBURY CATHEDRA CD 5710
>
> If I look at the description I can see that they all belong to a grocer, so
> if I sum them up as below I will know (sort of) how much I spent in total for
> each payee, by taking the substring up to 2 characters before CD
>
> CREATE TEMPORARY TABLE tmp (hashtag String, Spent float);
> INSERT INTO tmp
> SELECT DISTINCT *
> FROM (SELECT
> SUBSTRING(transactiondescription,1,INSTR(transactiondescription,'CD')-2),
> SUM(debitamount) OVER (PARTITION BY
> substring(transactiondescription,1,INSTR(transactiondescription,'CD')-2)) r
> FROM accounts.ll_18740868 WHERE transactiontype = 'DEB'
> ) RS
> ;
> SELECT * FROM tmp order by spent,hashtag;
>
>
> This is crude. What I really want is to dig into all
> transactiondescriptions which belong to the same payee as full text.
> Like above, they are all Sainsbury's, whatever the description says :)
>
> It is a type of sentiment analysis for myself so I know where I am wasting
> my money most :)
>
> Cheers
>
>
>
>
>
>
>
>
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 31 July 2016 at 13:37, ayan guha  wrote:
>
>> Hi
>>
>> here is a quick setup (Based on airlines.txt dataset):
>>
>>
>> --
>> *from datetime import datetime, timedelta*
>> *from pyspark.sql.types import **
>> *from pyspark.sql.functions import udf, col,rank,min*
>> *from pyspark import SparkContext, HiveContext*
>> *import sys*
>> *from pyspark.sql import Window*
>>
>>
>> *sc = SparkContext()*
>> *hc = HiveContext(sc)*
>> *customSchema = StructType([ \*
>> *StructField("airport_id", IntegerType(), True) , \*
>> *StructField("name", StringType(), True) , \*
>> *StructField("city", StringType(), True) , \*
>> *StructField("country", StringType(), True) , \*
>> *StructField("iata", StringType(), True) , \*
>> *StructField("icao", StringType(), True) , \*
>> *StructField("latitude", DecimalType(precision=20,scale=10), True) , \*
>> *StructField("longitude",DecimalType(precision=20,scale=10), True) , \*
>> *StructField("altitude", IntegerType(), True) , \*
>> *StructField("timezone", DoubleType(), True) , \*
>> 

Re: Fail a batch in Spark Streaming forcefully based on business rules

2016-07-31 Thread Lars Albertsson
I don't know your context, so I don't have a solution for you. If you
provide more information, the list might be able to suggest a
solution.

IIUYC, however, it sounds like you could benefit from decoupling
operational failure from business level failure. E.g. if there is a
failure according to your business rules, keep the job running, but
emit business level failure records. If records need to be
reprocessed, emit them to another stream topic and reprocess.
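
To make that decoupling concrete, an untested sketch (the record type, rule and
sinks are invented, not from this thread) of tagging each record as a
business-level success or failure and routing the two sides separately instead
of failing the batch:

import org.apache.spark.streaming.dstream.DStream

case class Record(id: String, payload: String)

// Stand-in business rule; the real check would encode the actual requirements.
def isValid(r: Record): Boolean = r.payload.nonEmpty

// `stream` is assumed to be an existing DStream[Record] from the application.
def route(stream: DStream[Record]): Unit = {
  val tagged = stream.map(rec => if (isValid(rec)) Right(rec) else Left(rec))

  tagged.foreachRDD { (rdd, time) =>
    val failures = rdd.collect { case Left(r) => r }   // business-level failures
    val good     = rdd.collect { case Right(r) => r }  // records to keep processing
    // Placeholder sinks; in practice these might be Kafka topics for reprocessing.
    failures.saveAsTextFile(s"/data/reprocess/failures-${time.milliseconds}")
    good.saveAsTextFile(s"/data/processed/good-${time.milliseconds}")
  }
}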

It is risky to inject system level failures under normal operations.
An operational failure is normally an anomaly that should be
addressed; if you induce failures, system failures would become part
of normal operations, and real failures risk passing unnoticed.

Regards,



Lars Albertsson
Data engineering consultant
www.mapflat.com
https://twitter.com/lalleal
+46 70 7687109
Calendar: https://goo.gl/6FBtlS


On Thu, Jul 28, 2016 at 12:11 PM, Hemalatha A
 wrote:
>
> Another use case why I need to do this is: if Exception A is caught I should
> just print it and ignore it, but if Exception B occurs, I have to end the
> batch, fail it and stop processing it.
> Is it possible to achieve this? Any hints on this, please.
>
>
> On Wed, Jul 27, 2016 at 10:42 AM, Hemalatha A
>  wrote:
>>
>> Hello,
>>
>> I have a use case wherein I have to fail certain batches in my streaming
>> application, based on my application-specific business rules.
>> Ex: If in a batch of 2 seconds I don't receive 100 messages, I should fail
>> the batch and move on.
>>
>> How to achieve this behavior?
>>
>> --
>>
>>
>> Regards
>> Hemalatha
>
>
>
>
> --
>
>
> Regards
> Hemalatha

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Clean up app folders in worker nodes

2016-07-31 Thread pbirsinger
This works. I needed to restart the master and slaves for the changes to take
effect. Plus 1 million to you sir.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Clean-up-app-folders-in-worker-nodes-tp20889p27440.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: build error - failing test- Error while building spark 2.0 trunk from github

2016-07-31 Thread Jacek Laskowski
Hi,

Can you share the command you use to run the build? What OS? Which Java version?

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Sun, Jul 31, 2016 at 6:54 PM, Rohit Chaddha
 wrote:
> ---
>  T E S T S
> ---
> Running org.apache.spark.api.java.OptionalSuite
> Tests run: 8, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.052 sec -
> in org.apache.spark.api.java.OptionalSuite
> Running org.apache.spark.JavaAPISuite
> Tests run: 90, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 23.537 sec
> <<< FAILURE! - in org.apache.spark.JavaAPISuite
> wholeTextFiles(org.apache.spark.JavaAPISuite)  Time elapsed: 0.331 sec  <<<
> FAILURE!
> java.lang.AssertionError:
> expected:> but was:
> at
> org.apache.spark.JavaAPISuite.wholeTextFiles(JavaAPISuite.java:1087)
>
> Running org.apache.spark.JavaJdbcRDDSuite
> Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.799 sec -
> in org.apache.spark.JavaJdbcRDDSuite
> Running org.apache.spark.launcher.SparkLauncherSuite
> Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.04 sec <<<
> FAILURE! - in org.apache.spark.launcher.SparkLauncherSuite
> testChildProcLauncher(org.apache.spark.launcher.SparkLauncherSuite)  Time
> elapsed: 0.03 sec  <<< FAILURE!
> java.lang.AssertionError: expected:<0> but was:<1>
> at
> org.apache.spark.launcher.SparkLauncherSuite.testChildProcLauncher(SparkLauncherSuite.java:110)
>
> Running org.apache.spark.memory.TaskMemoryManagerSuite
> Tests run: 6, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.011 sec -
> in org.apache.spark.memory.TaskMemoryManagerSuite
> Running org.apache.spark.shuffle.sort.PackedRecordPointerSuite
> Tests run: 6, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.004 sec -
> in org.apache.spark.shuffle.sort.PackedRecordPointerSuite
> Running org.apache.spark.shuffle.sort.ShuffleInMemoryRadixSorterSuite
> Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.103 sec -
> in org.apache.spark.shuffle.sort.ShuffleInMemoryRadixSorterSuite
> Running org.apache.spark.shuffle.sort.ShuffleInMemorySorterSuite
> Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.199 sec -
> in org.apache.spark.shuffle.sort.ShuffleInMemorySorterSuite
> Running org.apache.spark.shuffle.sort.UnsafeShuffleWriterSuite
> Tests run: 20, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.67 sec -
> in org.apache.spark.shuffle.sort.UnsafeShuffleWriterSuite
> Running org.apache.spark.unsafe.map.BytesToBytesMapOffHeapSuite
> Tests run: 13, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.97 sec -
> in org.apache.spark.unsafe.map.BytesToBytesMapOffHeapSuite
> Running org.apache.spark.unsafe.map.BytesToBytesMapOnHeapSuite
> Tests run: 13, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.583 sec -
> in org.apache.spark.unsafe.map.BytesToBytesMapOnHeapSuite
> Running
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorterRadixSortSuite
> Tests run: 10, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.533 sec -
> in
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorterRadixSortSuite
> Running
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorterSuite
> Tests run: 10, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.606 sec -
> in org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorterSuite
> Running
> org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorterRadixSortSuite
> Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.007 sec -
> in
> org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorterRadixSortSuite
> Running
> org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorterSuite
> Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.001 sec -
> in org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorterSuite
> Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=512m;
> support was removed in 8.0
>
> Results :
>
> Failed tests:
>   JavaAPISuite.wholeTextFiles:1087 expected:> but was:
>   SparkLauncherSuite.testChildProcLauncher:110 expected:<0> but was:<1>
>
> Tests run: 189, Failures: 2, Errors: 0, Skipped: 0

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: error while running filter on dataframe

2016-07-31 Thread Jacek Laskowski
Hi,

My eyes are not trained to read this low-level output...yet...so could
you narrow it down to a reproducible snippet and post it here and/or file a
JIRA issue? Thanks.

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Sun, Jul 31, 2016 at 6:14 PM, Tony Lane  wrote:
> Can someone help me understand this error which occurs while running a
> filter on a dataframe
>
> 2016-07-31 21:01:57 ERROR CodeGenerator:91 - failed to compile:
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line
> 117, Column 58: Expression "mapelements_isNull" is not an rvalue
> /* 001 */ public Object generate(Object[] references) {
> /* 002 */   return new GeneratedIterator(references);
> /* 003 */ }
> /* 004 */
> /* 005 */ /** Codegened pipeline for:
> /* 006 */ * TungstenAggregate(key=[],
> functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#127L])
> /* 007 */ +- Project
> /* 008 */ +- Filter (is...
> /* 009 */   */
> /* 010 */ final class GeneratedIterator extends
> org.apache.spark.sql.execution.BufferedRowIterator {
> /* 011 */   private Object[] references;
> /* 012 */   private boolean agg_initAgg;
> /* 013 */   private boolean agg_bufIsNull;
> /* 014 */   private long agg_bufValue;
> /* 015 */   private scala.collection.Iterator inputadapter_input;
> /* 016 */   private Object[] deserializetoobject_values;
> /* 017 */   private org.apache.spark.sql.types.StructType
> deserializetoobject_schema;
> /* 018 */   private UnsafeRow deserializetoobject_result;
> /* 019 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder
> deserializetoobject_holder;
> /* 020 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
> deserializetoobject_rowWriter;
> /* 021 */   private UnsafeRow mapelements_result;
> /* 022 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder
> mapelements_holder;
> /* 023 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
> mapelements_rowWriter;
> /* 024 */   private Object[] serializefromobject_values;
> /* 025 */   private UnsafeRow serializefromobject_result;
> /* 026 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder
> serializefromobject_holder;
> /* 027 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
> serializefromobject_rowWriter;
> /* 028 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
> serializefromobject_rowWriter1;
> /* 029 */   private org.apache.spark.sql.execution.metric.SQLMetric
> filter_numOutputRows;
> /* 030 */   private UnsafeRow filter_result;
> /* 031 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder
> filter_holder;
> /* 032 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
> filter_rowWriter;
> /* 033 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
> filter_rowWriter1;
> /* 034 */   private org.apache.spark.sql.execution.metric.SQLMetric
> agg_numOutputRows;
> /* 035 */   private org.apache.spark.sql.execution.metric.SQLMetric
> agg_aggTime;
> /* 036 */   private UnsafeRow agg_result;
> /* 037 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
> /* 038 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
> agg_rowWriter;
> /* 039 */
> /* 040 */   public GeneratedIterator(Object[] references) {
> /* 041 */ this.references = references;
> /* 042 */   }
> /* 043 */
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: spark 2.0 readStream from a REST API

2016-07-31 Thread Jacek Laskowski
Hi,

See 
https://github.com/jaceklaskowski/spark-workshop/tree/master/solutions/spark-mf-format.
There's a custom format that you can use to get started.

Basically, you need to develop the code behind the "mysource" format and
register it using --packages or --jars or similar when you spark-submit your
Spark application.

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Sun, Jul 31, 2016 at 6:45 PM, Ayoub Benali
 wrote:
> Looks like the way to go in spark 2.0 is to implement StreamSourceProvider
> with DataSourceRegister. But now spark fails at loading the class when
> doing:
>
> spark.readStream.format("mysource").load()
>
> I get :
>
> java.lang.ClassNotFoundException: Failed to find data source: mysource.
> Please find packages at http://spark-packages.org
>
> Is there something I need to do in order to "load" the Stream source
> provider ?
>
> Thanks,
> Ayoub
>
> 2016-07-31 17:19 GMT+02:00 Jacek Laskowski :
>>
>> On Sun, Jul 31, 2016 at 12:53 PM, Ayoub Benali
>>  wrote:
>>
>> > I started playing with the Structured Streaming API in spark 2.0 and I
>> > am
>> > looking for a way to create streaming Dataset/Dataframe from a rest HTTP
>> > endpoint but I am bit stuck.
>>
>> What a great idea! Why did I myself not think about this?!?!
>>
>> > What would be the easiest way to hack around it ? Do I need to implement
>> > the
>> > Datasource API ?
>>
>> Yes and perhaps Hadoop API too, but not sure which one exactly since I
>> haven't even thought about it (not even once).
>>
>> > Are there examples on how to create a DataSource from a REST endpoint ?
>>
>> Never heard of one.
>>
>> I'm hosting a Spark/Scala meetup this week so I'll definitely propose
>> it as a topic. Thanks a lot!
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: sql to spark scala rdd

2016-07-31 Thread Jacek Laskowski
Hi,

Impossible - see
http://www.scala-lang.org/api/current/index.html#scala.collection.Seq@sliding(size:Int,step:Int):Iterator[Repr].

I tried to show you why you ended up with "non-empty iterator" after
println. You should really start with
http://www.scala-lang.org/documentation/

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Sun, Jul 31, 2016 at 8:49 PM, sri hari kali charan Tummala
 wrote:
> Tuple
>
> [Lscala.Tuple2;@65e4cb84
>
> On Sun, Jul 31, 2016 at 1:00 AM, Jacek Laskowski  wrote:
>>
>> Hi,
>>
>> What's the result type of sliding(2,1)?
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>> On Sun, Jul 31, 2016 at 9:23 AM, sri hari kali charan Tummala
>>  wrote:
>> > tried this no luck, wht is non-empty iterator here ?
>> >
>> > OP:-
>> > (-987,non-empty iterator)
>> > (-987,non-empty iterator)
>> > (-987,non-empty iterator)
>> > (-987,non-empty iterator)
>> > (-987,non-empty iterator)
>> >
>> >
>> > sc.textFile(file).keyBy(x => x.split("\\~") (0))
>> >   .map(x => x._2.split("\\~"))
>> >   .map(x => (x(0),x(2)))
>> > .map { case (key,value) =>
>> > (key,value.toArray.toSeq.sliding(2,1).map(x
>> > => x.sum/x.size))}.foreach(println)
>> >
>> >
>> > On Sun, Jul 31, 2016 at 12:03 AM, sri hari kali charan Tummala
>> >  wrote:
>> >>
>> >> Hi All,
>> >>
>> >> I managed to write using sliding function but can it get key as well in
>> >> my
>> >> output ?
>> >>
>> >> sc.textFile(file).keyBy(x => x.split("\\~") (0))
>> >>   .map(x => x._2.split("\\~"))
>> >>   .map(x => (x(2).toDouble)).toArray().sliding(2,1).map(x =>
>> >> (x,x.size)).foreach(println)
>> >>
>> >>
>> >> at the moment my output:-
>> >>
>> >> 75.0
>> >> -25.0
>> >> 50.0
>> >> -50.0
>> >> -100.0
>> >>
>> >> I want with key how to get moving average output based on key ?
>> >>
>> >>
>> >> 987,75.0
>> >> 987,-25
>> >> 987,50.0
>> >>
>> >> Thanks
>> >> Sri
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> On Sat, Jul 30, 2016 at 11:40 AM, sri hari kali charan Tummala
>> >>  wrote:
>> >>>
>> >>> for knowledge just wondering how to write it up in scala or spark RDD.
>> >>>
>> >>> Thanks
>> >>> Sri
>> >>>
>> >>> On Sat, Jul 30, 2016 at 11:24 AM, Jacek Laskowski 
>> >>> wrote:
>> 
>>  Why?
>> 
>>  Pozdrawiam,
>>  Jacek Laskowski
>>  
>>  https://medium.com/@jaceklaskowski/
>>  Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>>  Follow me at https://twitter.com/jaceklaskowski
>> 
>> 
>>  On Sat, Jul 30, 2016 at 4:42 AM, kali.tumm...@gmail.com
>>   wrote:
>>  > Hi All,
>>  >
>>  > I managed to write business requirement in spark-sql and hive I am
>>  > still
>>  > learning scala how this below sql be written using spark RDD not
>>  > spark
>>  > data
>>  > frames.
>>  >
>>  > SELECT DATE,balance,
>>  > SUM(balance) OVER (ORDER BY DATE ROWS BETWEEN UNBOUNDED PRECEDING
>>  > AND
>>  > CURRENT ROW) daily_balance
>>  > FROM  table
>>  >
>>  >
>>  >
>>  >
>>  >
>>  > --
>>  > View this message in context:
>>  >
>>  > http://apache-spark-user-list.1001560.n3.nabble.com/sql-to-spark-scala-rdd-tp27433.html
>>  > Sent from the Apache Spark User List mailing list archive at
>>  > Nabble.com.
>>  >
>>  >
>>  > -
>>  > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>  >
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>> Thanks & Regards
>> >>> Sri Tummala
>> >>>
>> >>
>> >>
>> >>
>> >> --
>> >> Thanks & Regards
>> >> Sri Tummala
>> >>
>> >
>> >
>> >
>> > --
>> > Thanks & Regards
>> > Sri Tummala
>> >
>
>
>
>
> --
> Thanks & Regards
> Sri Tummala
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Java Recipes for Spark

2016-07-31 Thread Jean Georges Perrin
Thanks guys - I really appreciate it :)... If you have any idea of something 
missing, I'll gladly add it.

(and yeah, come on! Is that some kind of primitive racism or what: Java rocks! 
What are those languages where you can turn a list into a string and back into 
an object? #StrongTypingRules)

> On Jul 30, 2016, at 12:19 AM, Shiva Ramagopal  wrote:
> 
> +1 for the Java love :-)
> 
> 
> On 30-Jul-2016 4:39 AM, "Renato Perini"  > wrote:
> Not only very useful, but finally some Java love :-)
> 
> Thank you.
> 
> 
> Il 29/07/2016 22:30, Jean Georges Perrin ha scritto:
> Sorry if this looks like a shameless self promotion, but some of you asked me 
> to say when I'll have my Java recipes for Apache Spark updated. It's done 
> here: http://jgp.net/2016/07/22/spark-java-recipes/ 
>  and in the GitHub repo.
> 
> Enjoy / have a great week-end.
> 
> jg
> 
> 
> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
> 
> 



Re: error while running filter on dataframe

2016-07-31 Thread Michael Armbrust
You are hitting a bug in code generation.  If you can come up with a small
reproduction for the problem, it would be very helpful if you could open a
JIRA.

On Sun, Jul 31, 2016 at 9:14 AM, Tony Lane  wrote:

> Can someone help me understand this error which occurs while running a
> filter on a dataframe
>
> 2016-07-31 21:01:57 ERROR CodeGenerator:91 - failed to compile:
> org.codehaus.commons.compiler.CompileException: File 'generated.java', Line
> 117, Column 58: Expression "mapelements_isNull" is not an rvalue
> /* 001 */ public Object generate(Object[] references) {
> /* 002 */   return new GeneratedIterator(references);
> /* 003 */ }
> /* 004 */
> /* 005 */ /** Codegened pipeline for:
> /* 006 */ * TungstenAggregate(key=[],
> functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#127L])
> /* 007 */ +- Project
> /* 008 */ +- Filter (is...
> /* 009 */   */
> /* 010 */ final class GeneratedIterator extends
> org.apache.spark.sql.execution.BufferedRowIterator {
> /* 011 */   private Object[] references;
> /* 012 */   private boolean agg_initAgg;
> /* 013 */   private boolean agg_bufIsNull;
> /* 014 */   private long agg_bufValue;
> /* 015 */   private scala.collection.Iterator inputadapter_input;
> /* 016 */   private Object[] deserializetoobject_values;
> /* 017 */   private org.apache.spark.sql.types.StructType
> deserializetoobject_schema;
> /* 018 */   private UnsafeRow deserializetoobject_result;
> /* 019 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder
> deserializetoobject_holder;
> /* 020 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
> deserializetoobject_rowWriter;
> /* 021 */   private UnsafeRow mapelements_result;
> /* 022 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder
> mapelements_holder;
> /* 023 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
> mapelements_rowWriter;
> /* 024 */   private Object[] serializefromobject_values;
> /* 025 */   private UnsafeRow serializefromobject_result;
> /* 026 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder
> serializefromobject_holder;
> /* 027 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
> serializefromobject_rowWriter;
> /* 028 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
> serializefromobject_rowWriter1;
> /* 029 */   private org.apache.spark.sql.execution.metric.SQLMetric
> filter_numOutputRows;
> /* 030 */   private UnsafeRow filter_result;
> /* 031 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder
> filter_holder;
> /* 032 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
> filter_rowWriter;
> /* 033 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
> filter_rowWriter1;
> /* 034 */   private org.apache.spark.sql.execution.metric.SQLMetric
> agg_numOutputRows;
> /* 035 */   private org.apache.spark.sql.execution.metric.SQLMetric
> agg_aggTime;
> /* 036 */   private UnsafeRow agg_result;
> /* 037 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
> /* 038 */   private
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
> agg_rowWriter;
> /* 039 */
> /* 040 */   public GeneratedIterator(Object[] references) {
> /* 041 */ this.references = references;
> /* 042 */   }
> /* 043 */
>
>


Re: calling dataset.show on a custom object - displays toString() value as first column and blank for rest

2016-07-31 Thread Michael Armbrust
Can you share your code?  This does not happen for me.

On Sun, Jul 31, 2016 at 7:16 AM, Rohit Chaddha 
wrote:

> I have a custom object called A and corresponding Dataset
>
> when I call datasetA.show() method i get the following
>
> +++-+-+---+
> |id|da|like|values|uid|
> +++-+-+---+
> |A.toString()...|
> |A.toString()...|
> |A.toString()...|
> |A.toString()...|
> |A.toString()...|
> |A.toString()...|
>
> that is A.toString() is called and displayed as value of the first column
> and rest all columns are blank
>
> Any suggestions what should be done to fix this ?
>
> - Rohit
>


Re: spark 2.0 readStream from a REST API

2016-07-31 Thread Michael Armbrust
You have to add a file in resources too (example).
Either that or give a full class name.
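
For context, an untested sketch of what such a provider might look like
(package, class and schema are invented). For format("mysource") to resolve by
short name, the jar also needs a resources file
META-INF/services/org.apache.spark.sql.sources.DataSourceRegister containing
the provider's fully qualified class name:

package com.example.mysource   // names are invented for the sketch

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Source
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

class MySourceProvider extends StreamSourceProvider with DataSourceRegister {

  // The name used in spark.readStream.format("mysource")
  override def shortName(): String = "mysource"

  override def sourceSchema(
      sqlContext: SQLContext,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): (String, StructType) =
    (shortName(), schema.getOrElse(StructType(StructField("value", StringType) :: Nil)))

  override def createSource(
      sqlContext: SQLContext,
      metadataPath: String,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): Source = {
    // A real implementation would poll the REST endpoint and expose its batches here.
    ???
  }
}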

On Sun, Jul 31, 2016 at 9:45 AM, Ayoub Benali 
wrote:

> Looks like the way to go in spark 2.0 is to implement StreamSourceProvider
> 
>  with DataSourceRegister
> .
> But now spark fails at loading the class when doing:
>
> spark.readStream.format("mysource").load()
>
> I get :
>
> java.lang.ClassNotFoundException: Failed to find data source: mysource.
> Please find packages at http://spark-packages.org
>
> Is there something I need to do in order to "load" the Stream source
> provider ?
>
> Thanks,
> Ayoub
>
> 2016-07-31 17:19 GMT+02:00 Jacek Laskowski :
>
>> On Sun, Jul 31, 2016 at 12:53 PM, Ayoub Benali
>>  wrote:
>>
>> > I started playing with the Structured Streaming API in spark 2.0 and I
>> am
>> > looking for a way to create streaming Dataset/Dataframe from a rest HTTP
>> > endpoint but I am bit stuck.
>>
>> What a great idea! Why did I myself not think about this?!?!
>>
>> > What would be the easiest way to hack around it ? Do I need to
>> implement the
>> > Datasource API ?
>>
>> Yes and perhaps Hadoop API too, but not sure which one exactly since I
>> haven't even thought about it (not even once).
>>
>> > Are there examples on how to create a DataSource from a REST endpoint ?
>>
>> Never heard of one.
>>
>> I'm hosting a Spark/Scala meetup this week so I'll definitely propose
>> it as a topic. Thanks a lot!
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>
>


Re: [Spark 2.0] Why MutableInt cannot be cast to MutableLong?

2016-07-31 Thread Michael Armbrust
Are you sure you are running Spark 2.0?

In your stack trace I see SqlNewHadoopRDD, which was removed in #12354
.

On Sun, Jul 31, 2016 at 2:12 AM, Chanh Le  wrote:

> Hi everyone,
> Why *MutableInt* cannot be cast to *MutableLong?*
> It’s really weird and seems Spark 2.0 has a lot of error with parquet
> about format.
>
> *org.apache.spark.sql.catalyst.expressions.MutableInt cannot be cast to
> org.apache.spark.sql.catalyst.expressions.MutableLong*
>
> Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read
> value at 0 in block 0 in file
> file:/data/etl-report/parquet/AD_COOKIE_REPORT/time=2016-07-
>
> 25-16/network_id=31713/part-r-0-9adbef89-f2f4-4836-a50c-a2e7b381d558.snappy.parquet
> at
> org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228)
> at
> org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
> at
> org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.hasNext(SqlNewHadoopRDD.scala:194)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:88)
> at
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> Caused by: java.lang.ClassCastException:
> org.apache.spark.sql.catalyst.expressions.MutableInt cannot be cast to
> org.apache.spark.sql.catalyst.expressions.MutableLong
> at
> org.apache.spark.sql.catalyst.expressions.SpecificMutableRow.setLong(SpecificMutableRow.scala:295)
> at
> org.apache.spark.sql.execution.datasources.parquet.CatalystRowConverter$RowUpdater.setLong(CatalystRowConverter.scala:161)
> at
> org.apache.spark.sql.execution.datasources.parquet.CatalystPrimitiveConverter.addLong(CatalystRowConverter.scala:85)
> at
> org.apache.parquet.column.impl.ColumnReaderImpl$2$4.writeValue(ColumnReaderImpl.java:269)
> at
> org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:365)
> at
> org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:405)
> at
> org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:209)
> ... 20 more
>


Re: sql to spark scala rdd

2016-07-31 Thread sri hari kali charan Tummala
val test=sc.textFile(file).keyBy(x => x.split("\\~") (0))
  .map(x => x._2.split("\\~"))
  .map(x => ((x(0),x(1),x(2))))
  .map{case (account,datevalue,amount) =>
((account,datevalue),(amount.toDouble))}.mapValues(x =>
x).toArray.sliding(2,1).map(x => (x(0)._1,x(1)._2,(x.foldLeft(0.0)(_ +
_._2/x.size)),x.foldLeft(0.0)(_ + _._2))).foreach(println)
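
For comparison, an untested sketch that keeps the computation per key by
grouping first and sliding within each account's date-sorted values (variable
names are made up, and the balance column is left out for brevity):

// Input lines look like "-987~20150728~100" (account ~ date ~ amount).
val rows = sc.textFile(file)
  .map(_.split("\\~"))
  .map(a => (a(0), (a(1), a(2).toDouble)))   // (account, (date, amount))

val movingAvg = rows.groupByKey().flatMap { case (account, values) =>
  values.toSeq.sortBy(_._1)                  // sort by date within the account
    .sliding(2, 1)                           // window of 2, step 1
    .map { w =>
      val amounts = w.map(_._2)
      (account, w.last._1, amounts.sum / amounts.size, amounts.sum)
    }
}

movingAvg.foreach(println)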


On Sun, Jul 31, 2016 at 12:15 PM, sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:

> Hi All,
>
> I already solved it using DF and spark sql I was wondering how to solve in
> scala rdd, I just got the answer need to check my results compared to spark
> sql thanks all for your time.
>
> I am trying to solve moving average using scala RDD group by key.
>
>
> input:-
> -987~20150728~100
> -987~20150729~50
> -987~20150730~-100
> -987~20150804~200
> -987~20150807~-300
> -987~20150916~100
>
>
> val test = sc.textFile(file).keyBy(x => x.split("\\~")(0))
>   .map(x => x._2.split("\\~"))
>   .map(x => (x(0), x(1), x(2)))
>   .map { case (account, datevalue, amount) => ((account, datevalue), amount.toDouble) }
>   .mapValues(x => x)
>   .toArray.sliding(2, 1)
>   .map(x => (x(0)._1, x(1)._2, x.foldLeft(0.0)(_ + _._2 / x.size), x.foldLeft(0.0)(_ + _._2)))
>   .foreach(println)
>
> Op:-
>
> accountkey,date,balance_of_account, daily_average, sum_base_on_window
>
> ((-987,20150728),50.0,75.0,150.0)
> ((-987,20150729),-100.0,-25.0,-50.0)
> ((-987,20150730),200.0,50.0,100.0)
> ((-987,20150804),-300.0,-50.0,-100.0)
> ((-987,20150807),100.0,-100.0,-200.0)
>
>
> below book is written for Hadoop Mapreduce the book has solution for
> moving average but its in Java.
>
>
> https://www.safaribooksonline.com/library/view/data-algorithms/9781491906170/ch06.html
>
>
> Sql:-
>
>
> SELECT DATE,balance,
> SUM(balance) OVER (ORDER BY DATE ROWS BETWEEN UNBOUNDED PRECEDING AND
> CURRENT ROW) daily_balance
> FROM  table
>
> Thanks
> Sri
>
>
>
> On Sun, Jul 31, 2016 at 11:54 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Check also this
>> 
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 31 July 2016 at 19:49, sri hari kali charan Tummala <
>> kali.tumm...@gmail.com> wrote:
>>
>>> Tuple
>>>
>>> [Lscala.Tuple2;@65e4cb84
>>>
>>> On Sun, Jul 31, 2016 at 1:00 AM, Jacek Laskowski 
>>> wrote:
>>>
 Hi,

 What's the result type of sliding(2,1)?

 Pozdrawiam,
 Jacek Laskowski
 
 https://medium.com/@jaceklaskowski/
 Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
 Follow me at https://twitter.com/jaceklaskowski


 On Sun, Jul 31, 2016 at 9:23 AM, sri hari kali charan Tummala
  wrote:
 > tried this no luck, wht is non-empty iterator here ?
 >
 > OP:-
 > (-987,non-empty iterator)
 > (-987,non-empty iterator)
 > (-987,non-empty iterator)
 > (-987,non-empty iterator)
 > (-987,non-empty iterator)
 >
 >
 > sc.textFile(file).keyBy(x => x.split("\\~") (0))
 >   .map(x => x._2.split("\\~"))
 >   .map(x => (x(0),x(2)))
 > .map { case (key,value) =>
 (key,value.toArray.toSeq.sliding(2,1).map(x
 > => x.sum/x.size))}.foreach(println)
 >
 >
 > On Sun, Jul 31, 2016 at 12:03 AM, sri hari kali charan Tummala
 >  wrote:
 >>
 >> Hi All,
 >>
 >> I managed to write using sliding function but can it get key as well
 in my
 >> output ?
 >>
 >> sc.textFile(file).keyBy(x => x.split("\\~") (0))
 >>   .map(x => x._2.split("\\~"))
 >>   .map(x => (x(2).toDouble)).toArray().sliding(2,1).map(x =>
 >> (x,x.size)).foreach(println)
 >>
 >>
 >> at the moment my output:-
 >>
 >> 75.0
 >> -25.0
 >> 50.0
 >> -50.0
 >> -100.0
 >>
 >> I want with key how to get moving average output based on key ?
 >>
 >>
 >> 987,75.0
 >> 987,-25
 >> 987,50.0
 >>
 >> Thanks
 >> Sri
 >>
 >>
 >>
 >>
 >>
 >>
 >> On Sat, Jul 30, 2016 at 11:40 AM, sri hari kali charan Tummala
 >>  wrote:
 >>>
 >>> for knowledge just wondering how to write it up in scala or spark
 RDD.
 >>>
 >>> Thanks
 >>> Sri
 >>>
 >>> On Sat, Jul 30, 2016 at 11:24 AM, Jacek Laskowski 

Re: sql to spark scala rdd

2016-07-31 Thread sri hari kali charan Tummala
Hi All,

I already solved it using DataFrames and Spark SQL, but I was wondering how to solve it with a plain Scala RDD. I just got the answer and need to check my results against the Spark SQL version. Thanks all for your time.

I am trying to compute a moving average using a Scala RDD, grouped by key.


input:-
-987~20150728~100
-987~20150729~50
-987~20150730~-100
-987~20150804~200
-987~20150807~-300
-987~20150916~100


val test = sc.textFile(file).keyBy(x => x.split("\\~")(0))
  .map(x => x._2.split("\\~"))
  .map(x => (x(0), x(1), x(2)))
  .map { case (account, datevalue, amount) => ((account, datevalue), amount.toDouble) }
  .mapValues(x => x)
  .toArray.sliding(2, 1)
  .map(x => (x(0)._1, x(1)._2, x.foldLeft(0.0)(_ + _._2 / x.size), x.foldLeft(0.0)(_ + _._2)))
  .foreach(println)

Op:-

accountkey,date,balance_of_account, daily_average, sum_base_on_window

((-987,20150728),50.0,75.0,150.0)
((-987,20150729),-100.0,-25.0,-50.0)
((-987,20150730),200.0,50.0,100.0)
((-987,20150804),-300.0,-50.0,-100.0)
((-987,20150807),100.0,-100.0,-200.0)


The book below is written for Hadoop MapReduce; it has a solution for moving average, but it is in Java.

https://www.safaribooksonline.com/library/view/data-algorithms/9781491906170/ch06.html


Sql:-


SELECT DATE,balance,
SUM(balance) OVER (ORDER BY DATE ROWS BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW) daily_balance
FROM  table
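
For comparison, a rough DataFrame equivalent of the SQL above, per account (a sketch only: Scala with the Spark 2.0 API, the "~"-delimited parsing and column names are assumptions, and spark is a SparkSession as in spark-shell):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

// parse the "~"-delimited file into (account, datevalue, balance)
val df = spark.read.textFile(file)
  .map { line => val a = line.split("\\~"); (a(0), a(1), a(2).toDouble) }
  .toDF("account", "datevalue", "balance")

// ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, per account
val w = Window.partitionBy("account").orderBy("datevalue")
  .rowsBetween(Long.MinValue, 0)

df.withColumn("daily_balance", sum("balance").over(w)).show()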

Thanks
Sri



On Sun, Jul 31, 2016 at 11:54 AM, Mich Talebzadeh  wrote:

> Check also this
> 
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 31 July 2016 at 19:49, sri hari kali charan Tummala <
> kali.tumm...@gmail.com> wrote:
>
>> Tuple
>>
>> [Lscala.Tuple2;@65e4cb84
>>
>> On Sun, Jul 31, 2016 at 1:00 AM, Jacek Laskowski  wrote:
>>
>>> Hi,
>>>
>>> What's the result type of sliding(2,1)?
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>> 
>>> https://medium.com/@jaceklaskowski/
>>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>>> Follow me at https://twitter.com/jaceklaskowski
>>>
>>>
>>> On Sun, Jul 31, 2016 at 9:23 AM, sri hari kali charan Tummala
>>>  wrote:
>>> > tried this no luck, wht is non-empty iterator here ?
>>> >
>>> > OP:-
>>> > (-987,non-empty iterator)
>>> > (-987,non-empty iterator)
>>> > (-987,non-empty iterator)
>>> > (-987,non-empty iterator)
>>> > (-987,non-empty iterator)
>>> >
>>> >
>>> > sc.textFile(file).keyBy(x => x.split("\\~") (0))
>>> >   .map(x => x._2.split("\\~"))
>>> >   .map(x => (x(0),x(2)))
>>> > .map { case (key,value) =>
>>> (key,value.toArray.toSeq.sliding(2,1).map(x
>>> > => x.sum/x.size))}.foreach(println)
>>> >
>>> >
>>> > On Sun, Jul 31, 2016 at 12:03 AM, sri hari kali charan Tummala
>>> >  wrote:
>>> >>
>>> >> Hi All,
>>> >>
>>> >> I managed to write using sliding function but can it get key as well
>>> in my
>>> >> output ?
>>> >>
>>> >> sc.textFile(file).keyBy(x => x.split("\\~") (0))
>>> >>   .map(x => x._2.split("\\~"))
>>> >>   .map(x => (x(2).toDouble)).toArray().sliding(2,1).map(x =>
>>> >> (x,x.size)).foreach(println)
>>> >>
>>> >>
>>> >> at the moment my output:-
>>> >>
>>> >> 75.0
>>> >> -25.0
>>> >> 50.0
>>> >> -50.0
>>> >> -100.0
>>> >>
>>> >> I want with key how to get moving average output based on key ?
>>> >>
>>> >>
>>> >> 987,75.0
>>> >> 987,-25
>>> >> 987,50.0
>>> >>
>>> >> Thanks
>>> >> Sri
>>> >>
>>> >>
>>> >>
>>> >>
>>> >>
>>> >>
>>> >> On Sat, Jul 30, 2016 at 11:40 AM, sri hari kali charan Tummala
>>> >>  wrote:
>>> >>>
>>> >>> for knowledge just wondering how to write it up in scala or spark
>>> RDD.
>>> >>>
>>> >>> Thanks
>>> >>> Sri
>>> >>>
>>> >>> On Sat, Jul 30, 2016 at 11:24 AM, Jacek Laskowski 
>>> >>> wrote:
>>> 
>>>  Why?
>>> 
>>>  Pozdrawiam,
>>>  Jacek Laskowski
>>>  
>>>  https://medium.com/@jaceklaskowski/
>>>  Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>>>  Follow me at https://twitter.com/jaceklaskowski
>>> 
>>> 
>>>  On Sat, Jul 30, 2016 at 4:42 AM, kali.tumm...@gmail.com
>>>   wrote:
>>>  > Hi All,
>>>  >
>>>  > I managed to write business requirement in spark-sql and hive I am
>>>  > still
>>>  > learning scala how this below sql be written using spark RDD not
>>> spark
>>>  > data
>>>  > frames.

Re: multiple spark streaming contexts

2016-07-31 Thread Sumit Khanna
Any ideas on this one guys ?

I can do a sample run, but I can't be sure of problems that might surface later. How can I ensure a different batchDuration etc. per StreamingContext?
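
For context, here is the single-context baseline as a sketch (host, port and batch interval are placeholders). The Spark Streaming guide states that only one StreamingContext can be active in a JVM at a time, so genuinely different batch durations per topic usually mean separate driver applications rather than several contexts inside one app:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("per-topic-stream")
val ssc = new StreamingContext(conf, Seconds(10))    // batchDuration is fixed per context
val lines = ssc.socketTextStream("localhost", 9999)  // placeholder input
lines.print()
ssc.start()
ssc.awaitTermination()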

Thanks,

On Sun, Jul 31, 2016 at 10:50 AM, Sumit Khanna 
wrote:

> Hey,
>
> Was wondering if I could create multiple spark stream contexts in my
> application (e.g instantiating a worker actor per topic and it has its own
> streaming context its own batch duration everything).
>
> What are the caveats if any?
> What are the best practices?
>
> Have googled half heartedly on the same but the air isn't pretty much
> demystified yet. I could skim through something like
>
>
> http://stackoverflow.com/questions/29612726/how-do-you-setup-multiple-spark-streaming-jobs-with-different-batch-durations
>
>
> http://stackoverflow.com/questions/37006565/multiple-spark-streaming-contexts-on-one-worker
>
> Thanks in Advance!
> Sumit
>


Re: sql to spark scala rdd

2016-07-31 Thread Mich Talebzadeh
Check also this


HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 31 July 2016 at 19:49, sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:

> Tuple
>
> [Lscala.Tuple2;@65e4cb84
>
> On Sun, Jul 31, 2016 at 1:00 AM, Jacek Laskowski  wrote:
>
>> Hi,
>>
>> What's the result type of sliding(2,1)?
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>> On Sun, Jul 31, 2016 at 9:23 AM, sri hari kali charan Tummala
>>  wrote:
>> > tried this no luck, wht is non-empty iterator here ?
>> >
>> > OP:-
>> > (-987,non-empty iterator)
>> > (-987,non-empty iterator)
>> > (-987,non-empty iterator)
>> > (-987,non-empty iterator)
>> > (-987,non-empty iterator)
>> >
>> >
>> > sc.textFile(file).keyBy(x => x.split("\\~") (0))
>> >   .map(x => x._2.split("\\~"))
>> >   .map(x => (x(0),x(2)))
>> > .map { case (key,value) =>
>> (key,value.toArray.toSeq.sliding(2,1).map(x
>> > => x.sum/x.size))}.foreach(println)
>> >
>> >
>> > On Sun, Jul 31, 2016 at 12:03 AM, sri hari kali charan Tummala
>> >  wrote:
>> >>
>> >> Hi All,
>> >>
>> >> I managed to write using sliding function but can it get key as well
>> in my
>> >> output ?
>> >>
>> >> sc.textFile(file).keyBy(x => x.split("\\~") (0))
>> >>   .map(x => x._2.split("\\~"))
>> >>   .map(x => (x(2).toDouble)).toArray().sliding(2,1).map(x =>
>> >> (x,x.size)).foreach(println)
>> >>
>> >>
>> >> at the moment my output:-
>> >>
>> >> 75.0
>> >> -25.0
>> >> 50.0
>> >> -50.0
>> >> -100.0
>> >>
>> >> I want with key how to get moving average output based on key ?
>> >>
>> >>
>> >> 987,75.0
>> >> 987,-25
>> >> 987,50.0
>> >>
>> >> Thanks
>> >> Sri
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> On Sat, Jul 30, 2016 at 11:40 AM, sri hari kali charan Tummala
>> >>  wrote:
>> >>>
>> >>> for knowledge just wondering how to write it up in scala or spark RDD.
>> >>>
>> >>> Thanks
>> >>> Sri
>> >>>
>> >>> On Sat, Jul 30, 2016 at 11:24 AM, Jacek Laskowski 
>> >>> wrote:
>> 
>>  Why?
>> 
>>  Pozdrawiam,
>>  Jacek Laskowski
>>  
>>  https://medium.com/@jaceklaskowski/
>>  Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>>  Follow me at https://twitter.com/jaceklaskowski
>> 
>> 
>>  On Sat, Jul 30, 2016 at 4:42 AM, kali.tumm...@gmail.com
>>   wrote:
>>  > Hi All,
>>  >
>>  > I managed to write business requirement in spark-sql and hive I am
>>  > still
>>  > learning scala how this below sql be written using spark RDD not
>> spark
>>  > data
>>  > frames.
>>  >
>>  > SELECT DATE,balance,
>>  > SUM(balance) OVER (ORDER BY DATE ROWS BETWEEN UNBOUNDED PRECEDING
>> AND
>>  > CURRENT ROW) daily_balance
>>  > FROM  table
>>  >
>>  >
>>  >
>>  >
>>  >
>>  > --
>>  > View this message in context:
>>  >
>> http://apache-spark-user-list.1001560.n3.nabble.com/sql-to-spark-scala-rdd-tp27433.html
>>  > Sent from the Apache Spark User List mailing list archive at
>>  > Nabble.com.
>>  >
>>  >
>> -
>>  > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>  >
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>> Thanks & Regards
>> >>> Sri Tummala
>> >>>
>> >>
>> >>
>> >>
>> >> --
>> >> Thanks & Regards
>> >> Sri Tummala
>> >>
>> >
>> >
>> >
>> > --
>> > Thanks & Regards
>> > Sri Tummala
>> >
>>
>
>
>
> --
> Thanks & Regards
> Sri Tummala
>
>


Re: sql to spark scala rdd

2016-07-31 Thread sri hari kali charan Tummala
Tuple

[Lscala.Tuple2;@65e4cb84
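
(For reference: "[Lscala.Tuple2;@..." is just the default toString of an Array[Tuple2], and the earlier "(-987,non-empty iterator)" lines are the default toString of a Scala Iterator, which println shows instead of its elements. A minimal sketch that materializes each window before printing, assuming the spark-shell and the same file layout used earlier in the thread:)

sc.textFile(file)
  .map(_.split("\\~"))
  .map(a => (a(0), a(2).toDouble))
  .groupByKey()
  .mapValues(v => v.toSeq.sliding(2, 1).map(w => w.sum / w.size).toList)  // toList forces the iterator
  .foreach(println)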

On Sun, Jul 31, 2016 at 1:00 AM, Jacek Laskowski  wrote:

> Hi,
>
> What's the result type of sliding(2,1)?
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Sun, Jul 31, 2016 at 9:23 AM, sri hari kali charan Tummala
>  wrote:
> > tried this no luck, wht is non-empty iterator here ?
> >
> > OP:-
> > (-987,non-empty iterator)
> > (-987,non-empty iterator)
> > (-987,non-empty iterator)
> > (-987,non-empty iterator)
> > (-987,non-empty iterator)
> >
> >
> > sc.textFile(file).keyBy(x => x.split("\\~") (0))
> >   .map(x => x._2.split("\\~"))
> >   .map(x => (x(0),x(2)))
> > .map { case (key,value) =>
> (key,value.toArray.toSeq.sliding(2,1).map(x
> > => x.sum/x.size))}.foreach(println)
> >
> >
> > On Sun, Jul 31, 2016 at 12:03 AM, sri hari kali charan Tummala
> >  wrote:
> >>
> >> Hi All,
> >>
> >> I managed to write using sliding function but can it get key as well in
> my
> >> output ?
> >>
> >> sc.textFile(file).keyBy(x => x.split("\\~") (0))
> >>   .map(x => x._2.split("\\~"))
> >>   .map(x => (x(2).toDouble)).toArray().sliding(2,1).map(x =>
> >> (x,x.size)).foreach(println)
> >>
> >>
> >> at the moment my output:-
> >>
> >> 75.0
> >> -25.0
> >> 50.0
> >> -50.0
> >> -100.0
> >>
> >> I want with key how to get moving average output based on key ?
> >>
> >>
> >> 987,75.0
> >> 987,-25
> >> 987,50.0
> >>
> >> Thanks
> >> Sri
> >>
> >>
> >>
> >>
> >>
> >>
> >> On Sat, Jul 30, 2016 at 11:40 AM, sri hari kali charan Tummala
> >>  wrote:
> >>>
> >>> for knowledge just wondering how to write it up in scala or spark RDD.
> >>>
> >>> Thanks
> >>> Sri
> >>>
> >>> On Sat, Jul 30, 2016 at 11:24 AM, Jacek Laskowski 
> >>> wrote:
> 
>  Why?
> 
>  Pozdrawiam,
>  Jacek Laskowski
>  
>  https://medium.com/@jaceklaskowski/
>  Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>  Follow me at https://twitter.com/jaceklaskowski
> 
> 
>  On Sat, Jul 30, 2016 at 4:42 AM, kali.tumm...@gmail.com
>   wrote:
>  > Hi All,
>  >
>  > I managed to write business requirement in spark-sql and hive I am
>  > still
>  > learning scala how this below sql be written using spark RDD not
> spark
>  > data
>  > frames.
>  >
>  > SELECT DATE,balance,
>  > SUM(balance) OVER (ORDER BY DATE ROWS BETWEEN UNBOUNDED PRECEDING
> AND
>  > CURRENT ROW) daily_balance
>  > FROM  table
>  >
>  >
>  >
>  >
>  >
>  > --
>  > View this message in context:
>  >
> http://apache-spark-user-list.1001560.n3.nabble.com/sql-to-spark-scala-rdd-tp27433.html
>  > Sent from the Apache Spark User List mailing list archive at
>  > Nabble.com.
>  >
>  >
> -
>  > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>  >
> >>>
> >>>
> >>>
> >>>
> >>> --
> >>> Thanks & Regards
> >>> Sri Tummala
> >>>
> >>
> >>
> >>
> >> --
> >> Thanks & Regards
> >> Sri Tummala
> >>
> >
> >
> >
> > --
> > Thanks & Regards
> > Sri Tummala
> >
>



-- 
Thanks & Regards
Sri Tummala


Re: spark-submit hangs forever after all tasks finish(spark 2.0.0 stable version on yarn)

2016-07-31 Thread Zhuo Tao
Yarn client

On Sunday, July 31, 2016, Pradeep  wrote:

> Hi,
>
> Are you running on yarn-client or cluster mode?
>
> Pradeep
>
> > On Jul 30, 2016, at 7:34 PM, taozhuo >
> wrote:
> >
> > below is the error messages that seem run infinitely:
> >
> >
> > 16/07/30 23:25:38 DEBUG ProtobufRpcEngine: Call: getApplicationReport
> took
> > 1ms
> > 16/07/30 23:25:39 DEBUG Client: IPC Client (1735131305) connection to
> > /10.80.1.168:8032 from zhuotao sending #147247
> > 16/07/30 23:25:39 DEBUG Client: IPC Client (1735131305) connection to
> > /10.80.1.168:8032 from zhuotao got value #147247
> > 16/07/30 23:25:39 DEBUG ProtobufRpcEngine: Call: getApplicationReport
> took
> > 1ms
> > 16/07/30 23:25:40 DEBUG Client: IPC Client (1735131305) connection to
> > /10.80.1.168:8032 from zhuotao sending #147248
> > 16/07/30 23:25:40 DEBUG Client: IPC Client (1735131305) connection to
> > /10.80.1.168:8032 from zhuotao got value #147248
> > 16/07/30 23:25:40 DEBUG ProtobufRpcEngine: Call: getApplicationReport
> took
> > 1ms
> > 16/07/30 23:25:41 DEBUG Client: IPC Client (1735131305) connection to
> > /10.80.1.168:8032 from zhuotao sending #147249
> > 16/07/30 23:25:41 DEBUG Client: IPC Client (1735131305) connection to
> > /10.80.1.168:8032 from zhuotao got value #147249
> > 16/07/30 23:25:41 DEBUG ProtobufRpcEngine: Call: getApplicationReport
> took
> > 1ms
> > 16/07/30 23:25:42 DEBUG Client: IPC Client (1735131305) connection to
> > /10.80.1.168:8032 from zhuotao sending #147250
> > 16/07/30 23:25:42 DEBUG Client: IPC Client (1735131305) connection to
> > /10.80.1.168:8032 from zhuotao got value #147250
> > 16/07/30 23:25:42 DEBUG ProtobufRpcEngine: Call: getApplicationReport
> took
> > 1ms
> > 16/07/30 23:25:43 DEBUG Client: IPC Client (1735131305) connection to
> > /10.80.1.168:8032 from zhuotao sending #147251
> > 16/07/30 23:25:43 DEBUG Client: IPC Client (1735131305) connection to
> > /10.80.1.168:8032 from zhuotao got value #147251
> > 16/07/30 23:25:43 DEBUG ProtobufRpcEngine: Call: getApplicationReport
> took
> > 1ms
> > 16/07/30 23:25:44 DEBUG Client: IPC Client (1735131305) connection to
> > /10.80.1.168:8032 from zhuotao sending #147252
> > 16/07/30 23:25:44 DEBUG Client: IPC Client (1735131305) connection to
> > /10.80.1.168:8032 from zhuotao got value #147252
> > 16/07/30 23:25:44 DEBUG ProtobufRpcEngine: Call: getApplicationReport
> took
> > 0ms
> > 16/07/30 23:25:45 DEBUG Client: IPC Client (1735131305) connection to
> > /10.80.1.168:8032 from zhuotao sending #147253
> > 16/07/30 23:25:45 DEBUG Client: IPC Client (1735131305) connection to
> > /10.80.1.168:8032 from zhuotao got value #147253
> > 16/07/30 23:25:45 DEBUG ProtobufRpcEngine: Call: getApplicationReport
> took
> > 0ms
> > 16/07/30 23:25:46 DEBUG Client: IPC Client (1735131305) connection to
> > /10.80.1.168:8032 from zhuotao sending #147254
> > 16/07/30 23:25:46 DEBUG Client: IPC Client (1735131305) connection to
> > /10.80.1.168:8032 from zhuotao got value #147254
> > 16/07/30 23:25:46 DEBUG ProtobufRpcEngine: Call: getApplicationReport
> took
> > 1ms
> > 16/07/30 23:25:47 DEBUG Client: IPC Client (1735131305) connection to
> > /10.80.1.168:8032 from zhuotao sending #147255
> > 16/07/30 23:25:47 DEBUG Client: IPC Client (1735131305) connection to
> > /10.80.1.168:8032 from zhuotao got value #147255
> > 16/07/30 23:25:47 DEBUG ProtobufRpcEngine: Call: getApplicationReport
> took
> > 1ms
> > 16/07/30 23:25:48 DEBUG Client: IPC Client (1735131305) connection to
> > /10.80.1.168:8032 from zhuotao sending #147256
> > 16/07/30 23:25:48 DEBUG Client: IPC Client (1735131305) connection to
> > /10.80.1.168:8032 from zhuotao got value #147256
> > 16/07/30 23:25:48 DEBUG ProtobufRpcEngine: Call: getApplicationReport
> took
> > 1ms
> >
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/spark-submit-hangs-forever-after-all-tasks-finish-spark-2-0-0-stable-version-on-yarn-tp27436.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
> >
>
>


build error - failing test- Error while building spark 2.0 trunk from github

2016-07-31 Thread Rohit Chaddha
---
 T E S T S
---
Running org.apache.spark.api.java.OptionalSuite
Tests run: 8, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.052 sec -
in org.apache.spark.api.java.OptionalSuite
Running org.apache.spark.JavaAPISuite
Tests run: 90, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 23.537 sec
<<< FAILURE! - in org.apache.spark.JavaAPISuite
wholeTextFiles(org.apache.spark.JavaAPISuite)  Time elapsed: 0.331 sec  <<<
FAILURE!
java.lang.AssertionError:
expected: but was:
at
org.apache.spark.JavaAPISuite.wholeTextFiles(JavaAPISuite.java:1087)

Running org.apache.spark.JavaJdbcRDDSuite
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.799 sec -
in org.apache.spark.JavaJdbcRDDSuite
Running org.apache.spark.launcher.SparkLauncherSuite
Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.04 sec
<<< FAILURE! - in org.apache.spark.launcher.SparkLauncherSuite
testChildProcLauncher(org.apache.spark.launcher.SparkLauncherSuite)  Time
elapsed: 0.03 sec  <<< FAILURE!
java.lang.AssertionError: expected:<0> but was:<1>
at
org.apache.spark.launcher.SparkLauncherSuite.testChildProcLauncher(SparkLauncherSuite.java:110)

Running org.apache.spark.memory.TaskMemoryManagerSuite
Tests run: 6, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.011 sec -
in org.apache.spark.memory.TaskMemoryManagerSuite
Running org.apache.spark.shuffle.sort.PackedRecordPointerSuite
Tests run: 6, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.004 sec -
in org.apache.spark.shuffle.sort.PackedRecordPointerSuite
Running org.apache.spark.shuffle.sort.ShuffleInMemoryRadixSorterSuite
Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.103 sec -
in org.apache.spark.shuffle.sort.ShuffleInMemoryRadixSorterSuite
Running org.apache.spark.shuffle.sort.ShuffleInMemorySorterSuite
Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.199 sec -
in org.apache.spark.shuffle.sort.ShuffleInMemorySorterSuite
Running org.apache.spark.shuffle.sort.UnsafeShuffleWriterSuite
Tests run: 20, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.67 sec -
in org.apache.spark.shuffle.sort.UnsafeShuffleWriterSuite
Running org.apache.spark.unsafe.map.BytesToBytesMapOffHeapSuite
Tests run: 13, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.97 sec -
in org.apache.spark.unsafe.map.BytesToBytesMapOffHeapSuite
Running org.apache.spark.unsafe.map.BytesToBytesMapOnHeapSuite
Tests run: 13, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.583 sec
- in org.apache.spark.unsafe.map.BytesToBytesMapOnHeapSuite
Running
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorterRadixSortSuite
Tests run: 10, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.533 sec
- in
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorterRadixSortSuite
Running
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorterSuite
Tests run: 10, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.606 sec
- in org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorterSuite
Running
org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorterRadixSortSuite
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.007 sec -
in
org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorterRadixSortSuite
Running
org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorterSuite
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.001 sec -
in org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorterSuite
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option
MaxPermSize=512m; support was removed in 8.0

Results :

Failed tests:
  JavaAPISuite.wholeTextFiles:1087 expected: but was:
  SparkLauncherSuite.testChildProcLauncher:110 expected:<0> but was:<1>

Tests run: 189, Failures: 2, Errors: 0, Skipped: 0


Re: spark 2.0 readStream from a REST API

2016-07-31 Thread Ayoub Benali
Looks like the way to go in spark 2.0 is to implement StreamSourceProvider
with DataSourceRegister.
But now spark fails at loading the class when doing:

spark.readStream.format("mysource").load()

I get :

java.lang.ClassNotFoundException: Failed to find data source: mysource.
Please find packages at http://spark-packages.org

Is there something I need to do in order to "load" the Stream source
provider ?

Thanks,
Ayoub
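
For what it's worth, format("mysource") is resolved either from a fully qualified provider class name or from a short name registered through Java's ServiceLoader. Below is a minimal, untested sketch of the registration side only; the package and class names are made up for illustration and the Source itself is left as a stub:

package com.example.mysource

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Source
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

class DefaultSource extends StreamSourceProvider with DataSourceRegister {

  // The short name is only picked up if this class is listed in a classpath
  // resource named META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
  // containing the single line: com.example.mysource.DefaultSource
  override def shortName(): String = "mysource"

  override def sourceSchema(
      sqlContext: SQLContext,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): (String, StructType) =
    (shortName(), schema.getOrElse(StructType(Seq(StructField("value", StringType)))))

  override def createSource(
      sqlContext: SQLContext,
      metadataPath: String,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): Source =
    ???  // stub: the REST-polling Source (schema/getOffset/getBatch/stop) would go here
}

Without the services file, the fully qualified name should also resolve:
spark.readStream.format("com.example.mysource").load()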

2016-07-31 17:19 GMT+02:00 Jacek Laskowski :

> On Sun, Jul 31, 2016 at 12:53 PM, Ayoub Benali
>  wrote:
>
> > I started playing with the Structured Streaming API in spark 2.0 and I am
> > looking for a way to create streaming Dataset/Dataframe from a rest HTTP
> > endpoint but I am bit stuck.
>
> What a great idea! Why did I myself not think about this?!?!
>
> > What would be the easiest way to hack around it ? Do I need to implement
> the
> > Datasource API ?
>
> Yes and perhaps Hadoop API too, but not sure which one exactly since I
> haven't even thought about it (not even once).
>
> > Are there examples on how to create a DataSource from a REST endpoint ?
>
> Never heard of one.
>
> I'm hosting a Spark/Scala meetup this week so I'll definitely propose
> it as a topic. Thanks a lot!
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>


error while running filter on dataframe

2016-07-31 Thread Tony Lane
Can someone help me understand this error, which occurs while running a filter on a DataFrame?

2016-07-31 21:01:57 ERROR CodeGenerator:91 - failed to compile:
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line
117, Column 58: Expression "mapelements_isNull" is not an rvalue
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ /** Codegened pipeline for:
/* 006 */ * TungstenAggregate(key=[],
functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#127L])
/* 007 */ +- Project
/* 008 */ +- Filter (is...
/* 009 */   */
/* 010 */ final class GeneratedIterator extends
org.apache.spark.sql.execution.BufferedRowIterator {
/* 011 */   private Object[] references;
/* 012 */   private boolean agg_initAgg;
/* 013 */   private boolean agg_bufIsNull;
/* 014 */   private long agg_bufValue;
/* 015 */   private scala.collection.Iterator inputadapter_input;
/* 016 */   private Object[] deserializetoobject_values;
/* 017 */   private org.apache.spark.sql.types.StructType
deserializetoobject_schema;
/* 018 */   private UnsafeRow deserializetoobject_result;
/* 019 */   private
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder
deserializetoobject_holder;
/* 020 */   private
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
deserializetoobject_rowWriter;
/* 021 */   private UnsafeRow mapelements_result;
/* 022 */   private
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder
mapelements_holder;
/* 023 */   private
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
mapelements_rowWriter;
/* 024 */   private Object[] serializefromobject_values;
/* 025 */   private UnsafeRow serializefromobject_result;
/* 026 */   private
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder
serializefromobject_holder;
/* 027 */   private
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
serializefromobject_rowWriter;
/* 028 */   private
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
serializefromobject_rowWriter1;
/* 029 */   private org.apache.spark.sql.execution.metric.SQLMetric
filter_numOutputRows;
/* 030 */   private UnsafeRow filter_result;
/* 031 */   private
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder
filter_holder;
/* 032 */   private
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
filter_rowWriter;
/* 033 */   private
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
filter_rowWriter1;
/* 034 */   private org.apache.spark.sql.execution.metric.SQLMetric
agg_numOutputRows;
/* 035 */   private org.apache.spark.sql.execution.metric.SQLMetric
agg_aggTime;
/* 036 */   private UnsafeRow agg_result;
/* 037 */   private
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder agg_holder;
/* 038 */   private
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
agg_rowWriter;
/* 039 */
/* 040 */   public GeneratedIterator(Object[] references) {
/* 041 */ this.references = references;
/* 042 */   }
/* 043 */


Spark Sql - Losing connection with Hive Metastore

2016-07-31 Thread KhajaAsmath Mohammed
Hi,

I am working on an application that moves data from multiple Hive tables into a single external Hive table using joins.

Spark SQL is able to insert data into the table, but it loses the connection to the metastore after inserting the data. I still have 3 more queries to execute to insert data into other external Hive tables. Has anyone come across this issue?

*Logs:*

16/07/31 15:53:58 WARN ExecutorAllocationManager: Unable to reach the
cluster manager to request 2 total executors!
16/07/31 15:53:59 WARN SparkContext: Requesting executors is only supported
in coarse-grained mode
*16/07/31 15:54:00 WARN RetryingMetaStoreClient: MetaStoreClient lost
connection. Attempting to reconnect.*
*org.apache.thrift.TApplicationException: Invalid method name:
'alter_table_with_cascade'*
*at
org.apache.thrift.TApplicationException.read(TApplicationException.java:111)*

Above log is displayed after inserting data into one table and the job is
abborted.

Query:

hiveContext.sql("insert into daastest.analytics from select query from
other tables using joins");


Thanks,
Asmath.


Re: How to filter based on a constant value

2016-07-31 Thread Mich Talebzadeh
Many thanks

This is a challenge of some sort.

I did this for my own work.

I downloaded my bank account statements for the past few years in CSV format and loaded them into a Hive ORC table with the Databricks CSV package.

All tables are transactional and bucketed.

A typical row looks like this (they vary from bank to bank)

hive> desc ll_18740868;
transactiondate date
transactiontype string
sortcodestring
accountnumber   string
transactiondescription  string
debitamount double
creditamountdouble
balance double

The columns I am interested are

transactiondate date
transactiontype string
transactiondescription  string
debitamount double

Basically, paying by debit card means I need to look at transactiontype = 'DEB'. I also downloaded all of their codes (those three-letter abbreviations), like:

1   BGC Bank Giro Credit
2   BNS Bonus
12  DEB Debit Card

etc

My biggest challenge has been decoding the transactiondescription column, as each payee uses a different description. For example:

SELECT DISTINCT transactiondescription FROM  ll_18740868 ORDER BY
substring(transactiondescription,1,INSTR(transactiondescription,'CD')-2);

SAINSBURY'S S/MKT CD 5710
SAINSBURY'S S/MKT CD 4610
SAINSBURY'S S/MKT CD 5916
SAINSBURY'S S/MKT CD 4628
SAINSBURY'S S/MKT CD 4636
SAINSBURY'S SMKT CD 5710
SAINSBURY'S SMKT CD 4628
SAINSBURYS CD 5916
SAINSBURYS CD 5710
SAINSBURYS S/MKT CD 5710
SAINSBURYS S/MKTS CD 5916
SAINSBURYS S/MKTS CD 4628
SAINSBURYS S/MKTS CD 4636
SAINSBURYS SMKT CD 5710
SALISBURY CATHEDRA CD 5710

If I look at the descriptions I can see that they all belong to the same grocer, so if I sum them up as below I will know (roughly) how much I spent in total for each payee, by taking the substring up to 2 characters before 'CD'.

CREATE TEMPORARY TABLE tmp (hashtag String, Spent float);
INSERT INTO tmp
SELECT DISTINCT *
FROM (SELECT
SUBSTRING(transactiondescription,1,INSTR(transactiondescription,'CD')-2),
SUM(debitamount) OVER (PARTITION BY
substring(transactiondescription,1,INSTR(transactiondescription,'CD')-2)) r
FROM accounts.ll_18740868 WHERE transactiontype = 'DEB'
) RS
;
SELECT * FROM tmp order by spent,hashtag;


This is crude. What I really want is to dig into all transactiondescriptions that belong to the same payee using the full text. Like above, they are all Sainsbury's, whatever the exact description says :)

It is a type of sentiment analysis for myself so I know where I am wasting
my money most :)
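
For comparison, roughly the same aggregation with the Scala DataFrame API (a sketch only; it assumes the same Hive table and columns, a SparkSession named spark, and that every description carries the 'CD' marker). A plain groupBy/sum replaces the DISTINCT-over-window trick:

import org.apache.spark.sql.functions._

val tx = spark.table("accounts.ll_18740868")
  .filter(col("transactiontype") === "DEB")
  .withColumn("hashtag",
    expr("substring(transactiondescription, 1, instr(transactiondescription, 'CD') - 2)"))

tx.groupBy("hashtag")
  .agg(sum("debitamount").alias("spent"))
  .orderBy("spent", "hashtag")
  .show(100, false)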

Cheers













Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 31 July 2016 at 13:37, ayan guha  wrote:

> Hi
>
> here is a quick setup (Based on airlines.txt dataset):
>
>
> --
> *from datetime import datetime, timedelta*
> *from pyspark.sql.types import **
> *from pyspark.sql.functions import udf, col,rank,min*
> *from pyspark import SparkContext, HiveContext*
> *import sys*
> *from pyspark.sql import Window*
>
>
> *sc = SparkContext()*
> *hc = HiveContext(sc)*
> *customSchema = StructType([ \*
> *StructField("airport_id", IntegerType(), True) , \*
> *StructField("name", StringType(), True) , \*
> *StructField("city", StringType(), True) , \*
> *StructField("country", StringType(), True) , \*
> *StructField("iata", StringType(), True) , \*
> *StructField("icao", StringType(), True) , \*
> *StructField("latitude", DecimalType(precision=20,scale=10), True) , \*
> *StructField("longitude",DecimalType(precision=20,scale=10), True) , \*
> *StructField("altitude", IntegerType(), True) , \*
> *StructField("timezone", DoubleType(), True) , \*
> *StructField("dst", StringType(), True) , \*
> *StructField("tz_name", StringType(), True)*
> *])*
>
> *inFile = sys.argv[1]*
>
> *df1 = df =
> hc.read.format('com.databricks.spark.csv').options(header='false',
> inferschema='true').load(inFile,schema=customSchema)*
>
>
> *df1.registerTempTable("airlines")*
> *df2 = hc.sql("select airport_id,altitude,r from (select *,rank() over
> (order by altitude desc) r from airlines where altitude>100) rs where r=1")*
> *print df2.take(10)*
>
> *w = Window.orderBy(df['altitude'].desc())*
>
> *df3 = df1.filter(df1.altitude >
> 100).select(df1.airport_id,df1.altitude,rank().over(w).alias("r")).filter("r=1")*
> *print df3.take(10)*
>
> *sc.stop()*
>
> 

calling dataset.show on a custom object - displays toString() value as first column and blank for rest

2016-07-31 Thread Rohit Chaddha
I have a custom object called A and a corresponding Dataset of A.

When I call the datasetA.show() method I get the following:

+++-+-+---+
|id|da|like|values|uid|
+++-+-+---+
|A.toString()...|
|A.toString()...|
|A.toString()...|
|A.toString()...|
|A.toString()...|
|A.toString()...|

That is, A.toString() is called and displayed as the value of the first column, and all the remaining columns are blank.

Any suggestions what should be done to fix this ?

- Rohit
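
One thing worth checking (a sketch, assuming Scala; for a Java bean, Encoders.bean(A.class) plays the same role): with a case class and the built-in product encoder, show() renders one column per field, whereas a binary encoder (Encoders.kryo / Encoders.javaSerialization) collapses the whole object into a single serialized column.

// assumes a SparkSession named spark (e.g. spark-shell)
case class A(id: String, da: String, like: String, values: String, uid: String)

import spark.implicits._
val ds = Seq(A("1", "x", "y", "z", "u")).toDS()
ds.show()   // one column per field: id | da | like | values | uid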


spark java - convert string to date

2016-07-31 Thread Tony Lane
Is there any built-in function in Spark's Java API to convert a string to a date more efficiently, or do we just use the standard Java techniques?

-Tony
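
A quick sketch (Scala syntax below, but the same org.apache.spark.sql.functions calls are available from the Java API) of parsing a string column without a UDF; the column name and formats are just examples:

import org.apache.spark.sql.functions._
import spark.implicits._                              // assumes a SparkSession named spark

val df = Seq("2016-07-31", "2016-08-01").toDF("ds")
df.withColumn("d", to_date(col("ds"))).show()         // ISO yyyy-MM-dd strings
df.withColumn("d",
  unix_timestamp(col("ds"), "yyyy-MM-dd").cast("timestamp").cast("date")).show()  // explicit format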


Re: spark-submit hangs forever after all tasks finish(spark 2.0.0 stable version on yarn)

2016-07-31 Thread Pradeep
Hi,

Are you running on yarn-client or cluster mode?

Pradeep

> On Jul 30, 2016, at 7:34 PM, taozhuo  wrote:
> 
> below is the error messages that seem run infinitely:
> 
> 
> 16/07/30 23:25:38 DEBUG ProtobufRpcEngine: Call: getApplicationReport took
> 1ms
> 16/07/30 23:25:39 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao sending #147247
> 16/07/30 23:25:39 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao got value #147247
> 16/07/30 23:25:39 DEBUG ProtobufRpcEngine: Call: getApplicationReport took
> 1ms
> 16/07/30 23:25:40 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao sending #147248
> 16/07/30 23:25:40 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao got value #147248
> 16/07/30 23:25:40 DEBUG ProtobufRpcEngine: Call: getApplicationReport took
> 1ms
> 16/07/30 23:25:41 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao sending #147249
> 16/07/30 23:25:41 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao got value #147249
> 16/07/30 23:25:41 DEBUG ProtobufRpcEngine: Call: getApplicationReport took
> 1ms
> 16/07/30 23:25:42 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao sending #147250
> 16/07/30 23:25:42 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao got value #147250
> 16/07/30 23:25:42 DEBUG ProtobufRpcEngine: Call: getApplicationReport took
> 1ms
> 16/07/30 23:25:43 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao sending #147251
> 16/07/30 23:25:43 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao got value #147251
> 16/07/30 23:25:43 DEBUG ProtobufRpcEngine: Call: getApplicationReport took
> 1ms
> 16/07/30 23:25:44 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao sending #147252
> 16/07/30 23:25:44 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao got value #147252
> 16/07/30 23:25:44 DEBUG ProtobufRpcEngine: Call: getApplicationReport took
> 0ms
> 16/07/30 23:25:45 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao sending #147253
> 16/07/30 23:25:45 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao got value #147253
> 16/07/30 23:25:45 DEBUG ProtobufRpcEngine: Call: getApplicationReport took
> 0ms
> 16/07/30 23:25:46 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao sending #147254
> 16/07/30 23:25:46 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao got value #147254
> 16/07/30 23:25:46 DEBUG ProtobufRpcEngine: Call: getApplicationReport took
> 1ms
> 16/07/30 23:25:47 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao sending #147255
> 16/07/30 23:25:47 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao got value #147255
> 16/07/30 23:25:47 DEBUG ProtobufRpcEngine: Call: getApplicationReport took
> 1ms
> 16/07/30 23:25:48 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao sending #147256
> 16/07/30 23:25:48 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao got value #147256
> 16/07/30 23:25:48 DEBUG ProtobufRpcEngine: Call: getApplicationReport took
> 1ms
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/spark-submit-hangs-forever-after-all-tasks-finish-spark-2-0-0-stable-version-on-yarn-tp27436.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark R 2.0 dapply very slow

2016-07-31 Thread Yann-Aël Le Borgne
Hello all,

I just started testing Spark R 2.0, and find the execution of dapply very
slow.

For example, using R, the following code

set.seed(2)
random_DF<-data.frame(matrix(rnorm(100),10,10))
system.time(dummy_res<-random_DF[random_DF[,1]>1,])
   user  system elapsed
  0.005   0.000   0.006

is executed in 6ms

Now, if I create a Spark DF on 4 partition, and run on 4 cores, I get:

sparkR.session(master = "local[4]")

  random_DF_Spark <- repartition(createDataFrame(random_DF),4)

  subset_DF_Spark <- dapply(
random_DF_Spark,
function(x) {
  y <- x[x[1] > 1, ]
  y
},
schema(random_DF_Spark))

  system.time(dummy_res_Spark<-collect(subset_DF_Spark))
user  system elapsed
  2.003   0.119  62.919

I.e. about 1 minute, which is abnormally slow. Am I missing something?

I also get a warning (16/07/31 15:07:02 WARN TaskSetManager: Stage 64 contains a task of very large size (16411 KB). The maximum recommended task size is 100 KB.). Why is this 100 KB limit so low?

I am using R 3.3.0 on Mac OS 10.10.5

Any insight welcome,
Best,
Yann-Aël

-- 
=
Yann-Aël Le Borgne
Machine Learning Group
Université Libre de Bruxelles

http://mlg.ulb.ac.be
http://www.ulb.ac.be/di/map/yleborgn
=


Re: How to filter based on a constant value

2016-07-31 Thread ayan guha
Hi

here is a quick setup (Based on airlines.txt dataset):

--
from datetime import datetime, timedelta
from pyspark.sql.types import *
from pyspark.sql.functions import udf, col, rank, min
from pyspark import SparkContext
from pyspark.sql import HiveContext
import sys
from pyspark.sql import Window


sc = SparkContext()
hc = HiveContext(sc)
customSchema = StructType([ \
StructField("airport_id", IntegerType(), True) , \
StructField("name", StringType(), True) , \
StructField("city", StringType(), True) , \
StructField("country", StringType(), True) , \
StructField("iata", StringType(), True) , \
StructField("icao", StringType(), True) , \
StructField("latitude", DecimalType(precision=20,scale=10), True) , \
StructField("longitude", DecimalType(precision=20,scale=10), True) , \
StructField("altitude", IntegerType(), True) , \
StructField("timezone", DoubleType(), True) , \
StructField("dst", StringType(), True) , \
StructField("tz_name", StringType(), True)
])

inFile = sys.argv[1]

df1 = df = hc.read.format('com.databricks.spark.csv').options(header='false', inferschema='true').load(inFile, schema=customSchema)


df1.registerTempTable("airlines")
df2 = hc.sql("select airport_id,altitude,r from (select *,rank() over (order by altitude desc) r from airlines where altitude>100) rs where r=1")
print df2.take(10)

w = Window.orderBy(df['altitude'].desc())

df3 = df1.filter(df1.altitude > 100).select(df1.airport_id, df1.altitude, rank().over(w).alias("r")).filter("r=1")
print df3.take(10)

sc.stop()
--

Here is an awesome blog from Databricks.

HTH

Ayan



On Sun, Jul 31, 2016 at 8:58 PM, Mich Talebzadeh 
wrote:

> It is true that whatever an analytic function does can be done by standard
> SQL, with join and sub-queries. But the same routine done by analytic
> function is always faster, or at least as fast, when compared to standard
> SQL.
>
> I will try to see if I can do analytic functions with Spark FP on Data
> Frames. It is essentially replacing the base table with DF and using JAVA
> functions instead of SQL ones on top
>
> Also some text based search functions say LIKE in SQL can be replaced with
> CONTAINS in FP.
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 31 July 2016 at 10:56, ayan guha  wrote:
>
>> The point is, window functions are supposed designed to be faster by
>> doing the calculations in one pass, instead of 2 pass in case of max.
>>
>> DF supports window functions (using sql.Window) so instead of writing
>> sql, you can use it as well.
>>
>> Best
>> Ayan
>>
>> On Sun, Jul 31, 2016 at 7:48 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> yes reserved word issue thanks
>>>
>>> hive> select *
>>> > from (select transactiondate, transactiondescription, debitamount
>>> > , rank() over (order by transactiondate desc) r
>>> > from accounts.ll_18740868 where transactiondescription like
>>> '%HARRODS%'
>>> >  ) RS
>>> > where r=1
>>> > ;
>>> Query ID = hduser_20160731104724_f8e5f426-770a-49fc-a4a5-f0f645c06e8c
>>> Total jobs = 1
>>> Launching Job 1 out of 1
>>> In order to change the average load for a reducer (in bytes):
>>>   set hive.exec.reducers.bytes.per.reducer=
>>> In order to limit the maximum number of reducers:
>>>   set hive.exec.reducers.max=
>>> In order to set a constant number of reducers:
>>>   set mapreduce.job.reduces=
>>> Starting Spark Job = 7727d5df-ccf9-4f98-8563-1cdec2634d99
>>> Query Hive on Spark job[0] stages:
>>> 0
>>> 1
>>> Status: Running (Hive on Spark job[0])
>>> Job Progress Format
>>> CurrentTime StageId_StageAttemptId:
>>> SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount
>>> [StageCost]
>>> 2016-07-31 10:48:28,726 Stage-0_0: 0/1  Stage-1_0: 0/1
>>> 2016-07-31 10:48:31,750 Stage-0_0: 0/1  Stage-1_0: 0/1
>>> 2016-07-31 10:48:32,758 Stage-0_0: 0(+1)/1  Stage-1_0: 0/1
>>> 2016-07-31 10:48:34,772 Stage-0_0: 1/1 Finished Stage-1_0: 0(+1)/1
>>> 2016-07-31 

Re: How to filter based on a constant value

2016-07-31 Thread Mich Talebzadeh
It is true that whatever an analytic function does can be done in standard SQL, with joins and sub-queries. But the same routine done with an analytic function is always at least as fast as, and usually faster than, the standard SQL form.

I will try to see if I can do the analytic functions with Spark FP on DataFrames. It is essentially replacing the base table with a DF and using the Java/Scala API functions instead of SQL ones on top.

Also, some text-based search functions, say LIKE in SQL, can be replaced with contains in FP.
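
As a first attempt at that, here is a sketch of the HARRODS query (quoted further down) with the DataFrame API; the table and column names follow the Hive example and spark is a SparkSession:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val w = Window.orderBy(col("transactiondate").desc)

spark.table("accounts.ll_18740868")
  .filter(col("transactiondescription").contains("HARRODS"))
  .select(col("transactiondate"), col("transactiondescription"), col("debitamount"),
          rank().over(w).alias("r"))
  .filter(col("r") === 1)
  .show()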

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 31 July 2016 at 10:56, ayan guha  wrote:

> The point is, window functions are supposed designed to be faster by doing
> the calculations in one pass, instead of 2 pass in case of max.
>
> DF supports window functions (using sql.Window) so instead of writing sql,
> you can use it as well.
>
> Best
> Ayan
>
> On Sun, Jul 31, 2016 at 7:48 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> yes reserved word issue thanks
>>
>> hive> select *
>> > from (select transactiondate, transactiondescription, debitamount
>> > , rank() over (order by transactiondate desc) r
>> > from accounts.ll_18740868 where transactiondescription like
>> '%HARRODS%'
>> >  ) RS
>> > where r=1
>> > ;
>> Query ID = hduser_20160731104724_f8e5f426-770a-49fc-a4a5-f0f645c06e8c
>> Total jobs = 1
>> Launching Job 1 out of 1
>> In order to change the average load for a reducer (in bytes):
>>   set hive.exec.reducers.bytes.per.reducer=
>> In order to limit the maximum number of reducers:
>>   set hive.exec.reducers.max=
>> In order to set a constant number of reducers:
>>   set mapreduce.job.reduces=
>> Starting Spark Job = 7727d5df-ccf9-4f98-8563-1cdec2634d99
>> Query Hive on Spark job[0] stages:
>> 0
>> 1
>> Status: Running (Hive on Spark job[0])
>> Job Progress Format
>> CurrentTime StageId_StageAttemptId:
>> SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount
>> [StageCost]
>> 2016-07-31 10:48:28,726 Stage-0_0: 0/1  Stage-1_0: 0/1
>> 2016-07-31 10:48:31,750 Stage-0_0: 0/1  Stage-1_0: 0/1
>> 2016-07-31 10:48:32,758 Stage-0_0: 0(+1)/1  Stage-1_0: 0/1
>> 2016-07-31 10:48:34,772 Stage-0_0: 1/1 Finished Stage-1_0: 0(+1)/1
>> 2016-07-31 10:48:35,780 Stage-0_0: 1/1 Finished Stage-1_0: 1/1 Finished
>> Status: Finished successfully in 10.10 seconds
>> OK
>> 2015-12-15  HARRODS LTD CD 4636 10.95   1
>> Time taken: 46.546 seconds, Fetched: 1 row(s)
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 31 July 2016 at 10:36, ayan guha  wrote:
>>
>>> I think the word "INNER" is reserved in Hive. Please change the alias to
>>> something else.
>>>
>>> Not sure about scala, but essentially it is string replacement.
>>>
>>> On Sun, Jul 31, 2016 at 7:27 PM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 thanks how about scala?

 BTW the same analytic code fails in Hive itself:(

 hive> select *
 > from (select transactiondate, transactiondescription, debitamount
 > from (select transactiondate, transactiondescription, debitamount
 > , rank() over (order by transactiondate desc) r
 > from ll_18740868 where transactiondescription like '%XYZ%'
 >  ) inner
 > where r=1
 > ;

 FailedPredicateException(identifier,{useSQL11ReservedKeywordsForIdentifier()}?)
 at
 org.apache.hadoop.hive.ql.parse.HiveParser_IdentifiersParser.identifier(HiveParser_IdentifiersParser.java:11833)
 at
 org.apache.hadoop.hive.ql.parse.HiveParser.identifier(HiveParser.java:47987)
 at
 org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.subQuerySource(HiveParser_FromClauseParser.java:5520)
 at
 

spark 2.0 readStream from a REST API

2016-07-31 Thread Ayoub Benali
Hello,

I started playing with the Structured Streaming API in spark 2.0 and I am looking for a way to create a streaming Dataset/DataFrame from a REST HTTP endpoint, but I am a bit stuck.

"readStream" in SparkSession has a json method but this one is expecting a
path (s3, hdfs, etc) and I want to avoid having to save the data on s3 and
then read again.

What would be the easiest way to hack around it ? Do I need to implement
the Datasource API ?

Are there examples on how to create a DataSource from a REST endpoint ?

Best,
Ayoub


Re: How to filter based on a constant value

2016-07-31 Thread ayan guha
The point is, window functions are designed to be faster by doing the calculations in one pass, instead of the two passes needed with max.

DFs support window functions (using sql.Window), so instead of writing SQL you can use those as well.

Best
Ayan

On Sun, Jul 31, 2016 at 7:48 PM, Mich Talebzadeh 
wrote:

> yes reserved word issue thanks
>
> hive> select *
> > from (select transactiondate, transactiondescription, debitamount
> > , rank() over (order by transactiondate desc) r
> > from accounts.ll_18740868 where transactiondescription like
> '%HARRODS%'
> >  ) RS
> > where r=1
> > ;
> Query ID = hduser_20160731104724_f8e5f426-770a-49fc-a4a5-f0f645c06e8c
> Total jobs = 1
> Launching Job 1 out of 1
> In order to change the average load for a reducer (in bytes):
>   set hive.exec.reducers.bytes.per.reducer=
> In order to limit the maximum number of reducers:
>   set hive.exec.reducers.max=
> In order to set a constant number of reducers:
>   set mapreduce.job.reduces=
> Starting Spark Job = 7727d5df-ccf9-4f98-8563-1cdec2634d99
> Query Hive on Spark job[0] stages:
> 0
> 1
> Status: Running (Hive on Spark job[0])
> Job Progress Format
> CurrentTime StageId_StageAttemptId:
> SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount
> [StageCost]
> 2016-07-31 10:48:28,726 Stage-0_0: 0/1  Stage-1_0: 0/1
> 2016-07-31 10:48:31,750 Stage-0_0: 0/1  Stage-1_0: 0/1
> 2016-07-31 10:48:32,758 Stage-0_0: 0(+1)/1  Stage-1_0: 0/1
> 2016-07-31 10:48:34,772 Stage-0_0: 1/1 Finished Stage-1_0: 0(+1)/1
> 2016-07-31 10:48:35,780 Stage-0_0: 1/1 Finished Stage-1_0: 1/1 Finished
> Status: Finished successfully in 10.10 seconds
> OK
> 2015-12-15  HARRODS LTD CD 4636 10.95   1
> Time taken: 46.546 seconds, Fetched: 1 row(s)
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 31 July 2016 at 10:36, ayan guha  wrote:
>
>> I think the word "INNER" is reserved in Hive. Please change the alias to
>> something else.
>>
>> Not sure about scala, but essentially it is string replacement.
>>
>> On Sun, Jul 31, 2016 at 7:27 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> thanks how about scala?
>>>
>>> BTW the same analytic code fails in Hive itself:(
>>>
>>> hive> select *
>>> > from (select transactiondate, transactiondescription, debitamount
>>> > from (select transactiondate, transactiondescription, debitamount
>>> > , rank() over (order by transactiondate desc) r
>>> > from ll_18740868 where transactiondescription like '%XYZ%'
>>> >  ) inner
>>> > where r=1
>>> > ;
>>>
>>> FailedPredicateException(identifier,{useSQL11ReservedKeywordsForIdentifier()}?)
>>> at
>>> org.apache.hadoop.hive.ql.parse.HiveParser_IdentifiersParser.identifier(HiveParser_IdentifiersParser.java:11833)
>>> at
>>> org.apache.hadoop.hive.ql.parse.HiveParser.identifier(HiveParser.java:47987)
>>> at
>>> org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.subQuerySource(HiveParser_FromClauseParser.java:5520)
>>> at
>>> org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.fromSource0(HiveParser_FromClauseParser.java:3918)
>>> at
>>> org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.fromSource(HiveParser_FromClauseParser.java:3818)
>>> at
>>> org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.joinSource(HiveParser_FromClauseParser.java:1909)
>>> at
>>> org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.fromClause(HiveParser_FromClauseParser.java:1546)
>>> at
>>> org.apache.hadoop.hive.ql.parse.HiveParser.fromClause(HiveParser.java:48001)
>>> at
>>> org.apache.hadoop.hive.ql.parse.HiveParser.selectStatement(HiveParser.java:42252)
>>> at
>>> org.apache.hadoop.hive.ql.parse.HiveParser.regularBody(HiveParser.java:42138)
>>> at
>>> org.apache.hadoop.hive.ql.parse.HiveParser.queryStatementExpressionBody(HiveParser.java:41154)
>>> at
>>> org.apache.hadoop.hive.ql.parse.HiveParser.queryStatementExpression(HiveParser.java:41024)
>>> at
>>> org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.subQuerySource(HiveParser_FromClauseParser.java:5492)
>>> at
>>> org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.fromSource0(HiveParser_FromClauseParser.java:3918)
>>> at
>>> 

Re: How to filter based on a constant value

2016-07-31 Thread Mich Talebzadeh
yes reserved word issue thanks

hive> select *
> from (select transactiondate, transactiondescription, debitamount
> , rank() over (order by transactiondate desc) r
> from accounts.ll_18740868 where transactiondescription like
'%HARRODS%'
>  ) RS
> where r=1
> ;
Query ID = hduser_20160731104724_f8e5f426-770a-49fc-a4a5-f0f645c06e8c
Total jobs = 1
Launching Job 1 out of 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=
In order to set a constant number of reducers:
  set mapreduce.job.reduces=
Starting Spark Job = 7727d5df-ccf9-4f98-8563-1cdec2634d99
Query Hive on Spark job[0] stages:
0
1
Status: Running (Hive on Spark job[0])
Job Progress Format
CurrentTime StageId_StageAttemptId:
SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount
[StageCost]
2016-07-31 10:48:28,726 Stage-0_0: 0/1  Stage-1_0: 0/1
2016-07-31 10:48:31,750 Stage-0_0: 0/1  Stage-1_0: 0/1
2016-07-31 10:48:32,758 Stage-0_0: 0(+1)/1  Stage-1_0: 0/1
2016-07-31 10:48:34,772 Stage-0_0: 1/1 Finished Stage-1_0: 0(+1)/1
2016-07-31 10:48:35,780 Stage-0_0: 1/1 Finished Stage-1_0: 1/1 Finished
Status: Finished successfully in 10.10 seconds
OK
2015-12-15  HARRODS LTD CD 4636 10.95   1
Time taken: 46.546 seconds, Fetched: 1 row(s)

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 31 July 2016 at 10:36, ayan guha  wrote:

> I think the word "INNER" is reserved in Hive. Please change the alias to
> something else.
>
> Not sure about scala, but essentially it is string replacement.
>
> On Sun, Jul 31, 2016 at 7:27 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> thanks how about scala?
>>
>> BTW the same analytic code fails in Hive itself:(
>>
>> hive> select *
>> > from (select transactiondate, transactiondescription, debitamount
>> > from (select transactiondate, transactiondescription, debitamount
>> > , rank() over (order by transactiondate desc) r
>> > from ll_18740868 where transactiondescription like '%XYZ%'
>> >  ) inner
>> > where r=1
>> > ;
>>
>> FailedPredicateException(identifier,{useSQL11ReservedKeywordsForIdentifier()}?)
>> at
>> org.apache.hadoop.hive.ql.parse.HiveParser_IdentifiersParser.identifier(HiveParser_IdentifiersParser.java:11833)
>> at
>> org.apache.hadoop.hive.ql.parse.HiveParser.identifier(HiveParser.java:47987)
>> at
>> org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.subQuerySource(HiveParser_FromClauseParser.java:5520)
>> at
>> org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.fromSource0(HiveParser_FromClauseParser.java:3918)
>> at
>> org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.fromSource(HiveParser_FromClauseParser.java:3818)
>> at
>> org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.joinSource(HiveParser_FromClauseParser.java:1909)
>> at
>> org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.fromClause(HiveParser_FromClauseParser.java:1546)
>> at
>> org.apache.hadoop.hive.ql.parse.HiveParser.fromClause(HiveParser.java:48001)
>> at
>> org.apache.hadoop.hive.ql.parse.HiveParser.selectStatement(HiveParser.java:42252)
>> at
>> org.apache.hadoop.hive.ql.parse.HiveParser.regularBody(HiveParser.java:42138)
>> at
>> org.apache.hadoop.hive.ql.parse.HiveParser.queryStatementExpressionBody(HiveParser.java:41154)
>> at
>> org.apache.hadoop.hive.ql.parse.HiveParser.queryStatementExpression(HiveParser.java:41024)
>> at
>> org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.subQuerySource(HiveParser_FromClauseParser.java:5492)
>> at
>> org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.fromSource0(HiveParser_FromClauseParser.java:3918)
>> at
>> org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.fromSource(HiveParser_FromClauseParser.java:3818)
>> at
>> org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.joinSource(HiveParser_FromClauseParser.java:1909)
>> at
>> org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.fromClause(HiveParser_FromClauseParser.java:1546)
>> at
>> org.apache.hadoop.hive.ql.parse.HiveParser.fromClause(HiveParser.java:48001)
>> at
>> 

Re: How to filter based on a constant value

2016-07-31 Thread ayan guha
I think the word "INNER" is reserved in Hive. Please change the alias to
something else.

Not sure about scala, but essentially it is string replacement.

On Sun, Jul 31, 2016 at 7:27 PM, Mich Talebzadeh 
wrote:

> thanks how about scala?
>
> BTW the same analytic code fails in Hive itself:(
>
> hive> select *
> > from (select transactiondate, transactiondescription, debitamount
> > from (select transactiondate, transactiondescription, debitamount
> > , rank() over (order by transactiondate desc) r
> > from ll_18740868 where transactiondescription like '%XYZ%'
> >  ) inner
> > where r=1
> > ;
>
> FailedPredicateException(identifier,{useSQL11ReservedKeywordsForIdentifier()}?)
> at
> org.apache.hadoop.hive.ql.parse.HiveParser_IdentifiersParser.identifier(HiveParser_IdentifiersParser.java:11833)
> at
> org.apache.hadoop.hive.ql.parse.HiveParser.identifier(HiveParser.java:47987)
> at
> org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.subQuerySource(HiveParser_FromClauseParser.java:5520)
> at
> org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.fromSource0(HiveParser_FromClauseParser.java:3918)
> at
> org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.fromSource(HiveParser_FromClauseParser.java:3818)
> at
> org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.joinSource(HiveParser_FromClauseParser.java:1909)
> at
> org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.fromClause(HiveParser_FromClauseParser.java:1546)
> at
> org.apache.hadoop.hive.ql.parse.HiveParser.fromClause(HiveParser.java:48001)
> at
> org.apache.hadoop.hive.ql.parse.HiveParser.selectStatement(HiveParser.java:42252)
> at
> org.apache.hadoop.hive.ql.parse.HiveParser.regularBody(HiveParser.java:42138)
> at
> org.apache.hadoop.hive.ql.parse.HiveParser.queryStatementExpressionBody(HiveParser.java:41154)
> at
> org.apache.hadoop.hive.ql.parse.HiveParser.queryStatementExpression(HiveParser.java:41024)
> at
> org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.subQuerySource(HiveParser_FromClauseParser.java:5492)
> at
> org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.fromSource0(HiveParser_FromClauseParser.java:3918)
> at
> org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.fromSource(HiveParser_FromClauseParser.java:3818)
> at
> org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.joinSource(HiveParser_FromClauseParser.java:1909)
> at
> org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.fromClause(HiveParser_FromClauseParser.java:1546)
> at
> org.apache.hadoop.hive.ql.parse.HiveParser.fromClause(HiveParser.java:48001)
> at
> org.apache.hadoop.hive.ql.parse.HiveParser.selectStatement(HiveParser.java:42252)
> at
> org.apache.hadoop.hive.ql.parse.HiveParser.regularBody(HiveParser.java:42138)
> at
> org.apache.hadoop.hive.ql.parse.HiveParser.queryStatementExpressionBody(HiveParser.java:41154)
> at
> org.apache.hadoop.hive.ql.parse.HiveParser.queryStatementExpression(HiveParser.java:41024)
> at
> org.apache.hadoop.hive.ql.parse.HiveParser.execStatement(HiveParser.java:1653)
> at
> org.apache.hadoop.hive.ql.parse.HiveParser.statement(HiveParser.java:1137)
> at
> org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:204)
> at
> org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:166)
> at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:446)
> at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:319)
> at
> org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1255)
> at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1301)
> at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1184)
> at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1172)
> at
> org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:233)
> at
> org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:184)
> at
> org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:400)
> at
> org.apache.hadoop.hive.cli.CliDriver.executeDriver(CliDriver.java:778)
> at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:717)
> at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:645)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
> at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
> *FAILED: ParseException line 6:7 

Re: How to filter based on a constant value

2016-07-31 Thread Mich Talebzadeh
Thanks. How about Scala?

BTW the same analytic code fails in Hive itself:(

hive> select *
> from (select transactiondate, transactiondescription, debitamount
> from (select transactiondate, transactiondescription, debitamount
> , rank() over (order by transactiondate desc) r
> from ll_18740868 where transactiondescription like '%XYZ%'
>  ) inner
> where r=1
> ;
FailedPredicateException(identifier,{useSQL11ReservedKeywordsForIdentifier()}?)
at
org.apache.hadoop.hive.ql.parse.HiveParser_IdentifiersParser.identifier(HiveParser_IdentifiersParser.java:11833)
at
org.apache.hadoop.hive.ql.parse.HiveParser.identifier(HiveParser.java:47987)
at
org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.subQuerySource(HiveParser_FromClauseParser.java:5520)
at
org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.fromSource0(HiveParser_FromClauseParser.java:3918)
at
org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.fromSource(HiveParser_FromClauseParser.java:3818)
at
org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.joinSource(HiveParser_FromClauseParser.java:1909)
at
org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.fromClause(HiveParser_FromClauseParser.java:1546)
at
org.apache.hadoop.hive.ql.parse.HiveParser.fromClause(HiveParser.java:48001)
at
org.apache.hadoop.hive.ql.parse.HiveParser.selectStatement(HiveParser.java:42252)
at
org.apache.hadoop.hive.ql.parse.HiveParser.regularBody(HiveParser.java:42138)
at
org.apache.hadoop.hive.ql.parse.HiveParser.queryStatementExpressionBody(HiveParser.java:41154)
at
org.apache.hadoop.hive.ql.parse.HiveParser.queryStatementExpression(HiveParser.java:41024)
at
org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.subQuerySource(HiveParser_FromClauseParser.java:5492)
at
org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.fromSource0(HiveParser_FromClauseParser.java:3918)
at
org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.fromSource(HiveParser_FromClauseParser.java:3818)
at
org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.joinSource(HiveParser_FromClauseParser.java:1909)
at
org.apache.hadoop.hive.ql.parse.HiveParser_FromClauseParser.fromClause(HiveParser_FromClauseParser.java:1546)
at
org.apache.hadoop.hive.ql.parse.HiveParser.fromClause(HiveParser.java:48001)
at
org.apache.hadoop.hive.ql.parse.HiveParser.selectStatement(HiveParser.java:42252)
at
org.apache.hadoop.hive.ql.parse.HiveParser.regularBody(HiveParser.java:42138)
at
org.apache.hadoop.hive.ql.parse.HiveParser.queryStatementExpressionBody(HiveParser.java:41154)
at
org.apache.hadoop.hive.ql.parse.HiveParser.queryStatementExpression(HiveParser.java:41024)
at
org.apache.hadoop.hive.ql.parse.HiveParser.execStatement(HiveParser.java:1653)
at
org.apache.hadoop.hive.ql.parse.HiveParser.statement(HiveParser.java:1137)
at
org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:204)
at
org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:166)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:446)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:319)
at
org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1255)
at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1301)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1184)
at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1172)
at
org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:233)
at
org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:184)
at
org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:400)
at
org.apache.hadoop.hive.cli.CliDriver.executeDriver(CliDriver.java:778)
at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:717)
at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:645)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
*FAILED: ParseException line 6:7 Failed to recognize predicate 'inner'.
Failed rule: 'identifier' in subquery source*


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or 

Re: How to filter based on a constant value

2016-07-31 Thread ayan guha
Hi

This is because Spark does not provide a way to "bind" variables the way
Oracle does.

So you can build the SQL string yourself, like below (in Python, with <val>
as a placeholder to substitute):

val = 'XYZ'
sqlbase = "select ... where col = '<val>'".replace('<val>', val)



On Sun, Jul 31, 2016 at 6:25 PM, Mich Talebzadeh 
wrote:

> Thanks Ayan.
>
> This is the one I used
>
> scala> sqltext = """
>  |  select *
>  | from (select transactiondate, transactiondescription, debitamount
>  | , rank() over (order by transactiondate desc) r
>  | from ll_18740868 where transactiondescription like '%XYZ%'
>  |   ) inner
>  |  where r=1
>  |"""
>
> scala> HiveContext.sql(sqltext).show
> +---+--+---+---+
> |transactiondate|transactiondescription|debitamount|  r|
> +---+--+---+---+
> | 2015-12-15|  XYZ LTD CD 4636 |  10.95|  1|
> +---+--+---+---+
>
> The issue I see is that in SQL here I cannot pass HASHTAG as a variable to
> SQL. For example in RDBMS I can do this
>
> 1> declare @pattern varchar(50)
> 2> set @pattern = 'Direct'
> 3> select CHANNEL_DESC from CHANNELS where CHANNEL_DESC like
> '%'||@pattern||'%'
> 4> go
> (1 row affected)
>  CHANNEL_DESC
>  
>  Direct Sales
>
> but not in Hive or Spark SQL
>
> whereas with FP it does it implicitly.
>
> col("CHANNELS").contains(HASHTAG))
>
> Unless there is a way of doing it?
>
> Thanks
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 31 July 2016 at 01:20, ayan guha  wrote:
>
>> select *
>> from (select *,
>>  rank() over (order by transactiondate) r
>>from ll_18740868 where transactiondescription='XYZ'
>>   ) inner
>> where r=1
>>
>> Hi Mitch,
>>
>> If using SQL is fine, you can try the code above. You need to register
>> ll_18740868  as temp table.
>>
>> On Sun, Jul 31, 2016 at 6:49 AM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>>
>>> Hi,
>>>
>>> I would like to find out when it was the last time I paid a company with
>>> Debit Card
>>>
>>>
>>> This is the way I do it.
>>>
>>> 1) Find the date when I paid last
>>> 2) Find the rest of details from the row(s)
>>>
>>> So
>>>
>>> var HASHTAG = "XYZ"
>>> scala> var maxdate =
>>> ll_18740868.filter(col("transactiondescription").contains(HASHTAG)).agg(max("transactiondate")).collect.apply(0)
>>> maxdate: org.apache.spark.sql.Row = [2015-12-15]
>>>
>>> OK so it was 2015-12-15
>>>
>>>
>>> Now I want to get the rest of the columns. This one works when I hard
>>> code the maxdate!
>>>
>>>
>>> scala> ll_18740868.filter(col("transactiondescription").contains(HASHTAG)
>>> && col("transactiondate") === "2015-12-15").select("transactiondate",
>>> "transactiondescription", "debitamount").show
>>> +---+--+---+
>>> |transactiondate|transactiondescription|debitamount|
>>> +---+--+---+
>>> | 2015-12-15|  XYZ LTD CD 4636 |  10.95|
>>> +---+--+---+
>>>
>>> Now if I want to use the var maxdate in place of "2015-12-15", how would
>>> I do that?
>>>
>>> I tried lit(maxdate) etc but they are all giving me error?
>>>
>>> java.lang.RuntimeException: Unsupported literal type class
>>> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
>>> [2015-12-15]
>>>
>>>
>>> Thanks
>>>
>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


-- 
Best Regards,
Ayan Guha


Re: Structured Streaming Parquet Sink

2016-07-31 Thread Tathagata Das
Yes, files do not support complete-mode output yet. We are working on that,
and it should be available in Spark 2.1.

In the meantime, you can use aggregation with the memory sink (i.e.
format("memory")) to store the results in an in-memory table, which can then be
written to a parquet table explicitly on a schedule. Note that the memory sink
collects everything in the driver, so use it with caution.
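
A minimal sketch of that workaround, assuming the streaming aggregation
DataFrame streamingCountsDF and the checkpoint/output paths from the quoted
message below, plus a SparkSession named spark:

// Sketch: write the complete-mode aggregation into the in-memory sink, then
// periodically snapshot the registered in-memory table out to parquet.
val query = streamingCountsDF.writeStream
  .format("memory")
  .queryName("counts")            // exposes the results as table "counts"
  .outputMode("complete")
  .start()

// Later (e.g. on a timer), write the current contents of the table to parquet:
spark.table("counts").write.mode("overwrite").parquet("parq")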

TD

On Sat, Jul 30, 2016 at 11:58 PM, Jacek Laskowski  wrote:

> Hi Arun,
>
> Regarding parquet and complete output mode:
>
> A relevant piece of the code to think about:
>
> if (outputMode != OutputMode.Append) {
>   throw new IllegalArgumentException(
> s"Data source $className does not support $outputMode output
> mode")
> }
>
>
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L267-L270
>
> It says that only Append output mode is supported. The message
> could've been more precise in this case (but since it's an alpha API
> I'd not recommend changing it anyway).
>
> Regarding aggregations for parquet (and other Append-output sinks)
>
> Here is a relevant piece of the code:
>
> outputMode match {
>   case InternalOutputModes.Append if aggregates.nonEmpty =>
> throwError(
>   s"$outputMode output mode not supported when there are
> streaming aggregations on " +
> s"streaming DataFrames/DataSets")(plan)
>
>
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala#L56-L60
>
> It says that for append output mode you can have no aggregates in a
> streaming pipeline.
>
> To me it says that parquet can be append output mode only with no
> aggregates.
>
> Kudos for letting me know about it!
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Sun, Jul 31, 2016 at 1:19 AM, Arun Patel 
> wrote:
> > Thanks for the response. However, I am not able to use any output mode.
> In
> > case of Parquet sink, there should not be any aggregations?
> >
> > scala> val query =
> >
> streamingCountsDF.writeStream.format("parquet").option("path","parq").option("checkpointLocation","chkpnt").outputMode("complete").start()
> > java.lang.IllegalArgumentException: Data source parquet does not support
> > Complete output mode
> >   at
> >
> org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:267)
> >   at
> >
> org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:291)
> >   ... 54 elided
> >
> > scala> val query =
> >
> streamingCountsDF.writeStream.format("parquet").option("path","parq").option("checkpointLocation","chkpnt").outputMode("append").start()
> > org.apache.spark.sql.AnalysisException: Append output mode not supported
> > when there are streaming aggregations on streaming DataFrames/DataSets;
> >   at
> >
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:173)
> >   at
> >
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:60)
> >   at
> >
> org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:236)
> >   at
> >
> org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:287)
> >   ... 54 elided
> >
> > scala> val query =
> >
> streamingCountsDF.writeStream.format("parquet").option("path","parq").option("checkpointLocation","chkpnt").outputMode("complete").start()
> > java.lang.IllegalArgumentException: Data source parquet does not support
> > Complete output mode
> >   at
> >
> org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:267)
> >   at
> >
> org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:291)
> >   ... 54 elided
> >
> >
> > On Sat, Jul 30, 2016 at 5:59 PM, Tathagata Das 
> wrote:
> >>
> >> Correction, the two options are.
> >>
> >> - writeStream.format("parquet").option("path", "...").start()
> >> - writestream.parquet("...").start()
> >>
> >> There no start with param.
> >>
> >>
> >> On Jul 30, 2016 11:22 AM, "Jacek Laskowski"  wrote:
> >>>
> >>> Hi Arun,
> >>>
> >>> > As per documentation, parquet is the only available file sink.
> >>>
> >>> The following sinks are currently available in Spark:
> >>>
> >>> * ConsoleSink for console format.
> >>> * FileStreamSink for parquet format.
> >>> * ForeachSink used in foreach operator.
> >>> * MemorySink for memory format.
> >>>
> >>> You can create your own streaming format implementing
> StreamSinkProvider.
> >>>
> >>> > I am getting an error like 'path' is not 

Re: Visualization of data analysed using spark

2016-07-31 Thread Sivakumaran S
Hi Tony,

If your requirement is browser-based plotting (real time or otherwise), you
can load the data and display it in a browser using D3. Since D3 has very
low-level plotting routines, you can look at C3 (provided by www.pubnub.com) or
Rickshaw (https://github.com/shutterstock/rickshaw), both of which provide a
higher-level abstraction for plotting.

HTH,

Regards,

Sivakumaran 

> On 31-Jul-2016, at 7:35 AM, Gourav Sengupta  wrote:
> 
> If you are using  Python, please try using Bokeh and its related stack. Most 
> of the people in this forum including guys at data bricks have not tried that 
> stack from Anaconda, its worth a try when you are visualizing data in big 
> data stack.
> 
> 
> Regards,
> Gourav
> 
> On Sat, Jul 30, 2016 at 10:25 PM, Rerngvit Yanggratoke wrote:
> Since you already have an existing application (not starting from scratch), 
> the simplest way to visualize would be to export the data to a file (e.g., a 
> CSV file) and visualise using other tools, e.g., Excel, RStudio, Matlab, 
> Jupyter, Zeppelin, Tableau, Elastic Stack.
> The choice depends on your background and preferences of the technology. Note 
> that if you are dealing with a large dataset, you generally first should 
> apply sampling to the data. A good mechanism to sampling depends on your 
> application domain.
> 
> - Rerngvit
> > On 30 Jul 2016, at 21:45, Tony Lane wrote:
> >
> > I am developing my analysis application by using spark (in eclipse as the 
> > IDE)
> >
> > what is a good way to visualize the data, taking into consideration i have 
> > multiple files which make up my spark application.
> >
> > I have seen some notebook demo's but not sure how to use my application 
> > with such notebooks.
> >
> > thoughts/ suggestions/ experiences -- please share
> >
> > -Tony
> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
> 
> 
> 



[Spark 2.0] Why MutableInt cannot be cast to MutableLong?

2016-07-31 Thread Chanh Le
Hi everyone,
Why MutableInt cannot be cast to MutableLong?
It’s really weird, and it seems Spark 2.0 has a lot of errors with parquet formats.

org.apache.spark.sql.catalyst.expressions.MutableInt cannot be cast to
org.apache.spark.sql.catalyst.expressions.MutableLong

Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value 
at 0 in block 0 in file 
file:/data/etl-report/parquet/AD_COOKIE_REPORT/time=2016-07-
25-16/network_id=31713/part-r-0-9adbef89-f2f4-4836-a50c-a2e7b381d558.snappy.parquet
at 
org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228)
at 
org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
at 
org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.hasNext(SqlNewHadoopRDD.scala:194)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:88)
at 
org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.ClassCastException: 
org.apache.spark.sql.catalyst.expressions.MutableInt cannot be cast to 
org.apache.spark.sql.catalyst.expressions.MutableLong
at 
org.apache.spark.sql.catalyst.expressions.SpecificMutableRow.setLong(SpecificMutableRow.scala:295)
at 
org.apache.spark.sql.execution.datasources.parquet.CatalystRowConverter$RowUpdater.setLong(CatalystRowConverter.scala:161)
at 
org.apache.spark.sql.execution.datasources.parquet.CatalystPrimitiveConverter.addLong(CatalystRowConverter.scala:85)
at 
org.apache.parquet.column.impl.ColumnReaderImpl$2$4.writeValue(ColumnReaderImpl.java:269)
at 
org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:365)
at 
org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:405)
at 
org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:209)
... 20 more
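
The cast above suggests the parquet files store that column as int while the
expected schema declares it as bigint. One possible workaround, sketched with a
hypothetical column name amount, is to read the files with their own stored
schema and cast explicitly:

// Sketch only: let Spark pick up the parquet files' own schema, then cast the
// mismatched column to bigint before querying. "amount" is a made-up column
// name; substitute the real one.
import org.apache.spark.sql.functions.col

val df = sqlContext.read
  .parquet("file:/data/etl-report/parquet/AD_COOKIE_REPORT")
  .withColumn("amount", col("amount").cast("bigint"))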

Re: How to filter based on a constant value

2016-07-31 Thread Mich Talebzadeh
Thanks Ayan.

This is the one I used

scala> sqltext = """
 |  select *
 | from (select transactiondate, transactiondescription, debitamount
 | , rank() over (order by transactiondate desc) r
 | from ll_18740868 where transactiondescription like '%XYZ%'
 |   ) inner
 |  where r=1
 |"""

scala> HiveContext.sql(sqltext).show
+---+--+---+---+
|transactiondate|transactiondescription|debitamount|  r|
+---+--+---+---+
| 2015-12-15|  XYZ LTD CD 4636 |  10.95|  1|
+---+--+---+---+

The issue I see is that in SQL here I cannot pass HASHTAG as a variable to
SQL. For example in RDBMS I can do this

1> declare @pattern varchar(50)
2> set @pattern = 'Direct'
3> select CHANNEL_DESC from CHANNELS where CHANNEL_DESC like
'%'||@pattern||'%'
4> go
(1 row affected)
 CHANNEL_DESC
 
 Direct Sales

but not in Hive or Spark SQL

whereas with FP it does it implicitly.

col("CHANNELS").contains(HASHTAG))

Unless there is a way of doing it?

Thanks














Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 31 July 2016 at 01:20, ayan guha  wrote:

> select *
> from (select *,
>  rank() over (order by transactiondate) r
>from ll_18740868 where transactiondescription='XYZ'
>   ) inner
> where r=1
>
> Hi Mitch,
>
> If using SQL is fine, you can try the code above. You need to register
> ll_18740868  as temp table.
>
> On Sun, Jul 31, 2016 at 6:49 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>>
>> Hi,
>>
>> I would like to find out when it was the last time I paid a company with
>> Debit Card
>>
>>
>> This is the way I do it.
>>
>> 1) Find the date when I paid last
>> 2) Find the rest of details from the row(s)
>>
>> So
>>
>> var HASHTAG = "XYZ"
>> scala> var maxdate =
>> ll_18740868.filter(col("transactiondescription").contains(HASHTAG)).agg(max("transactiondate")).collect.apply(0)
>> maxdate: org.apache.spark.sql.Row = [2015-12-15]
>>
>> OK so it was 2015-12-15
>>
>>
>> Now I want to get the rest of the columns. This one works when I hard
>> code the maxdate!
>>
>>
>> scala> ll_18740868.filter(col("transactiondescription").contains(HASHTAG)
>> && col("transactiondate") === "2015-12-15").select("transactiondate",
>> "transactiondescription", "debitamount").show
>> +---+--+---+
>> |transactiondate|transactiondescription|debitamount|
>> +---+--+---+
>> | 2015-12-15|  XYZ LTD CD 4636 |  10.95|
>> +---+--+---+
>>
>> Now if I want to use the var maxdate in place of "2015-12-15", how would
>> I do that?
>>
>> I tried lit(maxdate) etc but they are all giving me error?
>>
>> java.lang.RuntimeException: Unsupported literal type class
>> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
>> [2015-12-15]
>>
>>
>> Thanks
>>
>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: sql to spark scala rdd

2016-07-31 Thread Jacek Laskowski
Hi,

What's the result type of sliding(2,1)?

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Sun, Jul 31, 2016 at 9:23 AM, sri hari kali charan Tummala
 wrote:
> tried this no luck, wht is non-empty iterator here ?
>
> OP:-
> (-987,non-empty iterator)
> (-987,non-empty iterator)
> (-987,non-empty iterator)
> (-987,non-empty iterator)
> (-987,non-empty iterator)
>
>
> sc.textFile(file).keyBy(x => x.split("\\~") (0))
>   .map(x => x._2.split("\\~"))
>   .map(x => (x(0),x(2)))
> .map { case (key,value) => (key,value.toArray.toSeq.sliding(2,1).map(x
> => x.sum/x.size))}.foreach(println)
>
>
> On Sun, Jul 31, 2016 at 12:03 AM, sri hari kali charan Tummala
>  wrote:
>>
>> Hi All,
>>
>> I managed to write using sliding function but can it get key as well in my
>> output ?
>>
>> sc.textFile(file).keyBy(x => x.split("\\~") (0))
>>   .map(x => x._2.split("\\~"))
>>   .map(x => (x(2).toDouble)).toArray().sliding(2,1).map(x =>
>> (x,x.size)).foreach(println)
>>
>>
>> at the moment my output:-
>>
>> 75.0
>> -25.0
>> 50.0
>> -50.0
>> -100.0
>>
>> I want with key how to get moving average output based on key ?
>>
>>
>> 987,75.0
>> 987,-25
>> 987,50.0
>>
>> Thanks
>> Sri
>>
>>
>>
>>
>>
>>
>> On Sat, Jul 30, 2016 at 11:40 AM, sri hari kali charan Tummala
>>  wrote:
>>>
>>> for knowledge just wondering how to write it up in scala or spark RDD.
>>>
>>> Thanks
>>> Sri
>>>
>>> On Sat, Jul 30, 2016 at 11:24 AM, Jacek Laskowski 
>>> wrote:

 Why?

 Pozdrawiam,
 Jacek Laskowski
 
 https://medium.com/@jaceklaskowski/
 Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
 Follow me at https://twitter.com/jaceklaskowski


 On Sat, Jul 30, 2016 at 4:42 AM, kali.tumm...@gmail.com
  wrote:
 > Hi All,
 >
 > I managed to write business requirement in spark-sql and hive I am
 > still
 > learning scala how this below sql be written using spark RDD not spark
 > data
 > frames.
 >
 > SELECT DATE,balance,
 > SUM(balance) OVER (ORDER BY DATE ROWS BETWEEN UNBOUNDED PRECEDING AND
 > CURRENT ROW) daily_balance
 > FROM  table
 >
 >
 >
 >
 >
 > --
 > View this message in context:
 > http://apache-spark-user-list.1001560.n3.nabble.com/sql-to-spark-scala-rdd-tp27433.html
 > Sent from the Apache Spark User List mailing list archive at
 > Nabble.com.
 >
 > -
 > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
 >
>>>
>>>
>>>
>>>
>>> --
>>> Thanks & Regards
>>> Sri Tummala
>>>
>>
>>
>>
>> --
>> Thanks & Regards
>> Sri Tummala
>>
>
>
>
> --
> Thanks & Regards
> Sri Tummala
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How to filter based on a constant value

2016-07-31 Thread Mich Talebzadeh
Thanks all

scala> var maxdate =
ll_18740868.filter(col("transactiondescription").contains(HASHTAG)).agg(max("transactiondate")).collect.apply(0).getDate(0)
maxdate: java.sql.Date = 2015-12-15
scala> ll_18740868.filter(col("transactiondescription").contains(HASHTAG)
&& col("transactiondate") === maxdate).select("transactiondate",
"transactiondescription", "debitamount").show
+---+--+---+
|transactiondate|transactiondescription|debitamount|
+---+--+---+
| 2015-12-15|  XYZ LTD CD 4636 |  10.95|
+---+--+---+



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 31 July 2016 at 08:27, Mich Talebzadeh  wrote:

> thanks Nicholas got it
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 31 July 2016 at 07:49, Nicholas Hakobian <
> nicholas.hakob...@rallyhealth.com> wrote:
>
>> From the online docs:
>> https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/Row.html#apply(int)
>>
>> java.lang.Object apply(int i)
>> Returns the value at position i. If the value is null, null is returned.
>> The following is a mapping between Spark SQL types and return types:
>>
>> So its returning the content of the first element in the row, in this
>> case the Array (of length 1) of Date types.
>>
>>
>> Nicholas Szandor Hakobian
>> Data Scientist
>> Rally Health
>> nicholas.hakob...@rallyhealth.com
>> M: 510-295-7113
>>
>>
>> On Sat, Jul 30, 2016 at 11:41 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> thanks gents.
>>>
>>> I am trying to understand this better.
>>>
>>> As I understand a DataFrame is basically an equivalent table in
>>> relational term.
>>>
>>> so
>>>
>>> scala> var maxdate =
>>> ll_18740868.filter(col("transactiondescription").contains(HASHTAG)).agg(max("transactiondate"))
>>> maxdate: org.apache.spark.sql.DataFrame = [max(transactiondate): date]
>>>
>>> So I find the max(transactiondate) for the filter I have applied.  In
>>> sql term --> select max(transactiondate) from ll_18740868 where
>>> transactiondescription like "%HASHTAG%"
>>>
>>> Now I want to store it in a single variable and get it worked out
>>>
>>> scala> var maxdate =
>>> ll_18740868.filter(col("transactiondescription").contains(HASHTAG)).agg(max("transactiondate")).collect
>>> maxdate: Array[org.apache.spark.sql.Row] = Array([2015-12-15])
>>>
>>> Now I have the value stored in a row. I get it as follows. It is the
>>> first column of the row (actually the only column) and in date format
>>>
>>> scala> var maxdate =
>>> ll_18740868.filter(col("transactiondescription").contains(HASHTAG)).agg(max("transactiondate")).collect.
>>> apply(0).getDate(0)
>>> maxdate: java.sql.Date = 2015-12-15
>>>
>>> what is the role of apply(0) here?
>>>
>>> Thanks
>>>
>>>
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>> On 31 July 2016 at 03:28, Xinh Huynh  wrote:
>>>
 Hi Mitch,

 I think you were missing a step:
 [your result] maxdate: org.apache.spark.sql.Row = [2015-12-15]
 Since maxdate is of type Row, you would want to extract the first
 column of the Row with:

 >> val maxdateStr = maxdate.getString(0)

 assuming the column type is String.
 API doc 

Re: How to filter based on a constant value

2016-07-31 Thread Mich Talebzadeh
thanks Nicholas got it



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 31 July 2016 at 07:49, Nicholas Hakobian <
nicholas.hakob...@rallyhealth.com> wrote:

> From the online docs:
> https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/Row.html#apply(int)
>
> java.lang.Object apply(int i)
> Returns the value at position i. If the value is null, null is returned.
> The following is a mapping between Spark SQL types and return types:
>
> So its returning the content of the first element in the row, in this case
> the Array (of length 1) of Date types.
>
>
> Nicholas Szandor Hakobian
> Data Scientist
> Rally Health
> nicholas.hakob...@rallyhealth.com
> M: 510-295-7113
>
>
> On Sat, Jul 30, 2016 at 11:41 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> thanks gents.
>>
>> I am trying to understand this better.
>>
>> As I understand a DataFrame is basically an equivalent table in
>> relational term.
>>
>> so
>>
>> scala> var maxdate =
>> ll_18740868.filter(col("transactiondescription").contains(HASHTAG)).agg(max("transactiondate"))
>> maxdate: org.apache.spark.sql.DataFrame = [max(transactiondate): date]
>>
>> So I find the max(transactiondate) for the filter I have applied.  In sql
>> term --> select max(transactiondate) from ll_18740868 where
>> transactiondescription like "%HASHTAG%"
>>
>> Now I want to store it in a single variable and get it worked out
>>
>> scala> var maxdate =
>> ll_18740868.filter(col("transactiondescription").contains(HASHTAG)).agg(max("transactiondate")).collect
>> maxdate: Array[org.apache.spark.sql.Row] = Array([2015-12-15])
>>
>> Now I have the value stored in a row. I get it as follows. It is the
>> first column of the row (actually the only column) and in date format
>>
>> scala> var maxdate =
>> ll_18740868.filter(col("transactiondescription").contains(HASHTAG)).agg(max("transactiondate")).collect.
>> apply(0).getDate(0)
>> maxdate: java.sql.Date = 2015-12-15
>>
>> what is the role of apply(0) here?
>>
>> Thanks
>>
>>
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 31 July 2016 at 03:28, Xinh Huynh  wrote:
>>
>>> Hi Mitch,
>>>
>>> I think you were missing a step:
>>> [your result] maxdate: org.apache.spark.sql.Row = [2015-12-15]
>>> Since maxdate is of type Row, you would want to extract the first column
>>> of the Row with:
>>>
>>> >> val maxdateStr = maxdate.getString(0)
>>>
>>> assuming the column type is String.
>>> API doc is here:
>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Row
>>>
>>> Then you can do the query:
>>>
>>> >> col("transactiondate") === maxdateStr
>>>
>>> Xinh
>>>
>>> On Sat, Jul 30, 2016 at 5:20 PM, ayan guha  wrote:
>>>
 select *
 from (select *,
  rank() over (order by transactiondate) r
from ll_18740868 where transactiondescription='XYZ'
   ) inner
 where r=1

 Hi Mitch,

 If using SQL is fine, you can try the code above. You need to register
 ll_18740868  as temp table.

 On Sun, Jul 31, 2016 at 6:49 AM, Mich Talebzadeh <
 mich.talebza...@gmail.com> wrote:

>
> Hi,
>
> I would like to find out when it was the last time I paid a company
> with Debit Card
>
>
> This is the way I do it.
>
> 1) Find the date when I paid last
> 2) Find the rest of details from the row(s)
>
> So
>
> var HASHTAG = "XYZ"
> scala> var maxdate =
> ll_18740868.filter(col("transactiondescription").contains(HASHTAG)).agg(max("transactiondate")).collect.apply(0)
> maxdate: org.apache.spark.sql.Row = [2015-12-15]
>
> OK so it was 2015-12-15
>
>
> Now I want to get the rest of the columns. This one works when I hard
> code the maxdate!
>
>
> scala> 

Re: sql to spark scala rdd

2016-07-31 Thread sri hari kali charan Tummala
Tried this, no luck. What is "non-empty iterator" here?

OP:-
(-987,non-empty iterator)
(-987,non-empty iterator)
(-987,non-empty iterator)
(-987,non-empty iterator)
(-987,non-empty iterator)


sc.textFile(file).keyBy(x => x.split("\\~") (0))
  .map(x => x._2.split("\\~"))
  .map(x => (x(0),x(2)))
.map { case (key,value) =>
(key,value.toArray.toSeq.sliding(2,1).map(x =>
x.sum/x.size))}.foreach(println)
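
sliding(2,1) returns an Iterator, and an Iterator's toString is simply
"non-empty iterator", which is why the values never print; the window also
needs to be built per key over the grouped amounts rather than over each
record's value string. A rough sketch under those assumptions (field layout
key~...~amount as above; groupByKey does not guarantee ordering within a key,
so sort the values first if the dates matter):

// Sketch: group amounts by key, compute a 2-element moving average per key,
// and materialize the sliding Iterator with toList so the values print.
sc.textFile(file)
  .map(_.split("\\~"))
  .map(cols => (cols(0), cols(2).toDouble))
  .groupByKey()
  .flatMapValues(vs => vs.toSeq.sliding(2, 1).map(w => w.sum / w.size).toList)
  .foreach(println)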


On Sun, Jul 31, 2016 at 12:03 AM, sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:

> Hi All,
>
> I managed to write using sliding function but can it get key as well in my
> output ?
>
> sc.textFile(file).keyBy(x => x.split("\\~") (0))
>   .map(x => x._2.split("\\~"))
>   .map(x => (x(2).toDouble)).toArray().sliding(2,1).map(x => 
> (x,x.size)).foreach(println)
>
>
> at the moment my output:-
>
> 75.0
> -25.0
> 50.0
> -50.0
> -100.0
>
> I want with key how to get moving average output based on key ?
>
>
> 987,75.0
> 987,-25
> 987,50.0
>
> Thanks
> Sri
>
>
>
>
>
>
> On Sat, Jul 30, 2016 at 11:40 AM, sri hari kali charan Tummala <
> kali.tumm...@gmail.com> wrote:
>
>> for knowledge just wondering how to write it up in scala or spark RDD.
>>
>> Thanks
>> Sri
>>
>> On Sat, Jul 30, 2016 at 11:24 AM, Jacek Laskowski 
>> wrote:
>>
>>> Why?
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>> 
>>> https://medium.com/@jaceklaskowski/
>>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>>> Follow me at https://twitter.com/jaceklaskowski
>>>
>>>
>>> On Sat, Jul 30, 2016 at 4:42 AM, kali.tumm...@gmail.com
>>>  wrote:
>>> > Hi All,
>>> >
>>> > I managed to write business requirement in spark-sql and hive I am
>>> still
>>> > learning scala how this below sql be written using spark RDD not spark
>>> data
>>> > frames.
>>> >
>>> > SELECT DATE,balance,
>>> > SUM(balance) OVER (ORDER BY DATE ROWS BETWEEN UNBOUNDED PRECEDING AND
>>> > CURRENT ROW) daily_balance
>>> > FROM  table
>>> >
>>> >
>>> >
>>> >
>>> >
>>> > --
>>> > View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/sql-to-spark-scala-rdd-tp27433.html
>>> > Sent from the Apache Spark User List mailing list archive at
>>> Nabble.com.
>>> >
>>> > -
>>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>> >
>>>
>>
>>
>>
>> --
>> Thanks & Regards
>> Sri Tummala
>>
>>
>
>
> --
> Thanks & Regards
> Sri Tummala
>
>


-- 
Thanks & Regards
Sri Tummala


Re: sql to spark scala rdd

2016-07-31 Thread sri hari kali charan Tummala
Hi All,

I managed to write it using the sliding function, but can I get the key as well
in my output?

sc.textFile(file).keyBy(x => x.split("\\~") (0))
  .map(x => x._2.split("\\~"))
  .map(x => (x(2).toDouble)).toArray().sliding(2,1).map(x =>
(x,x.size)).foreach(println)


at the moment my output:-

75.0
-25.0
50.0
-50.0
-100.0

With the key included, how do I get the moving-average output per key? Something like:


987,75.0
987,-25
987,50.0

Thanks
Sri






On Sat, Jul 30, 2016 at 11:40 AM, sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:

> for knowledge just wondering how to write it up in scala or spark RDD.
>
> Thanks
> Sri
>
> On Sat, Jul 30, 2016 at 11:24 AM, Jacek Laskowski  wrote:
>
>> Why?
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>> On Sat, Jul 30, 2016 at 4:42 AM, kali.tumm...@gmail.com
>>  wrote:
>> > Hi All,
>> >
>> > I managed to write business requirement in spark-sql and hive I am still
>> > learning scala how this below sql be written using spark RDD not spark
>> data
>> > frames.
>> >
>> > SELECT DATE,balance,
>> > SUM(balance) OVER (ORDER BY DATE ROWS BETWEEN UNBOUNDED PRECEDING AND
>> > CURRENT ROW) daily_balance
>> > FROM  table
>> >
>> >
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/sql-to-spark-scala-rdd-tp27433.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
>>
>
>
>
> --
> Thanks & Regards
> Sri Tummala
>
>


-- 
Thanks & Regards
Sri Tummala


Re: Structured Streaming Parquet Sink

2016-07-31 Thread Jacek Laskowski
Hi Arun,

Regarding parquet and complete output mode:

A relevant piece of the code to think about:

if (outputMode != OutputMode.Append) {
  throw new IllegalArgumentException(
s"Data source $className does not support $outputMode output mode")
}

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L267-L270

It says that only Append output mode is supported. The message
could've been more precise in this case (but since it's an alpha API
I'd not recommend changing it anyway).

Regarding aggregations for parquet (and other Append-output sinks)

Here is a relevant piece of the code:

outputMode match {
  case InternalOutputModes.Append if aggregates.nonEmpty =>
throwError(
  s"$outputMode output mode not supported when there are
streaming aggregations on " +
s"streaming DataFrames/DataSets")(plan)

https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala#L56-L60

It says that for append output mode you can have no aggregates in a
streaming pipeline.

To me it says that parquet can be append output mode only with no aggregates.
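
For contrast, a minimal sketch of a combination that does start cleanly: no
aggregation and the append output mode, with streamingDF standing in for a
hypothetical non-aggregated streaming DataFrame:

// Sketch: parquet sink + append mode works as long as the streaming query
// performs no aggregation.
val query = streamingDF.writeStream
  .format("parquet")
  .option("path", "parq")
  .option("checkpointLocation", "chkpnt")
  .outputMode("append")
  .start()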

Kudos for letting me know about it!

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Sun, Jul 31, 2016 at 1:19 AM, Arun Patel  wrote:
> Thanks for the response. However, I am not able to use any output mode.  In
> case of Parquet sink, there should not be any aggregations?
>
> scala> val query =
> streamingCountsDF.writeStream.format("parquet").option("path","parq").option("checkpointLocation","chkpnt").outputMode("complete").start()
> java.lang.IllegalArgumentException: Data source parquet does not support
> Complete output mode
>   at
> org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:267)
>   at
> org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:291)
>   ... 54 elided
>
> scala> val query =
> streamingCountsDF.writeStream.format("parquet").option("path","parq").option("checkpointLocation","chkpnt").outputMode("append").start()
> org.apache.spark.sql.AnalysisException: Append output mode not supported
> when there are streaming aggregations on streaming DataFrames/DataSets;
>   at
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:173)
>   at
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:60)
>   at
> org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:236)
>   at
> org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:287)
>   ... 54 elided
>
> scala> val query =
> streamingCountsDF.writeStream.format("parquet").option("path","parq").option("checkpointLocation","chkpnt").outputMode("complete").start()
> java.lang.IllegalArgumentException: Data source parquet does not support
> Complete output mode
>   at
> org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:267)
>   at
> org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:291)
>   ... 54 elided
>
>
> On Sat, Jul 30, 2016 at 5:59 PM, Tathagata Das  wrote:
>>
>> Correction, the two options are.
>>
>> - writeStream.format("parquet").option("path", "...").start()
>> - writestream.parquet("...").start()
>>
>> There no start with param.
>>
>>
>> On Jul 30, 2016 11:22 AM, "Jacek Laskowski"  wrote:
>>>
>>> Hi Arun,
>>>
>>> > As per documentation, parquet is the only available file sink.
>>>
>>> The following sinks are currently available in Spark:
>>>
>>> * ConsoleSink for console format.
>>> * FileStreamSink for parquet format.
>>> * ForeachSink used in foreach operator.
>>> * MemorySink for memory format.
>>>
>>> You can create your own streaming format implementing StreamSinkProvider.
>>>
>>> > I am getting an error like 'path' is not specified.
>>> > Any idea how to write this to parquet file?
>>>
>>> There are two ways to specify "path":
>>>
>>> 1. Using option method
>>> 2. start(path: String): StreamingQuery
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>> 
>>> https://medium.com/@jaceklaskowski/
>>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>>> Follow me at https://twitter.com/jaceklaskowski
>>>
>>>
>>> On Sat, Jul 30, 2016 at 2:50 PM, Arun Patel 
>>> wrote:
>>> > I am trying out Structured streaming parquet sink.  As per
>>> > documentation,
>>> > parquet is the only available file sink.
>>> >
>>> > I am getting an error like 'path' is not specified.
>>> >
>>> > scala> val 

Re: How to filter based on a constant value

2016-07-31 Thread Nicholas Hakobian
From the online docs:
https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/Row.html#apply(int)

java.lang.Object apply(int i)
Returns the value at position i. If the value is null, null is returned.
The following is a mapping between Spark SQL types and return types:

So it's returning the content of the first element in the row, in this case
the Array (of length 1) of Date types.
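
A tiny illustration of the two accessors, using a made-up single-column result:

// Illustration only: apply(i) hands back the element as Any, while the typed
// getters (getDate, getString, ...) return it already cast.
import java.sql.Date
import org.apache.spark.sql.Row

val collected: Array[Row] = Array(Row(Date.valueOf("2015-12-15")))  // like .collect
val row: Row      = collected.apply(0)  // first (and only) Row in the Array
val asAny: Any    = row.apply(0)        // same as row(0); static type is Any
val maxdate: Date = row.getDate(0)      // typed accessor for a DateType column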


Nicholas Szandor Hakobian
Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com
M: 510-295-7113


On Sat, Jul 30, 2016 at 11:41 PM, Mich Talebzadeh  wrote:

> thanks gents.
>
> I am trying to understand this better.
>
> As I understand a DataFrame is basically an equivalent table in relational
> term.
>
> so
>
> scala> var maxdate =
> ll_18740868.filter(col("transactiondescription").contains(HASHTAG)).agg(max("transactiondate"))
> maxdate: org.apache.spark.sql.DataFrame = [max(transactiondate): date]
>
> So I find the max(transactiondate) for the filter I have applied.  In sql
> term --> select max(transactiondate) from ll_18740868 where
> transactiondescription like "%HASHTAG%"
>
> Now I want to store it in a single variable and get it worked out
>
> scala> var maxdate =
> ll_18740868.filter(col("transactiondescription").contains(HASHTAG)).agg(max("transactiondate")).collect
> maxdate: Array[org.apache.spark.sql.Row] = Array([2015-12-15])
>
> Now I have the value stored in a row. I get it as follows. It is the
> first column of the row (actually the only column) and in date format
>
> scala> var maxdate =
> ll_18740868.filter(col("transactiondescription").contains(HASHTAG)).agg(max("transactiondate")).collect.
> apply(0).getDate(0)
> maxdate: java.sql.Date = 2015-12-15
>
> what is the role of apply(0) here?
>
> Thanks
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 31 July 2016 at 03:28, Xinh Huynh  wrote:
>
>> Hi Mitch,
>>
>> I think you were missing a step:
>> [your result] maxdate: org.apache.spark.sql.Row = [2015-12-15]
>> Since maxdate is of type Row, you would want to extract the first column
>> of the Row with:
>>
>> >> val maxdateStr = maxdate.getString(0)
>>
>> assuming the column type is String.
>> API doc is here:
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Row
>>
>> Then you can do the query:
>>
>> >> col("transactiondate") === maxdateStr
>>
>> Xinh
>>
>> On Sat, Jul 30, 2016 at 5:20 PM, ayan guha  wrote:
>>
>>> select *
>>> from (select *,
>>>  rank() over (order by transactiondate) r
>>>from ll_18740868 where transactiondescription='XYZ'
>>>   ) inner
>>> where r=1
>>>
>>> Hi Mitch,
>>>
>>> If using SQL is fine, you can try the code above. You need to register
>>> ll_18740868  as temp table.
>>>
>>> On Sun, Jul 31, 2016 at 6:49 AM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>

 Hi,

 I would like to find out when it was the last time I paid a company
 with Debit Card


 This is the way I do it.

 1) Find the date when I paid last
 2) Find the rest of details from the row(s)

 So

 var HASHTAG = "XYZ"
 scala> var maxdate =
 ll_18740868.filter(col("transactiondescription").contains(HASHTAG)).agg(max("transactiondate")).collect.apply(0)
 maxdate: org.apache.spark.sql.Row = [2015-12-15]

 OK so it was 2015-12-15


 Now I want to get the rest of the columns. This one works when I hard
 code the maxdate!


 scala> ll_18740868.filter(col("transactiondescription").contains(HASHTAG)
 && col("transactiondate") === "2015-12-15").select("transactiondate",
 "transactiondescription", "debitamount").show
 +---+--+---+
 |transactiondate|transactiondescription|debitamount|
 +---+--+---+
 | 2015-12-15|  XYZ LTD CD 4636 |  10.95|
 +---+--+---+

 Now if I want to use the var maxdate in place of "2015-12-15", how
 would I do that?

 I tried lit(maxdate) etc but they are all giving me error?

 java.lang.RuntimeException: Unsupported literal type class
 org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 [2015-12-15]


 Thanks

>>>
>>>
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>
>>

Re: How to filter based on a constant value

2016-07-31 Thread Mich Talebzadeh
thanks gents.

I am trying to understand this better.

As I understand it, a DataFrame is basically the equivalent of a table in
relational terms.

so

scala> var maxdate =
ll_18740868.filter(col("transactiondescription").contains(HASHTAG)).agg(max("transactiondate"))
maxdate: org.apache.spark.sql.DataFrame = [max(transactiondate): date]

So I find the max(transactiondate) for the filter I have applied.  In sql
term --> select max(transactiondate) from ll_18740868 where
transactiondescription like "%HASHTAG%"

Now I want to store it in a single variable and get it worked out

scala> var maxdate =
ll_18740868.filter(col("transactiondescription").contains(HASHTAG)).agg(max("transactiondate")).collect
maxdate: Array[org.apache.spark.sql.Row] = Array([2015-12-15])

Now I have the value stored in a row. I get it as follows. It is the first
column of the row (actually the only column) and in date format

scala> var maxdate =
ll_18740868.filter(col("transactiondescription").contains(HASHTAG)).agg(max("transactiondate")).collect.
apply(0).getDate(0)
maxdate: java.sql.Date = 2015-12-15

what is the role of apply(0) here?

Thanks





Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 31 July 2016 at 03:28, Xinh Huynh  wrote:

> Hi Mitch,
>
> I think you were missing a step:
> [your result] maxdate: org.apache.spark.sql.Row = [2015-12-15]
> Since maxdate is of type Row, you would want to extract the first column
> of the Row with:
>
> >> val maxdateStr = maxdate.getString(0)
>
> assuming the column type is String.
> API doc is here:
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Row
>
> Then you can do the query:
>
> >> col("transactiondate") === maxdateStr
>
> Xinh
>
> On Sat, Jul 30, 2016 at 5:20 PM, ayan guha  wrote:
>
>> select *
>> from (select *,
>>              rank() over (order by transactiondate desc) r
>>        from ll_18740868 where transactiondescription like '%XYZ%'
>>       ) t
>> where r = 1
>>
>> Hi Mitch,
>>
>> If using SQL is fine, you can try the code above. You need to register
>> ll_18740868 as a temp table.
>>
>> On Sun, Jul 31, 2016 at 6:49 AM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>>
>>> Hi,
>>>
>>> I would like to find out the last time I paid a company with a Debit
>>> Card.
>>>
>>>
>>> This is the way I do it:
>>>
>>> 1) Find the date when I paid last
>>> 2) Find the rest of the details from the row(s)
>>>
>>> So
>>>
>>> var HASHTAG = "XYZ"
>>> scala> var maxdate =
>>> ll_18740868.filter(col("transactiondescription").contains(HASHTAG)).agg(max("transactiondate")).collect.apply(0)
>>> maxdate: org.apache.spark.sql.Row = [2015-12-15]
>>>
>>> OK so it was 2015-12-15
>>>
>>>
>>> Now I want to get the rest of the columns. This one works when I hard
>>> code the maxdate!
>>>
>>>
>>> scala> ll_18740868.filter(col("transactiondescription").contains(HASHTAG)
>>> && col("transactiondate") === "2015-12-15").select("transactiondate",
>>> "transactiondescription", "debitamount").show
>>> +---------------+----------------------+-----------+
>>> |transactiondate|transactiondescription|debitamount|
>>> +---------------+----------------------+-----------+
>>> |     2015-12-15|       XYZ LTD CD 4636|      10.95|
>>> +---------------+----------------------+-----------+
>>>
>>> Now, if I want to use the var maxdate in place of "2015-12-15", how would
>>> I do that?
>>>
>>> I tried lit(maxdate) etc., but they all give me an error:
>>>
>>> java.lang.RuntimeException: Unsupported literal type class
>>> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
>>> [2015-12-15]
>>>
>>>
>>> Thanks
>>>
>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>
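
For completeness, here is a minimal sketch of the SQL / temp-table route
suggested above. Assumptions: the Spark 2.0 shell (createOrReplaceTempView
and spark.sql); on 1.6.x the equivalents are registerTempTable and a
HiveContext, which the window function needs there. Table and column names
are those used in the thread:

// Register the DataFrame as a temporary view and query it with SQL;
// rank() ... desc makes r = 1 the most recent matching transaction.
ll_18740868.createOrReplaceTempView("ll_18740868")

val lastPayment = spark.sql("""
  select transactiondate, transactiondescription, debitamount
  from (select *,
               rank() over (order by transactiondate desc) as r
        from ll_18740868
        where transactiondescription like '%XYZ%') t
  where r = 1
""")
lastPayment.show()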


Re: Visualization of data analysed using spark

2016-07-31 Thread Gourav Sengupta
If you are using Python, please try Bokeh and its related stack. Most of
the people in this forum, including the folks at Databricks, have not tried
that stack from Anaconda; it's worth a try when you are visualizing data in
a big data stack.


Regards,
Gourav

On Sat, Jul 30, 2016 at 10:25 PM, Rerngvit Yanggratoke <
rerngvit.yanggrat...@gmail.com> wrote:

> Since you already have an existing application (not starting from
> scratch), the simplest way to visualize would be to export the data to a
> file (e.g., a CSV file) and visualise it using other tools, e.g., Excel,
> RStudio, Matlab, Jupyter, Zeppelin, Tableau, or the Elastic Stack.
> The choice depends on your background and technology preferences.
> Note that if you are dealing with a large dataset, you should generally
> sample the data first. A good sampling mechanism depends on your
> application domain.
>
> - Rerngvit
> > On 30 Jul 2016, at 21:45, Tony Lane  wrote:
> >
> > I am developing my analysis application using Spark (with Eclipse as
> the IDE).
> >
> > What is a good way to visualize the data, taking into consideration that
> I have multiple files which make up my Spark application?
> >
> > I have seen some notebook demos but am not sure how to use my
> application with such notebooks.
> >
> > Thoughts/suggestions/experiences -- please share.
> >
> > -Tony
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
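
As a rough illustration of the export-and-sample route described above,
here is a minimal sketch. The DataFrame name df, the 1% sample fraction and
the output path are placeholders, and it assumes Spark 2.0's built-in CSV
writer:

// Take a small random sample and write it out as CSV for external tools
// (Excel, RStudio, Zeppelin, Tableau, ...).
val sampled = df.sample(withReplacement = false, fraction = 0.01, seed = 42L)

sampled
  .coalesce(1)                      // one output file, convenient for a small sample
  .write
  .option("header", "true")
  .csv("/tmp/analysis_sample_csv")  // hypothetical output directory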