Re: [SQL] parse_url does not work for Internationalized domain names ?

2018-01-12 Thread yash datta
Thanks for the prompt reply!

Opened a ticket here: https://issues.apache.org/jira/browse/SPARK-23056


BR
Yash

On Fri, Jan 12, 2018 at 3:41 PM, StanZhai  wrote:

> This problem was introduced by a change designed to
> improve the performance of PARSE_URL().
>
> The same issue exists in the following SQL:
>
> ```SQL
> SELECT PARSE_URL('http://stanzhai.site?p=["abc"]', 'QUERY', 'p')
>
> // returns null in Spark 2.1+
> // returns ["abc"] in versions earlier than Spark 2.1
> ```
>
> I think it's a regression.
>
>
>
> --
> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


-- 
When events unfold with calm and ease
When the winds that blow are merely breeze
Learn from nature, from birds and bees
Live your life in love, and let joy not cease.


[SQL] parse_url does not work for Internationalized domain names ?

2018-01-11 Thread yash datta
Hi devs,

Stumbled across an interesting problem with the parse_url function that has
been implemented in spark in
https://issues.apache.org/jira/browse/SPARK-16281

When using internationalized domain names in URLs like:

val url = "http://правительство.рф"

parse_url returns null, but it works fine when using Hive's version of
parse_url.

On digging further, I found that the difference is in the call below in Spark:

private def getUrl(url: UTF8String): URI = {
  try {
    new URI(url.toString)
  } catch {
    case e: URISyntaxException => null
  }
}


while Hive uses java.net.URL:

url = new URL(urlStr)


Sure enough, this simple test demonstrates that URL works but URI does not
in this case:

val url = "http://правительство.рф"

val uriHost = new URI(url).getHost
val urlHost = new URL(url).getHost

println(s"uriHost = $uriHost") // prints uriHost = null
println(s"urlHost = $urlHost") // prints urlHost = правительство.рф


To reproduce the problem on spark-sql:

spark-sql> select parse_url('http://千夏ともか.test', 'HOST');

returns NULL

Could someone please explain the reason for using URI instead of URL? Does
this problem warrant creating a JIRA ticket?


Best Regards
Yash

-- 
When events unfold with calm and ease
When the winds that blow are merely breeze
Learn from nature, from birds and bees
Live your life in love, and let joy not cease.


Re: Dataframe Partitioning

2016-03-01 Thread yash datta
+1
This is one of the most common problems we encounter in our flow. Mark, I
am happy to help if you would like to share some of the workload.

Best
Yash

On Wednesday 2 March 2016, Mark Hamstra  wrote:

> I don't entirely agree.  You're best off picking the right size :).
> That's almost impossible, though, since at the input end of the query
> processing you often want a large number of partitions to get sufficient
> parallelism for both performance and to avoid spilling or OOM, while at the
> output end of the query processing (after all the pruning and filtering)
> you often have only a few result rows, which means that splitting those few
> rows across many partitions in order to do a sort or similar is actually
> pretty silly and inefficient. I'll frequently see sorts where the
> per-partition sorts have only one or two records and it would have been
> quicker and more efficient to sort using a small number of partitions
> rather than using RangePartitioning to split the few rows across many
> partitions, then doing a degenerate/trivial form of sort on each of those
> partitions with their one or two rows, and finally merging all those tiny
> partitions back in order to produce the final results.
>
> Since the optimum number of shuffle partitions is different at different
> points in the query processing flow, it's really impossible to pick a
> static best number of shuffle partitions.  Using spark.sql.adaptive.enabled
> to turn on ExchangeCoordinator and dynamically set the number of shuffle
> partitions mostly works pretty well, but it still has at least a couple of
> issues.  One is that it makes things worse in the case of data skew since
> it doesn't stop coalescing partitions until after the coalesced partition
> size exceeds a target value; so if you've got some big ugly partitions that
> exceed the target size all on their own, they'll often be even bigger and
> uglier after the ExchangeCoordinator is done merging them with a few
> smaller partitions.  The other issue is that adaptive partitioning doesn't
> even try to do anything currently with any partitioning other than
> HashPartitioning, so you've still got the sorting problem using
> RangePartitioning that I just got done describing.
>
> I've actually started working on addressing each of those problems.
>
> On Tue, Mar 1, 2016 at 3:43 PM, Michael Armbrust wrote:
>
>> If you have to pick a number, it's better to overestimate than
>> underestimate, since task launching in Spark is relatively cheap compared to
>> spilling to disk or OOMing (now much less likely due to Tungsten).
>> Eventually, we plan to make this dynamic, but you should tune for your
>> particular workload.
>>
>> On Tue, Mar 1, 2016 at 3:19 PM, Teng Liao wrote:
>>
>>> Hi,
>>>
>>> I was wondering what the rationale is behind defaulting all repartitioning
>>> to spark.sql.shuffle.partitions. I'm seeing a huge overhead when running
>>> a job whose input has 2 partitions and, using the default value for
>>> spark.sql.shuffle.partitions, this now becomes 200. Thanks.
>>>
>>> -Teng Fei Liao
>>>
>>
>>
>
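
For reference, a minimal sketch of the two knobs discussed above, using
Spark 1.6-era configuration names (the adaptive target-size property name is
an assumption based on the docs of that era):

// sqlContext here is the usual spark-shell SQLContext.
// Static default for the number of post-shuffle partitions:
sqlContext.setConf("spark.sql.shuffle.partitions", "200")

// Experimental adaptive mode (ExchangeCoordinator): coalesce post-shuffle
// partitions up to a target input size instead of using a fixed count.
sqlContext.setConf("spark.sql.adaptive.enabled", "true")
sqlContext.setConf("spark.sql.adaptive.shuffle.targetPostShuffleInputSize",
  (64 * 1024 * 1024).toString)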

-- 
When events unfold with calm and ease
When the winds that blow are merely breeze
Learn from nature, from birds and bees
Live your life in love, and let joy not cease.


Re: [discuss] dropping Python 2.6 support

2016-01-05 Thread yash datta
+1

On Tue, Jan 5, 2016 at 1:57 PM, Jian Feng Zhang 
wrote:

> +1
>
> We use Python 2.7+ and 3.4+ to call PySpark.
>
> 2016-01-05 15:58 GMT+08:00 Kushal Datta :
>
>> +1
>>
>> 
>> Dr. Kushal Datta
>> Senior Research Scientist
>> Big Data Research & Pathfinding
>> Intel Corporation, USA.
>>
>> On Mon, Jan 4, 2016 at 11:52 PM, Jean-Baptiste Onofré 
>> wrote:
>>
>>> +1
>>>
>>> no problem for me to remove Python 2.6 in 2.0.
>>>
>>> Thanks
>>> Regards
>>> JB
>>>
>>>
>>> On 01/05/2016 08:17 AM, Reynold Xin wrote:
>>>
 Does anybody here care about us dropping support for Python 2.6 in Spark
 2.0?

 Python 2.6 is ancient, and is pretty slow in many aspects (e.g. json
 parsing) when compared with Python 2.7. Some libraries that Spark depends
 on stopped supporting 2.6. We can still convince the library maintainers
 to support 2.6, but it will be extra work. I'm curious if anybody still
 uses Python 2.6 to run Spark.

 Thanks.



>>> --
>>> Jean-Baptiste Onofré
>>> jbono...@apache.org
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
>>>
>>> -
>>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: dev-h...@spark.apache.org
>>>
>>>
>>
>
>
> --
> Best,
> Jian Feng
>



-- 
When events unfold with calm and ease
When the winds that blow are merely breeze
Learn from nature, from birds and bees
Live your life in love, and let joy not cease.


Re: KryoSerializer for closureSerializer in DAGScheduler

2015-08-31 Thread yash datta
Thanks Josh ... I'll take a look
On 31 Aug 2015 19:21, "Josh Rosen" <rosenvi...@gmail.com> wrote:

> There are currently a few known issues with using KryoSerializer as the
> closure serializer, so it's going to require some changes to Spark if we
> want to properly support this. See
> https://github.com/apache/spark/pull/6361 and
> https://issues.apache.org/jira/browse/SPARK-7708 for some discussion of
> the difficulties here.
>
> On Mon, Aug 31, 2015 at 3:44 AM, yash datta <sau...@gmail.com> wrote:
>
>> Hi devs,
>>
>> Currently the only supported serializer for serializing tasks in
>> DAGScheduler.scala is JavaSerializer.
>>
>>
>> val taskBinaryBytes: Array[Byte] = stage match {
>>   case stage: ShuffleMapStage =>
>>     closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
>>   case stage: ResultStage =>
>>     closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func): AnyRef).array()
>> }
>>
>> taskBinary = sc.broadcast(taskBinaryBytes)
>>
>>
>> Could somebody give me pointers as to what is involved if we want to
>> change it to KryoSerializer?
>>
>>
>>
>> One suggestion here:
>>
>> http://apache-spark-developers-list.1001551.n3.nabble.com/bug-using-kryo-as-closure-serializer-td6473.html
>>
>> was to use chill-scala's KryoSerializer for the closureSerializer:
>>
>> private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
>>
>>
>>
>> But on digging into the code, it looks like the KryoSerializer being used
>> already comes from the Twitter chill library.
>>
>> in KryoSerializer.scala :
>>
>> val instantiator = new EmptyScalaKryoInstantiator
>> val kryo = instantiator.newKryo()
>>
>> 
>>
>> package com.twitter.chill
>> class EmptyScalaKryoInstantiator() extends 
>> com.twitter.chill.KryoInstantiator {
>>   override def newKryo() : com.twitter.chill.KryoBase = { /* compiled code 
>> */ }
>> }
>>
>>
>>
>> I am working on a low-latency job and much of the time is spent
>> serializing the result stage RDD (~140 ms), and the serialized size is 2.8 MB.
>> Thoughts? Is this reasonable? I wanted to check whether shifting to
>> KryoSerializer helps here.
>>
>> I am serializing a UnionRDD which is created by code like this :
>>
>>
>> rdds here is a list of schemaRDDs
>>
>>
>> val condition = 'column === indexValue
>>
>> val selectFields = UnresolvedAttribute("ts") :: fieldClass.selectFields
>>
>> val sddpp = rdds.par.map(x => x.where(condition).select(selectFields: _*))
>>
>>
>>
>> val rddpp = sddpp.map(x => new PartitionPruningRDD(x, partitioner.func))
>>
>>
>> val unioned = new UnionRDD(sqlContext.sparkContext, rddpp.toList)
>>
>>
>> My partitioner above selects one partition (out of 100) per RDD from the
>> list of RDDs passed to UnionRDD, and the UnionRDD finally created has
>> 127 partitions.
>>
>> Calling unioned.collect leads to serialization of UnionRDD.
>>
>> I am using spark 1.2.1
>>
>>
>> Any help regarding this will be highly appreciated.
>>
>>
>> Best
>> Yash Datta
>>
>>
>> --
>> When events unfold with calm and ease
>> When the winds that blow are merely breeze
>> Learn from nature, from birds and bees
>> Live your life in love, and let joy not cease.
>>
>
>


KryoSerializer for closureSerializer in DAGScheduler

2015-08-31 Thread yash datta
Hi devs,

Currently the only supported serializer for serializing tasks in
DAGScheduler.scala is JavaSerializer.


val taskBinaryBytes: Array[Byte] = stage match {
  case stage: ShuffleMapStage =>
    closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
  case stage: ResultStage =>
    closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func): AnyRef).array()
}

taskBinary = sc.broadcast(taskBinaryBytes)


Could somebody give me pointers as to what is involved if we want to
change it to KryoSerializer?



One suggestion here:

http://apache-spark-developers-list.1001551.n3.nabble.com/bug-using-kryo-as-closure-serializer-td6473.html

was to use chill-scala's KryoSerializer for the closureSerializer:

private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()



But on digging into the code, it looks like the KryoSerializer being used
already comes from the Twitter chill library.

in KryoSerializer.scala :

val instantiator = new EmptyScalaKryoInstantiator
val kryo = instantiator.newKryo()



package com.twitter.chill
class EmptyScalaKryoInstantiator() extends com.twitter.chill.KryoInstantiator {
  override def newKryo() : com.twitter.chill.KryoBase = { /* compiled code */ }
}



I am working on a low-latency job and much of the time is spent
serializing the result stage RDD (~140 ms), and the serialized size is 2.8 MB.
Thoughts? Is this reasonable? I wanted to check whether shifting to
KryoSerializer helps here.
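
(For reference, a minimal sketch of the part that is already switchable
today, the data serializer; the closure serializer used by DAGScheduler is
the piece this thread is about. MyRecord below is a hypothetical record type
standing in for whatever is being shuffled.)

import org.apache.spark.{SparkConf, SparkContext}

case class MyRecord(ts: Long, value: Int) // hypothetical record type

val conf = new SparkConf()
  .setAppName("kryo-data-serializer-sketch")
  // Switches the serializer used for shuffled/cached data, not for
  // closures/tasks, which still go through JavaSerializer.
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Registering classes avoids writing full class names into the stream.
  .registerKryoClasses(Array(classOf[MyRecord]))

val sc = new SparkContext(conf)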

I am serializing a UnionRDD which is created by code like this :


rdds here is a list of schemaRDDs


val condition = 'column === indexValue

val selectFields = UnresolvedAttribute("ts") :: fieldClass.selectFields

val sddpp = rdds.par.map(x => x.where(condition).select(selectFields: _*))



val rddpp = sddpp.map(x => new PartitionPruningRDD(x, partitioner.func))


val unioned = new UnionRDD(sqlContext.sparkContext, rddpp.toList)


My partitioner above selects one partition (out of 100) per RDD from the
list of RDDs passed to UnionRDD, and the UnionRDD finally created has
127 partitions.

Calling unioned.collect leads to serialization of UnionRDD.

I am using spark 1.2.1


Any help regarding this will be highly appreciated.


Best
Yash Datta


-- 
When events unfold with calm and ease
When the winds that blow are merely breeze
Learn from nature, from birds and bees
Live your life in love, and let joy not cease.


Re: creating hive packages for spark

2015-04-27 Thread yash datta
Hi,

you can build spark-project hive from here :

https://github.com/pwendell/hive/tree/0.13.1-shaded-protobuf

Hope this helps.


On Mon, Apr 27, 2015 at 3:23 PM, Manku Timma manku.tim...@gmail.com wrote:

 Hello Spark developers,
 I want to understand the procedure to create the org.spark-project.hive
 jars. Is this documented somewhere? I am having issues with -Phive-provided
 with my private hive13 jars and want to check if using spark's procedure
 helps.




-- 
When events unfold with calm and ease
When the winds that blow are merely breeze
Learn from nature, from birds and bees
Live your life in love, and let joy not cease.


Re: Stackoverflow in createDataFrame.

2015-04-24 Thread yash datta
This is already reported :

https://issues.apache.org/jira/browse/SPARK-6999
On 24 Apr 2015 18:11, Jan-Paul Bultmann janpaulbultm...@me.com wrote:

 Hey,
 I get a stack overflow when calling the following method on SQLContext.

 def createDataFrame(rowRDD: JavaRDD[Row], columns: java.util.List[String]): DataFrame = {
   createDataFrame(rowRDD.rdd, columns.toSeq)
 }
 https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala#L441

 The function will just call itself over and over.

 If you consider this a bug as well, I'll gladly open a JIRA issue :).

 Cheers Jan


Re: Building spark 1.2 from source requires more dependencies

2015-03-30 Thread yash datta
Hi all,


When selecting large data in Spark SQL (a SELECT * query), I see a buffer
overflow exception from Kryo:


15/03/27 10:32:19 WARN scheduler.TaskSetManager: Lost task 6.0 in stage 3.0
(TID 30, machine159): com.esotericsoftware.kryo.KryoException: Buffer
overflow. Available: 1, required: 2
Serialization trace:
values (org.apache.spark.sql.catalyst.expressions.GenericRow)
at com.esotericsoftware.kryo.io.Output.require(Output.java:138)
at com.esotericsoftware.kryo.io.Output.writeInt(Output.java:247)
at
com.esotericsoftware.kryo.serializers.DefaultSerializers$IntSerializer.write(DefaultSerializers.java:95)
at
com.esotericsoftware.kryo.serializers.DefaultSerializers$IntSerializer.write(DefaultSerializers.java:89)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at
org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:167)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:210)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
Source)
at java.lang.Thread.run(Unknown Source)



I thought maybe increasing these would resolve the problem, but the same
exception is seen:

set spark.kryoserializer.buffer.mb=4;
set spark.kryoserializer.buffer.max.mb=1024;
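
(For reference, a sketch of setting the same buffers on the SparkConf before
the SparkContext is created; the placement is an assumption that these are
read when the executors start, rather than via a runtime SQL SET.)

import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: Spark 1.2/1.3-era property names; values are in MB.
val conf = new SparkConf()
  .setAppName("kryo-buffer-sketch")
  .set("spark.kryoserializer.buffer.mb", "4")        // initial per-output buffer
  .set("spark.kryoserializer.buffer.max.mb", "1024") // hard upper bound

val sc = new SparkContext(conf)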


I have a Parquet table with 5 Int columns and 100 million rows.

Can somebody explain why this exception is seen? Am I missing some
configuration?

Thanks
Yash


On Mon, Mar 30, 2015 at 3:05 AM, Sean Owen so...@cloudera.com wrote:

 Given that it's an internal error from scalac, I think it may be
 something to take up with the Scala folks to really fix. We can just
 look for workarounds. Try blowing away your .m2 and .ivy cache, for
 example. FWIW I was running on Linux with Java 8u31, latest Scala 2.11
 AFAIK.

 On Sun, Mar 29, 2015 at 10:29 PM, Pala M Muthaia
 mchett...@rocketfuelinc.com wrote:
  Sean,
 
   I did a mvn clean and then a build, and it produces the same error. I also
   did a fresh git clone of spark and invoked the same build command, and it
   resulted in an identical error (I also had a colleague do the same thing,
   lest there was some machine-specific issue, and saw the same error). Unless
   I misunderstood something, it doesn't look like a clean build fixes this.
 
  On Fri, Mar 27, 2015 at 10:20 PM, Sean Owen so...@cloudera.com wrote:
 
  This is not a compile error, but an error from the scalac compiler.
  That is, the code and build are fine, but scalac is not compiling it.
  Usually when this happens, a clean build fixes it.
 
  On Fri, Mar 27, 2015 at 7:09 PM, Pala M Muthaia
  mchett...@rocketfuelinc.com wrote:
   No, I am running from the root directory, the parent of core.
  
   Here is the first set of errors that I see when I compile from source
   (sorry the error message is very long, but I am adding it in case it helps
   in diagnosis). After I manually add the javax.servlet dependency for
   version 3.0, this set of errors goes away and I get the next set of errors
   about missing classes under eclipse-jetty.
  
   I am on maven 3.2.5 and java 1.7.
  
   Error:
  
   [INFO] --- scala-maven-plugin:3.2.0:compile (scala-compile-first) @
   spark-core_2.10 ---
   [WARNING] Zinc server is not available at port 3030 - reverting to
   normal
   incremental compile
   [INFO] Using incremental compilation
   [INFO] compiler plugin:
   BasicArtifact(org.scalamacros,paradise_2.10.4,2.0.1,null)
   [INFO] Compiling 403 Scala sources and 33 Java sources to
   /Users/mchettiar/code/spark/core/target/scala-2.10/classes...
   [WARNING] Class javax.servlet.ServletException not found - continuing
   with a
   stub.
   [ERROR]
while compiling:
  
  
 /Users/mchettiar/code/spark/core/src/main/scala/org/apache/spark/HttpServer.scala
   during phase: typer
library version: version 2.10.4
   compiler version: version 2.10.4
 reconstructed args: -deprecation 

Re: Spark SQL, Hive Parquet data types

2015-02-20 Thread yash datta
For the old Parquet path (available in 1.2.1), I made a few changes to
support reading/writing a table partitioned on a timestamp-type column:

https://github.com/apache/spark/pull/4469


On Fri, Feb 20, 2015 at 8:28 PM, The Watcher watche...@gmail.com wrote:

 
 
 1. In Spark 1.3.0, timestamp support was added; also, Spark SQL uses
 its own Parquet support to handle both the read path and the write path
 when dealing with Parquet tables declared in the Hive metastore, as long
 as you're not writing to a partitioned table. So yes, you can.
 
  Ah, I had missed the part about being partitioned or not. Is this related
 to the work being done on ParquetRelation2?

 We will indeed write to a partitioned table: do neither the read nor the
 write path go through Spark SQL's Parquet support in that case? Is there a
 JIRA/PR I can monitor to see when this would change?

 Thanks




-- 
When events unfold with calm and ease
When the winds that blow are merely breeze
Learn from nature, from birds and bees
Live your life in love, and let joy not cease.