Re: How to reduce the amount of data that is getting written to the checkpoint from Spark Streaming

2017-07-03 Thread Yuval.Itzchakov
Using a long period between checkpoints may cause a long lineage of graph
computations to build up, since Spark uses checkpointing to cut the lineage;
this can also add delay to the streaming job.
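
For illustration, a minimal sketch of setting a longer checkpoint interval on a
stateful DStream (the socket source, the 5-second batches, and the 50-second
checkpoint interval are assumptions for illustration, not values from the
original question):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("CheckpointIntervalSketch")
val ssc = new StreamingContext(conf, Seconds(5))   // 5-second batches
ssc.checkpoint("hdfs:///tmp/checkpoints")          // checkpoint directory

val counts = ssc.socketTextStream("localhost", 9999)
  .map(word => (word, 1L))
  .updateStateByKey((values: Seq[Long], state: Option[Long]) =>
    Some(values.sum + state.getOrElse(0L)))

// Checkpoint this stream every 50 seconds (10 batches): fewer checkpoint
// writes, but a longer lineage to recompute after a failure.
counts.checkpoint(Seconds(50))
counts.print()

ssc.start()
ssc.awaitTermination()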






Analysis Exception after join

2017-07-03 Thread Bernard Jesop
Hello, I don't understand my error message.

Basically, all I am doing is :
- dfAgg = df.groupBy("S_ID")
- dfRes = df.join(dfAgg, Seq("S_ID"), "left_outer")

However I get this AnalysisException: "
Exception in thread "main" org.apache.spark.sql.AnalysisException: resolved
attribute(s) S_ID#1903L missing from
Dummy_ID#740,sex#37L,PERSONAL_STATUS#726L,W_DEP_CODE#736,W_SIZE#739L,
POSTAL_CODE#735,COUNTRY_CODE#730,
ID#724L,Dummy_ID_1#741,DEP_CODE#729,HOUSEHOLD_TYPE#733L,
HOUSEHOLD_SIZE#734L,AGE#727L,W_ID#738L,H_ID#732L,AGE_TYPE#728,
S_ID#57L,NATIONALITY#731
in operator !Project [ID#724L, sex#37L, PERSONAL_STATUS#726L, AGE#727L,
AGE_TYPE#728, DEP_CODE#729, COUNTRY_CODE#730,
NATIONALITY#731 AS Nationality#77, H_ID#732L, HOUSEHOLD_TYPE#733L,
HOUSEHOLD_SIZE#734L, POSTAL_CODE#735, W_DEP_CODE#736, S_ID#1903L,
W_ID#738L, W_SIZE#739L, Dummy_ID#740, Dummy_ID_1#741];; "

What I don't understand is that it says S_ID#1903L is missing,
but everything seems fine in the logical plan.
+- Join LeftOuter, (S_ID#57L = S_ID#1903L)

   :- Project [W_ID#14L, H_ID#8L, ID#0L, sex#37L, category#97L, AGE#3L,
AGE_TYPE#4, DEP_CODE#5, COUNTRY_CODE#6, Nationality#77, HOUSEHOLD_TYPE#9L,
familySize#117L, POSTAL_CODE#11, W_DEP_CODE#12, S_ID#57L, workplaceSize#137L,
Dummy_ID#16, Dummy_ID_1#17, Inc_period#157,
Time_inf#1064, Time_inc#200, Health#1014, Inf_period#1039,
infectedFamily#1355L, infectedWorker#1385L]

+- Aggregate [S_ID#1903L], [S_ID#1903L, count(1) AS
infectedStreet#1415L]

Does someone have a clue about it?
Thanks,


Re: Analysis Exception after join

2017-07-03 Thread Didac Gil
With the left join, you are joining two tables.

In your case, df is the left table, dfAgg is the right table.
The second parameter should be the joining condition, right?
For instance

 dfRes = df.join(dfAgg, $"userName" === $"name", "left_outer")

having a field in df called userName, and another in dfAgg called "name".

However, what kind of query do you want to make? dfAgg is already the df
table that has been grouped by S_ID.

I guess that you are looking for something more like the following example:
dfAgg = df.groupBy("S_ID")
  .agg(
    org.apache.spark.sql.functions.count("userName").as("usersCount"),
    org.apache.spark.sql.functions.collect_set("city").as("ListofCities"),
    org.apache.spark.sql.functions.max("age").as("oldest")
  )
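
For the original "resolved attribute(s) ... missing" error, one common
workaround when joining a DataFrame with an aggregate of itself is to rename
the grouping column on the aggregated side, so the join condition refers to
unambiguous attributes. A minimal sketch with toy data (the column names and
the count are assumptions, not the real schema):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{count, lit}

val spark = SparkSession.builder().master("local").getOrCreate()
import spark.implicits._

// Toy data standing in for the real df.
val df = Seq((1L, "a"), (1L, "b"), (2L, "c")).toDF("S_ID", "payload")

// Aggregate, then rename the key to break the ambiguous attribute reference.
val dfAgg = df.groupBy("S_ID")
  .agg(count(lit(1)).as("rowsPerSId"))
  .withColumnRenamed("S_ID", "S_ID_agg")

val dfRes = df
  .join(dfAgg, df("S_ID") === dfAgg("S_ID_agg"), "left_outer")
  .drop("S_ID_agg")

dfRes.show()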

> On 3 Jul 2017, at 11:55, Bernard Jesop  wrote:
> 
> Hello, I don't understand my error message.
> 
> Basically, all I am doing is :
> - dfAgg = df.groupBy("S_ID")
> - dfRes = df.join(dfAgg, Seq("S_ID"), "left_outer")
> 
> However I get this AnalysisException: "
> Exception in thread "main" org.apache.spark.sql.AnalysisException: resolved 
> attribute(s) S_ID#1903L missing from 
> Dummy_ID#740,sex#37L,PERSONAL_STATUS#726L,W_DEP_CODE#736,W_SIZE#739L,
> POSTAL_CODE#735,COUNTRY_CODE#730,
> ID#724L,Dummy_ID_1#741,DEP_CODE#729,HOUSEHOLD_TYPE#733L,
> HOUSEHOLD_SIZE#734L,AGE#727L,W_ID#738L,H_ID#732L,AGE_TYPE#728,
> S_ID#57L,NATIONALITY#731
> in operator !Project [ID#724L, sex#37L, PERSONAL_STATUS#726L, AGE#727L,
> AGE_TYPE#728, DEP_CODE#729, COUNTRY_CODE#730,
> NATIONALITY#731 AS Nationality#77, H_ID#732L, HOUSEHOLD_TYPE#733L, 
> HOUSEHOLD_SIZE#734L, POSTAL_CODE#735, W_DEP_CODE#736, S_ID#1903L,
> W_ID#738L, W_SIZE#739L, Dummy_ID#740, Dummy_ID_1#741];; "
> 
> What I don't understand is that it says S_ID#1903L is missing,
> but everything seems fine in the logical plan.
> +- Join LeftOuter, (S_ID#57L = S_ID#1903L)
> 
>:- Project [W_ID#14L, H_ID#8L, ID#0L, sex#37L, category#97L, AGE#3L, 
> AGE_TYPE#4, DEP_CODE#5, COUNTRY_CODE#6, Nationality#77, HOUSEHOLD_TYPE#9L, 
> familySize#117L, POSTAL_CODE#11, W_DEP_CODE#12, S_ID#57L, workplaceSize#137L,
> Dummy_ID#16, Dummy_ID_1#17, Inc_period#157,
> Time_inf#1064, Time_inc#200, Health#1014, Inf_period#1039, 
> infectedFamily#1355L, infectedWorker#1385L]
> 
> +- Aggregate [S_ID#1903L], [S_ID#1903L, count(1) AS infectedStreet#1415L]
> 
> Does someone have a clue about it?
> Thanks,
> 
> 
> 

Didac Gil de la Iglesia
PhD in Computer Science
didacg...@gmail.com
Spain: +34 696 285 544
Sweden: +46 (0)730229737
Skype: didac.gil.de.la.iglesia





[PySpark] - running processes

2017-07-03 Thread Sidney Feiner
In my Spark Streaming application, I need to build a graph from a file, and
initializing that graph takes between 5 and 10 seconds.

So I tried initializing it once per executor so that it would only be initialized once.

After running the application, I've noticed that it's initialized much more than
once per executor, every time with a different process id (every process has
its own logger).

Doesn't every executor have its own JVM and its own process? Or is that only
relevant when I develop in JVM languages like Scala/Java? Do executors in
PySpark spawn new processes for new tasks?

And if they do, how can I make sure that my graph object will really only be
initialized once?
Thanks :)


Sidney Feiner / SW Developer
M: +972.528197720 / Skype: sidney.feiner.startapp





Re: What's the simplest way to Read Avro records from Kafka to Spark DataSet/DataFrame?

2017-07-03 Thread kant kodali
import java.io.File
import org.apache.avro.Schema
import org.apache.spark.sql.SparkSession

val schema = new Schema.Parser().parse(new File("user.avsc"))
val spark = SparkSession.builder().master("local").getOrCreate()

spark
  .read
  .format("com.databricks.spark.avro")
  .option("avroSchema", schema.toString)
  .load("src/test/resources/episodes.avro")
  .show()
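
For the Kafka part, a rough, untested sketch of decoding Avro-encoded message
values inside a Structured Streaming query with a plain GenericDatumReader in
a UDF; the broker address, topic name, schema file, and the "name" field are
all assumptions:

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder().master("local").getOrCreate()
import spark.implicits._

// Keep the schema as a JSON string so the UDF closure stays serializable.
val schemaJson = new Schema.Parser().parse(new java.io.File("user.avsc")).toString

// Decode the Avro value bytes and pull out one field (re-parsing the schema
// on every call is wasteful, but keeps the sketch simple).
val decodeName = udf { (bytes: Array[Byte]) =>
  val schema = new Schema.Parser().parse(schemaJson)
  val reader = new GenericDatumReader[GenericRecord](schema)
  val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
  reader.read(null, decoder).get("name").toString
}

val kafkaDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "avro-topic")
  .load()

val names = kafkaDF.select(decodeName($"value").as("name"))

names.writeStream.format("console").start().awaitTermination()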


On Thu, Jun 29, 2017 at 1:59 AM, kant kodali  wrote:

> Forgot to mention: I am getting a stream of Avro records and I want to do
> Structured Streaming on these Avro records, but first I want to be able to
> parse them and put them in a DataSet or something like that.
>
> On Thu, Jun 29, 2017 at 12:56 AM, kant kodali  wrote:
>
>> Hi All,
>>
>> What's the simplest way to read Avro records from Kafka and put them into a
>> Spark DataSet/DataFrame without using the Confluent Schema Registry or the
>> Twitter Bijection API?
>>
>> Thanks!
>>
>>
>>
>>
>
>


[Spark SQL] JDBC connection from UDF

2017-07-03 Thread Patrik Medvedev
Hello guys,

I'm using Spark SQL with Hive through Thrift.
I need this because I need to create a table from all the tables matching a name mask.
Here is an example:
1. Take tables by mask, like SHOW TABLES IN db 'table__*'
2. Create query like:
CREATE TABLE total_data AS
SELECT * FROM table__1
UNION ALL
SELECT * FROM table__2
UNION ALL
SELECT * FROM table__3

Because of this, I need to create a JDBC connection inside a UDF. The problem
is that I need to create the connection dynamically, which means I need to take
the host name, port, and user name from Hive properties. This is easy from
Hive; I'm using these properties:
host - hive.server2.thrift.bind.host
port - hive.server2.thrift.port
user - always the same as the user that ran the UDF

But the problem is that the hive.server2.thrift.bind.host parameter is not
defined in YARN, and the user that runs the UDF is hive.
Maybe you have a solution for how I can get the host name, and, more importantly,
how I can run the UDF as the user that ran the SQL (not the hive user).
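
For what it is worth, a rough sketch of building that UNION ALL on the driver
side with Spark SQL itself, rather than opening a JDBC connection inside a UDF
(the database name and table mask below are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("UnionByMaskSketch")
  .enableHiveSupport()
  .getOrCreate()

// Find the tables matching the mask and assemble one UNION ALL statement.
val selects = spark.sql("SHOW TABLES IN db LIKE 'table__*'")
  .select("tableName")
  .collect()
  .map(row => s"SELECT * FROM db.${row.getString(0)}")

// Assumes at least one table matched the mask.
spark.sql(s"CREATE TABLE db.total_data AS ${selects.mkString(" UNION ALL ")}")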


spark submit with logs and kerberos

2017-07-03 Thread Juan Rods
Hi!

I have a question about logs and have not found the answer on the internet.
I have a spark-submit process and I configure a custom log configuration for it
using the following params:
--conf
"spark.executor.extraJavaOptions=-Dlog4j.configuration=customlog4j.properties"
--driver-java-options '-Dlog4j.configuration=customlog4j.properties'
So far it is working great, but now I have to add Kerberos params like this:
--conf spark.hadoop.fs.hdfs.impl.disable.cache=true
--keytab 
--principal 
When I add those params (and in particular, --principal), my custom log file
starts being ignored and the default Spark log file is used instead.
Changing the default Spark configuration file or path is a little difficult
for us since we can't access that cluster.
Is there any way to keep using my custom log configuration the same way? Am I
doing something wrong?
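
For reference, a sketch of the full command with the custom log4j file
additionally shipped to the executors via --files (a commonly used approach;
the keytab, principal, class, and jar below are placeholders, not values from
this message):

spark-submit \
  --files customlog4j.properties \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=customlog4j.properties" \
  --driver-java-options "-Dlog4j.configuration=customlog4j.properties" \
  --conf spark.hadoop.fs.hdfs.impl.disable.cache=true \
  --keytab /path/to/some.keytab \
  --principal someprincipal@EXAMPLE.REALM \
  --class com.example.SomeApp \
  some-app.jar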

Thanks for the help!


Re: spark-jdbc impala with kerberos using yarn-client

2017-07-03 Thread morfious902002
Did you ever find a solution to this? If so, can you share it? I am running
into a similar issue in YARN cluster mode connecting to an Impala table.


