Window function / streaming

2017-07-04 Thread Julien CHAMP
Hi there!

Let me explain my problem to see if you have a good solution to help me :)

Let's imagine that I have all my data in a DB or a file, that I load in a
dataframe DF with the following columns :
id | timestamp(ms) | value
A | 100 |  100
A | 110 |  50
B | 100 |  100
B | 110 |  50
B | 120 |  200
B | 250 |  500
C | 100 |  200
C | 110 |  500

The timestamp is a long value, so that dates can be expressed in ms from -01-01 up to today!

I want to compute operations such as min, max and average on the value column,
over a given time window and grouped by id (bonus: if possible, only for some
timestamps...).

For example, if I have these 4 tuples:

id | timestamp(ms) | value
B | 100 |  100
B | 110 |  50
B | 120 |  200
B | 250 |  500

I would like to be able to compute the min value over a time window of 20 ms.
This would result in the following DF:

id | timestamp(ms) | value | min_value
B | 100 |  100 | 100
B | 110 |  50  | 50
B | 120 |  200 | 50
B | 250 |  500 | 500

This seems like the perfect use case for window functions in Spark (cf.
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html).
I can use:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.min

val tw = Window.orderBy("timestamp").partitionBy("id").rangeBetween(-20, 0)
df.withColumn("min_value", min(df.col("value")).over(tw))

This gives exactly the answer I want!

However, there is a big bug with window functions when working with Long
values, as reported in https://issues.apache.org/jira/browse/SPARK-19451,
so I can't use this approach.

So my question is, of course: how can I work around this?
And would I face the same issue if I used Spark Streaming?
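One workaround I have been considering is to express the window as a bounded
self-join and aggregate afterwards; a rough sketch (assuming a plain join on the
time bound is acceptable performance-wise, not my actual code):

    import org.apache.spark.sql.functions.{col, min}

    // For each row of df, gather the rows with the same id whose timestamp lies
    // in [timestamp - 20, timestamp], then take the min of their values.
    val result = df.as("a")
      .join(df.as("b"),
        col("a.id") === col("b.id") &&
        col("b.timestamp") >= col("a.timestamp") - 20 &&
        col("b.timestamp") <= col("a.timestamp"))
      .groupBy(col("a.id"), col("a.timestamp"), col("a.value"))
      .agg(min(col("b.value")).as("min_value"))

It is obviously heavier than a real range frame, but it avoids rangeBetween
entirely.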

I'll be glad to discuss this problem with you, feel free to answer :)

Regards,

Julien





Re: Analysis Exception after join

2017-07-04 Thread Bernard Jesop
It seems to be because of this issue:
https://issues.apache.org/jira/browse/SPARK-10925

I added a checkpoint, as suggested, to break the lineage and it worked.
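For reference, a minimal sketch of what that looks like (assuming Spark >= 2.1,
where Dataset.checkpoint() is available; the checkpoint directory is just an
example path, and the count aggregation is only what shows up in my plan):

    import org.apache.spark.sql.functions.count

    spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")  // example path

    val dfAgg = df.groupBy("S_ID")
      .agg(count("*").as("infectedStreet"))
      .checkpoint()  // materializes dfAgg and truncates its lineage

    val dfRes = df.join(dfAgg, Seq("S_ID"), "left_outer")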

Best regards,



Re: Analysis Exception after join

2017-07-04 Thread Bernard Jesop
Thanks Didac,

My bad, the code I posted was incomplete; it should have been:
dfAgg = df.groupBy("S_ID").agg(...).

I want to access the aggregated values (of dfAgg) for each row of 'df',
that is why I do a left outer join.


Also, regarding the second parameter, I am using this signature of join:
(Dataset, Seq[String], String) => Dataset.

It is the sequence of column names to use as join keys, a kind of syntactic
sugar for (df1("key1") === df2("key1") && df1("key2") === df2("key2") && ...),
except that it does not duplicate the columns used as keys.
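A toy illustration of the difference (made-up column names, assuming
spark.implicits._ is in scope):

    val left  = Seq((1, "a"), (2, "b")).toDF("key1", "v1")
    val right = Seq((1, "x")).toDF("key1", "v2")

    // Seq[String] signature: the output has a single key1 column.
    left.join(right, Seq("key1"), "left_outer")                    // columns: key1, v1, v2

    // Expression signature: both key1 columns are kept and must be disambiguated.
    left.join(right, left("key1") === right("key1"), "left_outer") // columns: key1, v1, key1, v2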

2017-07-03 12:39 GMT+02:00 Didac Gil :

> With the left join, you are joining two tables.
>
> In your case, df is the left table, dfAgg is the right table.
> The second parameter should be the joining condition, right?
> For instance
>
>  dfRes = df.join(dfAgg, $"userName" === $"name", "left_outer")
>
> having a field in df called userName, and another field in dfAgg called "name".
>
> However, what kind of query do you want to make? dfAgg is already the df
> table grouped by S_ID.
>
> I guess that you are looking for something more like the following example:
>
> dfAgg = df.groupBy("S_ID")
>   .agg(
>     org.apache.spark.sql.functions.count("userName").as("usersCount"),
>     org.apache.spark.sql.functions.collect_set("city").as("ListofCities"),
>     org.apache.spark.sql.functions.max("age").as("oldest")
>   )
>
> On 3 Jul 2017, at 11:55, Bernard Jesop  wrote:
>
> Hello, I don't understand my error message.
>
> Basically, all I am doing is:
> - dfAgg = df.groupBy("S_ID")
> - dfRes = df.join(dfAgg, Seq("S_ID"), "left_outer")
>
> However I get this AnalysisException:
>
> Exception in thread "main" org.apache.spark.sql.AnalysisException:
> resolved attribute(s) S_ID#1903L missing from Dummy_ID#740,sex#37L,
> PERSONAL_STATUS#726L,W_DEP_CODE#736,W_SIZE#739L,POSTAL_CODE#735,COUNTRY_CODE#730,
> ID#724L,Dummy_ID_1#741,DEP_CODE#729,HOUSEHOLD_TYPE#733L,HOUSEHOLD_SIZE#734L,
> AGE#727L,W_ID#738L,H_ID#732L,AGE_TYPE#728,S_ID#57L,NATIONALITY#731
> in operator !Project [ID#724L, sex#37L, PERSONAL_STATUS#726L, AGE#727L,
> AGE_TYPE#728, DEP_CODE#729, COUNTRY_CODE#730, NATIONALITY#731 AS Nationality#77,
> H_ID#732L, HOUSEHOLD_TYPE#733L, HOUSEHOLD_SIZE#734L, POSTAL_CODE#735,
> W_DEP_CODE#736, S_ID#1903L, W_ID#738L, W_SIZE#739L, Dummy_ID#740, Dummy_ID_1#741];;
>
> What I don't understand is that it says S_ID#1903L is missing, but
> everything seems fine in the logical plan:
>
> +- Join LeftOuter, (S_ID#57L = S_ID#1903L)
>    :- Project [W_ID#14L, H_ID#8L, ID#0L, sex#37L, category#97L, AGE#3L,
>       AGE_TYPE#4, DEP_CODE#5, COUNTRY_CODE#6, Nationality#77, HOUSEHOLD_TYPE#9L,
>       familySize#117L, POSTAL_CODE#11, W_DEP_CODE#12, S_ID#57L,
>       workplaceSize#137L, Dummy_ID#16, Dummy_ID_1#17, Inc_period#157,
>       Time_inf#1064, Time_inc#200, Health#1014, Inf_period#1039,
>       infectedFamily#1355L, infectedWorker#1385L]
>    +- Aggregate [S_ID#1903L], [S_ID#1903L, count(1) AS infectedStreet#1415L]
>
> Does anyone have a clue about this?
> Thanks,


Kafka 0.10 with PySpark

2017-07-04 Thread Daniel van der Ende
Hi,

I'm working on integrating some PySpark code with Kafka. We'd like to use
SSL/TLS, and so want to use Kafka 0.10. Because Structured Streaming is
still marked alpha, we'd like to use Spark Streaming. The documentation,
however, indicates that the Kafka 0.10 integration in Spark does not
support Python (
https://spark.apache.org/docs/latest/streaming-kafka-integration.html).
I've been trying to figure out why, but have not been able to find
anything. Is there any particular reason for this?
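(For context, the Structured Streaming route we are trying to avoid would look
roughly like the sketch below. Broker, topic and truststore values are
placeholders; it assumes a SparkSession called spark, the spark-sql-kafka-0-10
package on the classpath, and Kafka client SSL properties passed through with
the "kafka." prefix.)

    # Sketch only: Kafka 0.10 source via Structured Streaming with SSL.
    df = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker1:9093")
          .option("subscribe", "my_topic")
          .option("kafka.security.protocol", "SSL")
          .option("kafka.ssl.truststore.location", "/path/to/truststore.jks")
          .option("kafka.ssl.truststore.password", "********")
          .load())

    query = (df.selectExpr("CAST(value AS STRING)")
             .writeStream
             .format("console")
             .start())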

Thanks,

Daniel


RE: [PySpark] - running processes and computing time

2017-07-04 Thread Sidney Feiner
To initialize it per executor, I used a class with only class attributes and
class methods (like an `object` in Scala), but because I was using PySpark, it
was actually being initialized once per process ☹
What I went for in the end was a broadcast variable, but there is still
something suspicious in my application: the processing time of each batch.

In my logs, I see that processing a partition takes under a second, but in the
Spark UI I see that a task takes between 3 and 6 seconds.
Shouldn't the partition processing time and the task computing time be roughly the same?

My code:


from time import time
from pyspark import RDD

def process_func(obj, records):
    start = time()
    processed_records = ...  # some processing of `records`, using `obj`
    # This logs very small numbers (around 0.05 seconds)
    logger.info("It took {0} seconds to handle records".format(time() - start))
    return processed_records

def handle_rdd(rdd: RDD):
    start_time = time()
    rdd.foreachPartition(lambda records: process_func(object_broadcast.value, records))
    # This logs much bigger numbers (around 3-6 seconds)
    logger.info("Handle RDD took: {0} seconds".format(time() - start_time))

# Keep only the values (they are later cast to TextAnalysis) and process each RDD
ssc.union(*streams)\
    .filter(lambda x: x[1] is not None)\
    .map(lambda x: x[1])\
    .foreachRDD(handle_rdd)
ssc.start()
ssc.awaitTermination()


Each RDD has at most 10 partitions, which means it should take around 0.5
seconds for all the tasks to be processed.

Does anyone know what happens here? The time difference is too big to be just
networking overhead, right?

From: Sudev A C [mailto:sudev...@go-mmt.com]
Sent: Monday, July 3, 2017 7:48 PM
To: Sidney Feiner ; user@spark.apache.org
Subject: Re: [PySpark] - running processes

You might want to do the initialisation per partition (I'm not sure how you are
achieving the per-executor initialisation in your code).

To initialise something per partition, you can use something like
rdd.foreachPartition, as in the sketch below.

Or, if you want something global, like a variable for further processing, you
might want to initialise it once as a broadcast variable and access the data
structure through that broadcast variable.

AFAIK, a Python worker process will be started for per-partition tasks.
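Something like this, roughly (build_graph and handle are placeholders for your
own code; sc and rdd are your SparkContext and RDD):

    def process_partition(records):
        graph = build_graph()        # heavy initialisation, done once per partition
        for record in records:
            handle(record, graph)

    rdd.foreachPartition(process_partition)

    # Or build the graph once on the driver and share it via a broadcast variable:
    graph_bc = sc.broadcast(build_graph())

    def process_partition_bc(records):
        graph = graph_bc.value       # fetched/deserialised lazily on each worker
        for record in records:
            handle(record, graph)

    rdd.foreachPartition(process_partition_bc)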
On Mon, 3 Jul 2017 at 5:23 PM, Sidney Feiner wrote:

In my Spark Streaming application, I need to build a graph from a file, and
initializing that graph takes between 5 and 10 seconds.

So I tried initializing it once per executor, so that it would be initialized only once.

After running the application, I noticed that it is initialized much more than
once per executor, every time with a different process id (every process has
its own logger).

Doesn't every executor have its own JVM and its own process? Or is that only
relevant when I develop in JVM languages like Scala/Java? Do executors in
PySpark spawn new processes for new tasks?

And if they do, how can I make sure that my graph object will really be
initialized only once?
Thanks :)




SparkSession via HS2 - is it supported?

2017-07-04 Thread Sudha KS
This is the code: I created a Java class extending
org.apache.hadoop.hive.ql.udf.generic.GenericUDTF, which creates a SparkSession as

SparkSession spark = SparkSession.builder()
    .enableHiveSupport()
    .master("yarn-client")
    .appName("SampleSparkUDTF_yarnV1")
    .getOrCreate();

and then tries to read a table in the Hive DB:

Dataset<Row> inputData = spark.read().table(tableName);
Long countRows = inputData.count();


Environment: HDP-2.5.3.0, Spark 2.0.0


Steps:
1.  Copied this custom UDTF jar into HDFS and also into auxlib
2.  Copied /usr/hdp/<2.5.x>/spark2/jars/*.jar into /usr/hdp/<2.5.x>/hive/auxlib/
3.  Connected to HS2 using beeline to run this Spark UDTF:

beeline -u jdbc:hive2://localhost:1 -d org.apache.hive.jdbc.HiveDriver



CREATE TABLE TestTable (i int);

INSERT INTO TestTable VALUES (1);



0: jdbc:hive2://localhost:1/> CREATE FUNCTION SparkUDT AS 'SparkHiveUDTF' 
using jar 'hdfs:///tmp/sparkHiveGenericUDTF-1.0.jar' ;

INFO  : converting to local hdfs:///tmp/sparkHiveGenericUDTF-1.0.jar

INFO  : Added 
[/tmp/69366d0d-6777-4860-82c0-c61482ccce87_resources/sparkHiveGenericUDTF-1.0.jar]
 to class path

INFO  : Added resources: [hdfs:///tmp/sparkHiveGenericUDTF-1.0.jar]

No rows affected (0.125 seconds)





0: jdbc:hive2://localhost:1/> SELECT SparkUDT('tbl','TestTable');



failed, info=[Error: Failure while running task:java.lang.RuntimeException: 
java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: 
Hive Runtime Error while processing writable (null)

at 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:173)

at 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:139)

at 
org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:347)

at 
org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:194)

at 
org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:185)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1724)

at 
org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.callInternal(TezTaskRunner.java:185)

at 
org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.callInternal(TezTaskRunner.java:181)

at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.RuntimeException: 
org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while 
processing writable (null)

at 
org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.processRow(MapRecordSource.java:91)

at 
org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.pushRecord(MapRecordSource.java:68)

at 
org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.run(MapRecordProcessor.java:325)

at 
org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:150)

... 14 more

Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error 
while processing writable (null)

at 
org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:563)

at 
org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.processRow(MapRecordSource.java:83)

... 17 more

Caused by: org.apache.spark.SparkException: Yarn application has already ended! 
It might have been killed or unable to launch application master.

at 
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:85)

at 
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:62)

at 
org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:149)

at org.apache.spark.SparkContext.<init>(SparkContext.scala:497)

at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2275)

at 
org.apache.spark.sql.SparkSession$Builder$$anonfun$8.apply(SparkSession.scala:831)

at 
org.apache.spark.sql.SparkSession$Builder$$anonfun$8.apply(SparkSession.scala:823)

at scala.Option.getOrElse(Option.scala:121)

at 
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:823)

at SparkHiveUDTF.sparkJob(SparkHiveUDTF.java:97)

at SparkHiveUDTF.process(SparkHiveUDTF.java:78)

at 
org.apache.hadoop.hive.ql.exec.UDTFOperator.process(UDTFOperator.java:109)

at ...

Re: test mail

2017-07-04 Thread Sudhanshu Janghel
test email received ;p

On 4 Jul 2017 7:40 am, "Sudha KS"  wrote:



test mail

2017-07-04 Thread Sudha KS