Re: Load multiple CSV from different paths

2017-07-05 Thread Didac Gil
Thanks man!

That was the key.

val sources = […].toSeq

.csv(sources: _*)

Learnt something more with Scala.
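
For completeness, the full picture of what ended up working for me looks roughly like 
this; the paths and schema below are just placeholders, not my real ones:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession.builder().appName("MultiPathCSV").getOrCreate()

// Hypothetical schema and comma-separated list of specific paths
val custom_schema = StructType(Seq(
  StructField("field1", StringType, nullable = true),
  StructField("field2", StringType, nullable = true)
))
val paths = "/data/folderA/part1.csv,/data/folderB/part2.csv"

val sources = paths.split(',').toSeq

val df = spark.read
  .option("header", "false")
  .schema(custom_schema)
  .option("delimiter", "\t")
  .option("mode", "DROPMALFORMED")
  .csv(sources: _*)   // varargs expansion: each path becomes one argument to csv()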

> On 5 Jul 2017, at 16:29, Radhwane Chebaane <r.cheba...@mindlytix.com> wrote:
> 
> Hi,
> 
> Referring to the Spark 2.x documentation, org.apache.spark.sql.DataFrameReader 
> has this function:
> def csv(paths: String*): DataFrame 
> <http://spark.apache.org/docs/2.1.0/api/scala/org/apache/spark/sql/package.html#DataFrame=org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]>
> 
> So you can unpack your Array of paths like this:
> val sources = paths.split(',').toSeq
> spark.read.option("header", "false")
>   .schema(custom_schema)
>   .option("delimiter", "\t")
>   .option("mode", "DROPMALFORMED")
>   .csv(sources: _*)
> 
> In spark 1.6.x I think this may work with spark-csv 
> <https://github.com/databricks/spark-csv> :
> 
> spark.read.format("com.databricks.spark.csv").option("header", "false")
> .schema(custom_schema)
> .option('delimiter', '\t')
>     .option('mode', 'DROPMALFORMED')
> .load(sources: _*)
> 
> 
> Cheers,
> Radhwane Chebaane
> 
> 2017-07-05 16:08 GMT+02:00 Didac Gil <didacgil9...@gmail.com>:
> Hi,
> 
> Do you know any simple way to load multiple csv files (same schema) that are 
> in different paths?
> Wildcards are not a solution, as I want to load specific csv files from 
> different folders.
> 
> I came across a solution 
> (https://stackoverflow.com/questions/37639956/how-to-import-multiple-csv-files-in-a-single-load)
> that suggests something like
> 
> spark.read.format("csv").option("header", "false")
> .schema(custom_schema)
> .option('delimiter', '\t')
> .option('mode', 'DROPMALFORMED')
> .load(paths.split(','))
> However, even it mentions that this approach would work in Spark 2.x, I don’t 
> find an implementation of load that accepts an Array[String] as an input 
> parameter.
> 
> Thanks in advance for your help.
> 
> 
> Didac Gil de la Iglesia
> PhD in Computer Science
> didacg...@gmail.com
> Spain: +34 696 285 544
> Sweden: +46 (0)730229737
> Skype: didac.gil.de.la.iglesia
> 
> 
> 
> 
> --
> 
>   Radhwane Chebaane
> Distributed systems engineer, Mindlytix
> Mail: radhw...@mindlytix.com
> Mobile: +33 695 588 906
> Skype: rad.cheb
> LinkedIn <https://fr.linkedin.com/in/radhwane-chebaane-483b3a7b>

Didac Gil de la Iglesia
PhD in Computer Science
didacg...@gmail.com
Spain: +34 696 285 544
Sweden: +46 (0)730229737
Skype: didac.gil.de.la.iglesia





Load multiple CSV from different paths

2017-07-05 Thread Didac Gil
Hi,

Do you know any simple way to load multiple csv files (same schema) that are in 
different paths?
Wildcards are not a solution, as I want to load specific csv files from 
different folders.

I came across a solution 
(https://stackoverflow.com/questions/37639956/how-to-import-multiple-csv-files-in-a-single-load)
that suggests something like

spark.read.format("csv").option("header", "false")
.schema(custom_schema)
.option('delimiter', '\t')
.option('mode', 'DROPMALFORMED')
.load(paths.split(','))
However, even it mentions that this approach would work in Spark 2.x, I don’t 
find an implementation of load that accepts an Array[String] as an input 
parameter.

Thanks in advance for your help.


Didac Gil de la Iglesia
PhD in Computer Science
didacg...@gmail.com
Spain: +34 696 285 544
Sweden: +46 (0)730229737
Skype: didac.gil.de.la.iglesia





Re: Analysis Exception after join

2017-07-03 Thread Didac Gil
With the left join, you are joining two tables.

In your case, df is the left table, dfAgg is the right table.
The second parameter should be the joining condition, right?
For instance

 val dfRes = df.join(dfAgg, $"userName" === $"name", "left_outer")

having a field in df called userName, and another in dfAgg called "name".

However, what kind of query do you want to make? dfAgg is just the df table 
grouped by S_ID.

I guess that you are looking for something more like the following example:

val dfAgg = df.groupBy("S_ID")
  .agg(
    org.apache.spark.sql.functions.count("userName").as("usersCount"),
    org.apache.spark.sql.functions.collect_set("city").as("ListOfCities"),
    org.apache.spark.sql.functions.max("age").as("oldest")
  )

> On 3 Jul 2017, at 11:55, Bernard Jesop <bernard.je...@gmail.com> wrote:
> 
> Hello, I don't understand my error message.
> 
> Basically, all I am doing is :
> - dfAgg = df.groupBy("S_ID")
> - dfRes = df.join(dfAgg, Seq("S_ID"), "left_outer")
> 
> However I get this AnalysisException: "
> Exception in thread "main" org.apache.spark.sql.AnalysisException: resolved 
> attribute(s) S_ID#1903L missing from 
> Dummy_ID#740,sex#37L,PERSONAL_STATUS#726L,W_DEP_CODE#736,W_SIZE#739L,
> POSTAL_CODE#735,COUNTRY_CODE#730,
> ID#724L,Dummy_ID_1#741,DEP_CODE#729,HOUSEHOLD_TYPE#733L,
> HOUSEHOLD_SIZE#734L,AGE#727L,W_ID#738L,H_ID#732L,AGE_TYPE#728,
> S_ID#57L,NATIONALITY#731
> in operator !Project [ID#724L, sex#37L, PERSON\
>  AL_STATUS#726L, AGE#727L, AGE_TYPE#728, DEP_CODE#729, COUNTRY_CODE#730, 
> NATIONALITY#731 AS Nationality#77, H_ID#732L, HOUSEHOLD_TYPE#733L, 
> HOUSEHOLD_SIZE#734L, POSTAL_CODE#735, W_DEP_CODE#736, S_ID#1903L,
> W_ID#738L, W_SIZE#739L, Dummy_ID#740, Dummy_ID_1#741];; "
> 
> What I don't understand is it says S_ID#1903L is missing
> but everything seems fine on the Logical Plan.
> +- Join LeftOuter, (S_ID#57L = S_ID#1903L)
> 
>:- Project [W_ID#14L, H_ID#8L, ID#0L, sex#37L, category#97L, AGE#3L, 
> AGE_TYPE#4, DEP_CODE#5, COUNTRY_CODE#6, Nationality#77, HOUSEHOLD_TYPE#9L, 
> familySize#117L, POSTAL_CODE#11, W_DEP_CODE#12, S_ID#57\
>  L, workplaceSize#137L, Dummy_ID#16, Dummy_ID_1#17, Inc_period#157, 
> Time_inf#1064, Time_inc#200, Health#1014, Inf_period#1039, 
> infectedFamily#1355L, infectedWorker#1385L]
> 
> +- Aggregate [S_ID#1903L], [S_ID#1903L, count(1) AS infectedStreet#1415L]
> 
> Does someone have a clue about it?
> Thanks,
> 
> 
> 

Didac Gil de la Iglesia
PhD in Computer Science
didacg...@gmail.com
Spain: +34 696 285 544
Sweden: +46 (0)730229737
Skype: didac.gil.de.la.iglesia





Re: How to print data to console in structured streaming using Spark 2.1.0?

2017-05-16 Thread Didac Gil
From what I know, you would have to iterate over each RDD. When you are reading 
from the stream, Spark actually collects the data as a mini RDD for each batch 
interval.

I hope this helps.
ds.foreachRDD { rdd =>
  val newNames = Seq("Field1", "Field2", "Field3")
  val mydataDF = rdd.toDF(newNames: _*)
  mydataDF.createOrReplaceTempView("myTempTable")
  // Run SQL on the DataFrame, adding a timestamp column, and persist the result
  val wordCountsDataFrame = spark.sql("select *, now() as TStamp from myTempTable")
  wordCountsDataFrame.write.mode(mode).save(output)
  val lines = wordCountsDataFrame.count().toInt
  // wordCountsDataFrame.show(20, false)
  println("Total entries in this batch: " + lines)
}
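
For completeness, this is roughly how I wire that snippet up on my side; the socket 
source, batch interval and field split below are placeholders, not something from 
your setup:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("StreamToDF")
val ssc = new StreamingContext(conf, Seconds(10))     // 10-second micro-batches

// Hypothetical source: lines of tab-separated text with exactly three fields
val ds = ssc.socketTextStream("localhost", 9999)
  .map(_.split("\t"))
  .map(a => (a(0), a(1), a(2)))

// The foreachRDD block above goes here; note that rdd.toDF(...) needs
// "import spark.implicits._" with an active SparkSession in scope.

ssc.start()
ssc.awaitTermination()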

> On 16 May 2017, at 09:36, kant kodali <kanth...@gmail.com> wrote:
> 
> Hi All,
> 
> I have the following code.
> 
>  val ds = sparkSession.readStream()
> .format("kafka")
> .option("kafka.bootstrap.servers",bootstrapServers))
> .option("subscribe", topicName)
> .option("checkpointLocation", hdfsCheckPointDir)
> .load();
>  val ds1 = ds.select($"value")
>  val query = ds1.writeStream.outputMode("append").format("console").start()
>  query.awaitTermination()
> There are no errors when I execute this code however I don't see any data 
> being printed out to console? When I run my standalone test Kafka consumer 
> jar I can see that it is receiving messages. so I am not sure what is going 
> on with above code? any ideas?
> 
> Thanks!

Didac Gil de la Iglesia
PhD in Computer Science
didacg...@gmail.com
Spain: +34 696 285 544
Sweden: +46 (0)730229737
Skype: didac.gil.de.la.iglesia





Re: How can i merge multiple rows to one row in sparksql or hivesql?

2017-05-15 Thread Didac Gil
Or maybe you could also check collect_list from the SQL functions:

val compacter = Data1.groupBy("UserID")
  .agg(org.apache.spark.sql.functions.collect_list("feature").as("ListOfFeatures"))
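
If you then need everything in a single space-separated string per user, as in your 
expected output, something like this should work (untested sketch, column names taken 
from my example above):

import org.apache.spark.sql.functions.{collect_list, concat_ws}

// Group all feature values per user and join them into one string column
val compacted = Data1.groupBy("UserID")
  .agg(concat_ws(" ", collect_list("feature")).as("allFeatures"))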


> On 15 May 2017, at 15:15, Jone Zhang <joyoungzh...@gmail.com> wrote:
> 
> For example
> Data1(has 1 billion records)
> user_id1  feature1
> user_id1  feature2
> 
> Data2(has 1 billion records)
> user_id1  feature3
> 
> Data3(has 1 billion records)
> user_id1  feature4
> user_id1  feature5
> ...
> user_id1  feature100
> 
> I want to get the result as follow
> user_id1  feature1 feature2 feature3 feature4 feature5...feature100
> 
> Is there a more efficient way except join?
> 
> Thanks!

Didac Gil de la Iglesia
PhD in Computer Science
didacg...@gmail.com
Spain: +34 696 285 544
Sweden: +46 (0)730229737
Skype: didac.gil.de.la.iglesia





Re: How can i merge multiple rows to one row in sparksql or hivesql?

2017-05-15 Thread Didac Gil
I guess that if your user_id field is the key, you could use the 
updateStateByKey function.

I did not test it, but it could be something along these lines:

def yourCombineFunction(input: Seq[String], accumulatedInput: Option[String]): Option[String] = {
  // In case the current key was not seen before, the accumulated feature list is empty
  val state = accumulatedInput.getOrElse("")
  // Append the feature values of the new entries to the accumulated state
  val newFeatures = (state +: input).mkString(" ").trim
  // The new accumulated value for the features is returned
  Some(newFeatures)
}

// This "iterates" over all the entries in your stream and, for each key, updates
// the accumulated features
val updatedData = Data1.updateStateByKey(yourCombineFunction _)
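
Bear in mind that updateStateByKey is a DStream operation, so this only applies if 
Data1 is a key/value stream, and it needs a checkpoint directory. A rough sketch of 
that wiring (the source, paths and batch interval are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("FeatureAccumulator")
val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("/tmp/feature-accumulator-checkpoint")  // required for stateful operations

// Hypothetical source: lines like "user_id1<TAB>feature1"
val Data1 = ssc.socketTextStream("localhost", 9999)
  .map(_.split("\t"))
  .map(a => (a(0), a(1)))                              // (user_id, feature) pairs

val updatedData = Data1.updateStateByKey(yourCombineFunction _)

updatedData.print()
ssc.start()
ssc.awaitTermination()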

Good luck

> On 15 May 2017, at 15:15, Jone Zhang <joyoungzh...@gmail.com> wrote:
> 
> For example
> Data1(has 1 billion records)
> user_id1  feature1
> user_id1  feature2
> 
> Data2(has 1 billion records)
> user_id1  feature3
> 
> Data3(has 1 billion records)
> user_id1  feature4
> user_id1  feature5
> ...
> user_id1  feature100
> 
> I want to get the result as follow
> user_id1  feature1 feature2 feature3 feature4 feature5...feature100
> 
> Is there a more efficient way except join?
> 
> Thanks!

Didac Gil de la Iglesia
PhD in Computer Science
didacg...@gmail.com
Spain: +34 696 285 544
Sweden: +46 (0)730229737
Skype: didac.gil.de.la.iglesia





Re: Dataframes na fill with empty list

2017-04-11 Thread Didac Gil
It does support it, at least in 2.0.2, which is the version I am running.

Here is one example:

val parsedLines = stream_of_logs
  .map(line => p.parseRecord_viaCSVParser(line))
  .join(appsCateg, $"Application" === $"name", "left_outer")
  .drop("id")
  .na.fill(0, Seq("numeric_field1", "numeric_field2"))
  .na.fill("", Seq("text_field1", "text_field2", "text_field3"))

Notice that you have to differentiate the fields that are meant to be filled 
with an int from those that require a different value, an empty string in my 
case.
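
For the array columns coming out of collect_list, which na.fill will not take, a small 
UDF could do the trick. This is an untested sketch, and "joinedDF" and "listColumn" are 
placeholder names, not from your code:

import org.apache.spark.sql.functions.{col, udf}

// Replace the nulls that the outer join leaves in an array<string> column
// with an empty list; the element type is assumed to be String
val emptyIfNull = udf((xs: Seq[String]) => if (xs == null) Seq.empty[String] else xs)
val patched = joinedDF.withColumn("listColumn", emptyIfNull(col("listColumn")))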

> On 11 Apr 2017, at 03:18, Sumona Routh <sumos...@gmail.com> wrote:
> 
> Hi there,
> I have two dataframes that each have some columns which are of list type 
> (array generated by the collect_list function actually).
> 
> I need to outer join these two dfs, however by nature of an outer join I am 
> sometimes left with null values. Normally I would use df.na.fill(...), 
> however it appears the fill function doesn't support this data type.
> 
> Can anyone recommend an alternative? I have also been playing around with 
> coalesce in a sql expression, but I'm not having any luck here either.
> 
> Obviously, I can do a null check on the fields downstream, however it is not 
> in the spirit of scala to pass around nulls, so I wanted to see if I was 
> missing another approach first.
> 
> Thanks,
> Sumona
> 
> I am using Spark 2.0.2

Didac Gil de la Iglesia
PhD in Computer Science
didacg...@gmail.com
Spain: +34 696 285 544
Sweden: +46 (0)730229737
Skype: didac.gil.de.la.iglesia





Re: kafka and spark integration

2017-03-22 Thread Didac Gil
Spark can be both a consumer and a producer from the Kafka point of view.

You can create a Kafka client in Spark that subscribes to a topic and reads its 
feed, and you can process data in Spark and use a producer to send the results 
back into another topic.
So, Spark sits next to Kafka, and you can use Kafka as the channel to both collect 
and publish the data.

That’s what I am doing, at least.
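
A rough sketch of both roles with the structured streaming Kafka source and sink (the 
Kafka sink needs a recent 2.x release, and the brokers, topics and checkpoint path 
below are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("KafkaInAndOut").getOrCreate()

// Spark as a Kafka consumer: subscribe to an input topic
val in = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events-in")
  .load()

// ... any processing on the key/value columns would go here ...
val out = in.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")

// Spark as a Kafka producer: write the result to an output topic
val query = out.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "events-out")
  .option("checkpointLocation", "/tmp/kafka-out-checkpoint")
  .start()

query.awaitTermination()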

> On 22 Mar 2017, at 08:08, Adaryl Wakefield  
> wrote:
> 
> I’m a little confused on how to use Kafka and Spark together. Where exactly 
> does Spark lie in the architecture? Does it sit on the other side of the 
> Kafka producer? Does it feed the consumer? Does it pull from the consumer?
> 
> Adaryl "Bob" Wakefield, MBA
> Principal
> Mass Street Analytics, LLC
> 913.938.6685
> www.massstreet.net 
> www.linkedin.com/in/bobwakefieldmba 
> 
> Twitter: @BobLovesData





Re: Suprised!!!!!Spark-shell showing inconsistent results

2017-02-02 Thread Didac Gil
Is 1570 the value of col1?
If so, you have ordered by that column and selected only the first item. It 
seems that all the returned rows have the same col1 value, so any of them would 
be a correct answer to return. Right?
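
For instance, adding another column to the ORDER BY should make it deterministic 
(col2 here stands for whatever extra column breaks the tie, it is not a real column 
from your table):

// More sort columns remove the ambiguity among rows sharing the same col1 value
hc.sql("select * from testtable1 order by col1, col2 limit 1").collect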

> On 2 Feb 2017, at 11:03, Alex  wrote:
> 
> Hi As shown below same query when ran back to back showing inconsistent 
> results..
> 
> testtable1 is Avro Serde table... 
> 
> 
> 
> 
> 
>  hc.sql("select * from testtable1 order by col1 limit 1").collect;
> res14: Array[org.apache.spark.sql.Row] = 
> Array([1570,3364,201607,Y,APJ,PHILIPPINES,8518944,null,null,null,null,-15.992583,0.0,-15.992583,null,null,MONTH_ITEM_GROUP])
> 
> scala> hc.sql("select * from testtable1 order by col1 limit 1").collect;
> res15: Array[org.apache.spark.sql.Row] = 
> Array([1570,485888,20163,N,AMERICAS,BRAZIL,null,null,null,null,null,6019.2999,17198.0,6019.2999,null,null,QUARTER_GROUP])
> 
> scala> hc.sql("select * from testtable1 order by col1 limit 1").collect;
> res16: Array[org.apache.spark.sql.Row] = Array([1570,3930,201607,Y,APJ,INDIA 
> SUB-CONTINENT,8741220,null,null,null,null,-208.485216,0.0,-208.485216,null,null,MONTH_ITEM_GROUP])
> 





Re: Dataframe fails to save to MySQL table in spark app, but succeeds in spark shell

2017-01-26 Thread Didac Gil
Are you sure that "age" is a numeric field?

Even if it is numeric, you could try passing the 44 between quotes:

INSERT INTO your_table ("user","age","state") VALUES ('user3','44','CT')

Are you sure there are no other fields specified as NOT NULL for which you did 
not provide a value (besides user, age and state)?


> On 26 Jan 2017, at 04:42, Xuan Dzung Doan  
> wrote:
> 
> Hi,
> 
> Spark version 2.1.0
> MySQL community server version 5.7.17
> MySQL Connector Java 5.1.40
> 
> I need to save a dataframe to a MySQL table. In spark shell, the following 
> statement succeeds:
> 
> scala> df.write.mode(SaveMode.Append).format("jdbc").option("url", 
> "jdbc:mysql://127.0.0.1:3306/mydb").option("dbtable", 
> "person").option("user", "username").option("password", "password").save()
> 
> I write an app that basically does the same thing, issuing the same statement 
> saving the same dataframe to the same MySQL table. I run it using 
> spark-submit, but it fails, reporting some error in the SQL syntax. Here's 
> the detailed stack trace:
> 
> 17/01/25 16:06:02 INFO DAGScheduler: Job 2 failed: save at 
> DataIngestionJob.scala:119, took 0.159574 s
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due 
> to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: 
> Lost task 0.0 in stage 2.0 (TID 3, localhost, executor driver): 
> java.sql.BatchUpdateException: You have an error in your SQL syntax; check 
> the manual that corresponds to your MySQL server version for the right syntax 
> to use near '"user","age","state") VALUES ('user3',44,'CT')' at line 1
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
>   at com.mysql.jdbc.Util.getInstance(Util.java:408)
>   at 
> com.mysql.jdbc.SQLError.createBatchUpdateException(SQLError.java:1162)
>   at 
> com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1773)
>   at 
> com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1257)
>   at com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:958)
>   at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:597)
>   at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:670)
>   at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:670)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
>   at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
>   at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:99)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You 
> have an error in your SQL syntax; check the manual that corresponds to your 
> MySQL server version for the right syntax to use near '"user","age","state") 
> VALUES ('user3',44,'CT')' at line 1
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
>   at com.mysql.jdbc.Util.getInstance(Util.java:408)
>   at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:943)
>   at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3970)
>   at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3906)
>   at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2524)
>   at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2677)
>   at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2549)
>   at 
> com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1861)
>   

[no subject]

2016-11-28 Thread Didac Gil
Any suggestions for using something like OneHotEncoder and StringIndexer on
an InputDStream?

I could try to build an indexer from a static parquet file, but I want to
use the OneHotEncoder approach on streaming data coming from a socket.

Thanks!
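
Something along these lines is what I had in mind with the static-parquet indexer; the 
paths and column names are just placeholders:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("StreamingEncoder").getOrCreate()

// Fit the indexer/encoder once on static parquet data that covers the known categories
val staticDF = spark.read.parquet("/data/historical_categories.parquet")
val pipeline = new Pipeline().setStages(Array(
  new StringIndexer().setInputCol("category").setOutputCol("categoryIndex")
    .setHandleInvalid("skip"),                 // rows with unseen labels are dropped
  new OneHotEncoder().setInputCol("categoryIndex").setOutputCol("categoryVec")
))
val model = pipeline.fit(staticDF)

// Later, inside foreachRDD on the socket stream, each batch can be converted to a
// DataFrame (e.g. rdd.toDF("category", ...)) and passed through the fitted model:
//   val encoded = model.transform(batchDF)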

Dídac Gil de la Iglesia