Re: Code fails when AQE enabled in Spark 3.1

2022-01-31 Thread Gaspar Muñoz
It looks like this commit (
https://github.com/apache/spark/commit/a85490659f45410be3588c669248dc4f534d2a71)
does the trick.


Don't you think this bug is important enough to include in the 3.1 branch?

Regards

On Thu, 20 Jan 2022 at 08:55, Gaspar Muñoz ()
wrote:

> Hi guys,
>
> Hundreds of Spark jobs run at my company every day. We are running Spark
> 3.1.2 and we want to enable Adaptive Query Execution (AQE) for all of them.
> We can't upgrade to 3.2 right now, so we want to enable it explicitly with
> the appropriate conf at spark-submit time.
>
> Some of them fail when AQE is enabled, but I can't figure out what is
> happening. To give you more information, I prepared a small snippet for
> spark-shell that fails in Spark 3.1 when AQE is enabled and works when it
> is disabled. It also works in 3.2, but I think it may be a bug that could
> be fixed for 3.1.3.
>
> The code and explanation can be found here:
> https://issues.apache.org/jira/browse/SPARK-37898
>
> Regards
> --
> Gaspar Muñoz Soria
>


-- 
Gaspar Muñoz Soria

Vía de las dos Castillas, 33, Ática 4, 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 828 6473


Code fails when AQE enabled in Spark 3.1

2022-01-19 Thread Gaspar Muñoz
Hi guys,

Hundreds of Spark jobs run at my company every day. We are running Spark
3.1.2 and we want to enable Adaptive Query Execution (AQE) for all of them.
We can't upgrade to 3.2 right now, so we want to enable it explicitly with
the appropriate conf at spark-submit time.
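
For context, the conf we flip is spark.sql.adaptive.enabled, passed as
--conf spark.sql.adaptive.enabled=true at submit time; in spark-shell the
equivalent toggle (a minimal sketch of how we switch it for the snippet) is:

spark.conf.set("spark.sql.adaptive.enabled", "true")   // the failing case
spark.conf.set("spark.sql.adaptive.enabled", "false")  // works fine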

Some of them fail when AQE is enabled, but I can't figure out what is
happening. To give you more information, I prepared a small snippet for
spark-shell that fails in Spark 3.1 when AQE is enabled and works when it is
disabled. It also works in 3.2, but I think it may be a bug that could be
fixed for 3.1.3.

The code and explanation can be found here:
https://issues.apache.org/jira/browse/SPARK-37898

Regards
-- 
Gaspar Muñoz Soria


Re: Spark SQL - Truncate Day / Hour

2017-11-09 Thread Gaspar Muñoz
There are functions for day (called dayofmonth and dayofyear) and hour
(called hour). You can view them here:
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions

Example:

import org.apache.spark.sql.functions._
val result = df.select(hour($"myDateColumn"), dayofmonth($"myDateColumn"),
  dayofyear($"myDateColumn"))
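
Note that these extract the day/hour components rather than truncate the
date. If you need an actual truncation, a rough workaround (assuming
myDateColumn is a timestamp column, with the same functions import as above)
is:

// Truncate to the day: casting a timestamp to a date drops the time part.
val byDay = df.select($"myDateColumn".cast("date").as("day"))

// Truncate to the hour: round the epoch seconds down to a whole hour.
val byHour = df.select(
  (floor(unix_timestamp($"myDateColumn") / 3600) * 3600)
    .cast("timestamp").as("hour"))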

2017-11-09 12:05 GMT+01:00 David Hodefi <davidhodeffi.w...@gmail.com>:

> I would like to truncate a date to its day or hour. Currently it is only
> possible to truncate to MONTH or YEAR.
> 1. How can I achieve that?
> 2. Is there any pull request about this issue?
> 3. If there is no open pull request about this issue, what are the
> implications that I should be aware of when coding/contributing it as a
> pull request?
>
> Last question: looking at the DateTimeUtils class code, it seems the
> implementation does not use any open-source library for handling dates,
> e.g. Apache Commons. Why implement it instead of reusing open source?
>
> Thanks, David
>



-- 
Gaspar Muñoz Soria

Vía de las dos Castillas, 33, Ática 4, 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 828 6473


Re: spark-avro aliases incompatible

2017-11-07 Thread Gaspar Muñoz
In the doc you refer to:

// The Avro records get converted to Spark types, filtered, and
// then written back out as Avro records
val df = spark.read.avro("/tmp/episodes.avro")
df.filter("doctor > 5").write.avro("/tmp/output")

Alternatively you can specify the format to use instead:

val df = spark.read
.format("com.databricks.spark.avro")
.load("/tmp/episodes.avro")

As far as I know, spark-avro is not built into Spark 2.x. But that is not the
problem, because that Databricks doc also says: *"At the moment, it
ignores docs, aliases and other properties present in the Avro file."*
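
One possible workaround, since the aliases are ignored, is to keep reading
the old files with the old field name and rename the column afterwards. A
minimal sketch (the path and column names are just placeholders):

import com.databricks.spark.avro._

// Read the old data as it was written, then rename the column to the new
// name instead of relying on the Avro alias.
val old = spark.read.avro("/tmp/old-data.avro")
val renamed = old.withColumnRenamed("old_name", "new_name")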

Regards.


2017-11-06 22:29 GMT+01:00 Gourav Sengupta <gourav.sengu...@gmail.com>:

> Hi,
>
> I may be wrong about this, but when you are using format("") you are
> basically using old SPARK classes, which still exist because of backward
> compatibility.
>
> Please refer to the following documentation to take advantage of the
> recent changes in SPARK:
> https://docs.databricks.com/spark/latest/data-sources/read-avro.html
>
> Kindly let us know how things are going on.
>
> Regards,
> Gourav Sengupta
>
> On Mon, Nov 6, 2017 at 8:04 PM, Gaspar Muñoz <gmu...@datiobd.com> wrote:
>
>> Of course,
>>
>> right now I'm trying locally with Spark 2.2.0 and spark-avro 4.0.0.
>> I've just uploaded a snippet:
>> https://gist.github.com/gasparms/5d0740bd61a500357e0230756be963e1
>>
>> Basically, my Avro schema has a field with an alias, and in the last part
>> of the code spark-avro is not able to read the old data under its old name
>> using the alias.
>>
>> The spark-avro README says this is not supported, so I am asking whether
>> any of you have a workaround, and how you manage schema evolution.
>>
>> Regards.
>>
>> 2017-11-05 20:13 GMT+01:00 Gourav Sengupta <gourav.sengu...@gmail.com>:
>>
>>> Hi Gaspar,
>>>
>>> can you please provide the details regarding the environment, versions,
>>> libraries and code snippets please?
>>>
>>> For example: SPARK version, OS, distribution, running on YARN, etc and
>>> all other details.
>>>
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>> On Sun, Nov 5, 2017 at 9:03 AM, Gaspar Muñoz <gmu...@datiobd.com> wrote:
>>>
>>>> Hi there,
>>>>
>>>> I use the Avro format to store historical data because of Avro schema
>>>> evolution. I manage external schemas and read them using the avroSchema
>>>> option, so we have been able to add and delete columns.
>>>>
>>>> The problem is that when I introduced aliases, the Spark process didn't
>>>> work as expected, and then I read in the spark-avro library: "At the
>>>> moment, it ignores docs, aliases and other properties present in the
>>>> Avro file".
>>>>
>>>> How do you manage aliases and column renaming? Is there any workaround?
>>>>
>>>> Thanks in advance.
>>>>
>>>> Regards
>>>>
>>>> --
>>>> Gaspar Muñoz Soria
>>>>
>>>> Vía de las dos Castillas, 33, Ática 4, 3ª Planta
>>>> 28224 Pozuelo de Alarcón, Madrid
>>>> Tel: +34 91 828 6473
>>>>
>>>
>>>
>>
>>
>> --
>> Gaspar Muñoz Soria
>>
>> Vía de las dos Castillas, 33, Ática 4, 3ª Planta
>> 28224 Pozuelo de Alarcón, Madrid
>> Tel: +34 91 828 6473
>>
>
>


-- 
Gaspar Muñoz Soria

Vía de las dos Castillas, 33, Ática 4, 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 828 6473


Re: spark-avro aliases incompatible

2017-11-06 Thread Gaspar Muñoz
Of course,

right now I'm trying locally with Spark 2.2.0 and spark-avro 4.0.0. I've
just uploaded a snippet:
https://gist.github.com/gasparms/5d0740bd61a500357e0230756be963e1

Basically, my Avro schema has a field with an alias, and in the last part of
the code spark-avro is not able to read the old data under its old name using
the alias.

The spark-avro README says this is not supported, so I am asking whether any
of you have a workaround, and how you manage schema evolution.

Regards.

2017-11-05 20:13 GMT+01:00 Gourav Sengupta <gourav.sengu...@gmail.com>:

> Hi Gaspar,
>
> can you please provide the details regarding the environment, versions,
> libraries and code snippets please?
>
> For example: SPARK version, OS, distribution, running on YARN, etc and all
> other details.
>
>
> Regards,
> Gourav Sengupta
>
> On Sun, Nov 5, 2017 at 9:03 AM, Gaspar Muñoz <gmu...@datiobd.com> wrote:
>
>> Hi there,
>>
>> I use the Avro format to store historical data because of Avro schema
>> evolution. I manage external schemas and read them using the avroSchema
>> option, so we have been able to add and delete columns.
>>
>> The problem is that when I introduced aliases, the Spark process didn't
>> work as expected, and then I read in the spark-avro library: "At the
>> moment, it ignores docs, aliases and other properties present in the Avro
>> file".
>>
>> How do you manage aliases and column renaming? Is there any workaround?
>>
>> Thanks in advance.
>>
>> Regards
>>
>> --
>> Gaspar Muñoz Soria
>>
>> Vía de las dos Castillas, 33, Ática 4, 3ª Planta
>> 28224 Pozuelo de Alarcón, Madrid
>> Tel: +34 91 828 6473
>>
>
>


-- 
Gaspar Muñoz Soria

Vía de las dos Castillas, 33, Ática 4, 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 828 6473


spark-avro aliases incompatible

2017-11-05 Thread Gaspar Muñoz
Hi there,

I use the Avro format to store historical data because of Avro schema
evolution. I manage external schemas and read them using the avroSchema
option, so we have been able to add and delete columns.

The problem is that when I introduced aliases, the Spark process didn't work
as expected, and then I read in the spark-avro library: "At the moment, it
ignores docs, aliases and other properties present in the Avro file".

How do you manage aliases and column renaming? Is there any workaround?

Thanks in advance.

Regards

-- 
Gaspar Muñoz Soria

Vía de las dos Castillas, 33, Ática 4, 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 828 6473


Re: Best way to merge final output part files created by Spark job

2015-09-14 Thread Gaspar Muñoz
Hi, check out the FileUtil.copyMerge function in the Hadoop API:
https://hadoop.apache.org/docs/r2.7.0/api/org/apache/hadoop/fs/FileUtil.html#copyMerge(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path, org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path, boolean, org.apache.hadoop.conf.Configuration, java.lang.String)

It's simple (a rough end-to-end sketch follows the steps below):

   1. Get the Hadoop configuration from the SparkContext:
      FileSystem fs = FileSystem.get(sparkContext.hadoopConfiguration());
   2. Create a Path
      <https://hadoop.apache.org/docs/r2.7.0/api/org/apache/hadoop/fs/Path.html>
      for the source directory and the destination file.
   3. Call copyMerge:
      FileUtil.copyMerge(fs, inputPath, fs, destPath, true,
      sparkContext.hadoopConfiguration(), null);
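
Putting the steps together in Scala, a rough sketch against the Hadoop 2.x
API (the paths below are placeholders, and keep in mind copyMerge does a
plain byte-level concatenation of the files):

import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val conf = sc.hadoopConfiguration        // sc is your SparkContext
val fs = FileSystem.get(conf)

// Directory holding the part files, and the destination file for the merge.
val inputPath = new Path("/data/job-output/dir1")
val destPath = new Path("/data/job-output-merged/dir1")

// deleteSource = true removes the part files once merged; the last argument
// is an optional string appended between files (null for none).
FileUtil.copyMerge(fs, inputPath, fs, destPath, true, conf, null)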


2015-09-13 23:25 GMT+02:00 unk1102 <umesh.ka...@gmail.com>:

> Hi, I have a Spark job which creates around 500 part files inside each
> directory I process, and I have thousands of such directories, so I need
> to merge these 500 small part files. I am using
> spark.sql.shuffle.partitions = 500 and my final small files are ORC files.
> Is there a way to merge ORC files in Spark? If not, please suggest the
> best way to merge files created by a Spark job in HDFS. Thanks much.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-merge-final-output-part-files-created-by-Spark-job-tp24681.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 

Gaspar Muñoz
@gmunozsoria


<http://www.stratio.com/>
Vía de las dos Castillas, 33, Ática 4, 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 352 59 42 // *@stratiobd <https://twitter.com/StratioBD>*


Pagination on big table, splitting joins

2015-08-08 Thread Gaspar Muñoz
Hi,

I have two different parts in my system.

1. A batch application that every x minutes runs SQL queries joining several
tables containing millions of rows to build an entity, and sends those
entities to Kafka.
2. A streaming application that processes the data from Kafka.

Now I have the entire system working, but I want to improve the performance
of the batch part, because if I have 100 million entities I send them all to
Kafka in a single foreach pass, which makes no sense for the downstream
streaming application. I want to send, say, 10 million events to Kafka at a
time.

I have a query, imagine:

*select ... from table 1 left outer join table 2 on ... left outer join
table 3 on ... left outer join table 4 on ...*

My goal is to do *pagination* on table 1: take 10 million rows in a separate
RDD, do the joins and send them to Kafka, then take another 10 million and do
the same... I have all the tables in Parquet format in HDFS.

I'm thinking of using the *toLocalIterator* method, something like the code
below, but I have doubts about memory and parallelism and I'm sure there is a
better way to do it.

rdd.toLocalIterator.grouped(1000).foreach { seq =>
  val chunkRdd: RDD[(String, Int)] = sc.parallelize(seq)
  // Do the processing
}

What do you think?

Regards.

-- 

Gaspar Muñoz
@gmunozsoria


http://www.stratio.com/
Vía de las dos Castillas, 33, Ática 4, 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 352 59 42 // *@stratiobd https://twitter.com/StratioBD*


Re: reduceByKey

2015-05-14 Thread Gaspar Muñoz
What have you tried so far?

Maybe the easiest way is to use a collection and reduce them, adding up
their values.

JavaPairRDD<String, String> pairRDD = sc.parallelizePairs(data);

JavaPairRDD<String, List<Integer>> result = pairRDD
    .mapToPair(new Functions.createList())
    .mapToPair(new Functions.ListStringToInt())
    .reduceByKey(new SumList());


Functions implementation using Java 7; Java 8 should be simpler.

public static final class SumList implements Function2<List<Integer>,
        List<Integer>, List<Integer>> {

    @Override public List<Integer> call(List<Integer> l1,
            List<Integer> l2) throws Exception {
        List<Integer> result = new ArrayList<Integer>();

        for (int i = 0; i < l1.size(); ++i) {
            result.add(l1.get(i) + l2.get(i));
        }

        return result;
    }
}


public static final class ListStringToInt implements
        PairFunction<Tuple2<String, List<String>>, String,
        List<Integer>> {

    @Override public Tuple2<String, List<Integer>> call(Tuple2<String,
            List<String>> tuple2) throws Exception {
        List<Integer> result = new ArrayList<Integer>();

        for (String number : tuple2._2()) {
            result.add(Integer.valueOf(number));
        }

        return new Tuple2<String, List<Integer>>(tuple2._1(), result);
    }
}

public static final class createList implements
        PairFunction<Tuple2<String, String>, String,
        List<String>> {

    @Override public Tuple2<String, List<String>> call(Tuple2<String,
            String> tuple2) throws Exception {
        return new Tuple2<String, List<String>>(tuple2._1(),
                Arrays.asList(tuple2._2().split(",")));
    }
}
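
For comparison, a minimal Scala sketch of the same transformation (assuming
pairRDD is an RDD[(String, String)] of id -> comma-separated counts):

val result = pairRDD
  .mapValues(_.split(",").map(_.toInt))
  .reduceByKey((a, b) => a.zip(b).map { case (x, y) => x + y })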


2015-05-14 15:40 GMT+02:00 Yasemin Kaya godo...@gmail.com:

 Hi,

 I have a JavaPairRDD<String, String> and I want to implement the reduceByKey
 method.

 My pairRDD :
 *2553: 0,0,0,1,0,0,0,0*
 46551: 0,1,0,0,0,0,0,0
 266: 0,1,0,0,0,0,0,0
 *2553: 0,0,0,0,0,1,0,0*

 *225546: 0,0,0,0,0,1,0,0*
 *225546: 0,0,0,0,0,1,0,0*

 I want to get :
 *2553: 0,0,0,1,0,1,0,0*
 46551: 0,1,0,0,0,0,0,0
 266: 0,1,0,0,0,0,0,0
 *225546: 0,0,0,0,0,2,0,0*

 Can anyone help me get that?
 Thank you.

 Have a nice day.
 yasemin

 --
 hiç ender hiç




-- 

Gaspar Muñoz
@gmunozsoria


http://www.stratio.com/
Vía de las dos Castillas, 33, Ática 4, 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 352 59 42 // *@stratiobd https://twitter.com/StratioBD*


Re: Spark Mongodb connection

2015-05-04 Thread Gaspar Muñoz
Hi Yasemin,

You can find a MongoDB connector for Spark SQL here:
http://github.com/Stratio/spark-mongodb

Best regards

2015-05-04 9:27 GMT+02:00 Yasemin Kaya godo...@gmail.com:

 Hi!

 I am new to Spark and I want to start with a simple wordCount example in
 Java, but I want to take my input from a MongoDB database. I want to learn
 how I can connect a MongoDB database to my project. Can anyone help with
 this issue?

 Have a nice day
 yasemin

 --
 hiç ender hiç




-- 

Gaspar Muñoz
@gmunozsoria


http://www.stratio.com/
Vía de las dos Castillas, 33, Ática 4, 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 352 59 42 // *@stratiobd https://twitter.com/StratioBD*


Re: java.lang.IncompatibleClassChangeError when using PrunedFilteredScan

2015-03-30 Thread Gaspar Muñoz



 val database = parameters.getOrElse(Database, notFound(Database))

 val collection = parameters.getOrElse(Collection, notFound(Collection))

 val samplingRatio = parameters
   .get(SamplingRatio)
   .map(_.toDouble).getOrElse(DefaultSamplingRatio)

 MongodbRelation(
   MongodbConfigBuilder()
     .set(Host, host)
     .set(Database, database)
     .set(Collection, collection)
     .set(SamplingRatio, samplingRatio).build())(sqlContext)
   }
 }



 In the createRelation function it returns a MongodbRelation, which extends
 PrunedFilteredScan:

 case class MongodbRelation(
   config: DeepConfig,
   schemaProvided: Option[StructType] = None)(
   @transient val sqlContext: SQLContext) extends PrunedFilteredScan {

 Since both TableScan and PrunedFilteredScan are based on BaseRelation, I'm
 not sure why the clazz.newInstance() call fails with
 java.lang.IncompatibleClassChangeError.

 Is there anything special I need to do if I want to use PrunedFilteredScan?

 I'm using Scala 2.10 and JDK 1.7.

 Thanks
 TW






-- 

Gaspar Muñoz
@gmunozsoria


http://www.stratio.com/
Vía de las dos Castillas, 33, Ática 4, 3ª Planta
28224 Pozuelo de Alarcón, Madrid
Tel: +34 91 352 59 42 // *@stratiobd https://twitter.com/StratioBD*