Re: reading each JSON file from dataframe...

2022-07-12 Thread Muthu Jayakumar
Hello Ayan,

Thank you for the suggestion. However, I would lose the correlation between
each JSON file and the other identifier fields. Also, would it be an issue
if there are too many files? In addition, the schema may not be the same
across all the files.

Hello Enrico,

>how does RDD's mapPartitions make a difference regarding 1. and 2.
>compared to Dataset's mapPartitions / map function?
I suppose that, since I have to process the data row by row (as in the
question above), the RDD API may be more efficient?

Thanks,
Muthu
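
One way to keep the correlation while still passing a list of paths to spark.read.json, as suggested in the quoted message below, is to tag every parsed record with its source file and join back on file_path. This is only a minimal sketch under assumptions: the object and method names are hypothetical, the path list is assumed to fit in driver memory, and the strings returned by input_file_name() are assumed to match the stored file_path values exactly (they may need normalizing on some file systems). Files with different schemas end up in one merged, inferred schema.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.input_file_name

// Hypothetical helper; not part of the original thread.
object ReadJsonKeepingIds {
  // `index` is assumed to have a string column `file_path` plus the identifier columns.
  def readAndJoin(spark: SparkSession, index: DataFrame): DataFrame = {
    import spark.implicits._
    // Steps 1 and 2: collect the paths to the driver.
    val paths: Array[String] = index.select("file_path").as[String].collect()
    // Step 3: read all files in one pass; schema inference merges the fields seen across files.
    val json = spark.read.json(paths: _*)
      .withColumn("file_path", input_file_name()) // tag every record with its source file
    // Join back so entity_id / other_useful_id stay attached to each JSON record.
    index.join(json, Seq("file_path"), "left")
  }
}
```

By default spark.read.json expects one JSON document per line; for pretty-printed files the reader would also need .option("multiLine", true).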

On Tue, 12 Jul 2022 at 14:55, ayan guha  wrote:

> Another option is:
>
> 1. collect the dataframe with the file paths
> 2. create a list of paths
> 3. create a new dataframe with spark.read.json and pass it the list of paths
>
> This will save you a lot of headaches
>
> Ayan
>
>
> On Wed, Jul 13, 2022 at 7:35 AM Enrico Minack 
> wrote:
>
>> Hi,
>>
>> how does RDD's mapPartitions make a difference regarding 1. and 2.
>> compared to Dataset's mapPartitions / map function?
>>
>> Enrico
>>
>>
>> On 12.07.22 at 22:13, Muthu Jayakumar wrote:
>>
>> Hello Enrico,
>>
>> Thanks for the reply. I found that I would have to use `mapPartitions`
>> API of RDD to perform this safely as I have to
>> 1. Read each file from GCS using HDFS FileSystem API.
>> 2. Parse each JSON record in a safe manner.
>>
>> For (1) to work, I do have to broadcast HadoopConfiguration from
>> sparkContext. I did try to use GCS Java API to read content, but ran into
>> many JAR conflicts as the HDFS wrapper and the JAR library uses different
>> dependencies.
>> Hope these findings help others as well.
>>
>> Thanks,
>> Muthu
>>
>>

Re: reading each JSON file from dataframe...

2022-07-12 Thread Muthu Jayakumar
Hello Enrico,

Thanks for the reply. I found that I would have to use `mapPartitions` API
of RDD to perform this safely as I have to
1. Read each file from GCS using HDFS FileSystem API.
2. Parse each JSON record in a safe manner.

For (1) to work, I do have to broadcast HadoopConfiguration from
sparkContext. I did try to use GCS Java API to read content, but ran into
many JAR conflicts as the HDFS wrapper and the JAR library uses different
dependencies.
Hope these findings help others as well.

Thanks,
Muthu
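
The approach described above could be sketched roughly as follows. This is not the code from the original job, only a sketch under assumptions: RowWithJsonUri mirrors the case class from the quoted message below, while RowWithJsonResult and ReadJsonPerRow are hypothetical names. Because Hadoop's Configuration is not java.io.Serializable, the sketch broadcasts its entries as a plain Map and rebuilds the Configuration once per partition. Read failures are captured per row with Try instead of failing the task; parsing the returned string (step 2) is left to a downstream step.

```scala
import scala.collection.JavaConverters._
import scala.io.Source
import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{Dataset, SparkSession}

case class RowWithJsonUri(entity_id: String, file_path: String, other_useful_id: String)
// Hypothetical result type: either the file content or the error message is set.
case class RowWithJsonResult(entity_id: String, file_path: String, other_useful_id: String,
                             json_content: Option[String], error: Option[String])

object ReadJsonPerRow {
  def readAll(spark: SparkSession, ds: Dataset[RowWithJsonUri]): Dataset[RowWithJsonResult] = {
    import spark.implicits._
    // Ship the Hadoop configuration entries to the executors as a serializable Map.
    val confEntries: Map[String, String] =
      spark.sparkContext.hadoopConfiguration.iterator().asScala
        .map(e => e.getKey -> e.getValue).toMap
    val confBc = spark.sparkContext.broadcast(confEntries)

    ds.rdd.mapPartitions { rows =>
      // Rebuild the configuration once per partition, not once per row.
      val conf = new Configuration(false)
      confBc.value.foreach { case (k, v) => conf.set(k, v) }
      rows.map { row =>
        val content = Try {
          val path = new Path(row.file_path)
          val in = path.getFileSystem(conf).open(path)
          try Source.fromInputStream(in, "UTF-8").mkString finally in.close()
        }
        RowWithJsonResult(row.entity_id, row.file_path, row.other_useful_id,
          content.toOption, content.failed.toOption.map(_.toString))
      }
    }.toDS()
  }
}
```

Rows whose file is missing or unreadable keep their identifiers and carry the error text, so the correlation with entity_id and other_useful_id is preserved either way.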


On Mon, 11 Jul 2022 at 14:11, Enrico Minack  wrote:

> All you need to do is implement a method readJson that reads a single
> file given its path. Then, you map the values of column file_path to the
> respective JSON content as a string. This can be done via a UDF or simply
> Dataset.map:
>
> case class RowWithJsonUri(entity_id: String, file_path: String,
> other_useful_id: String)
> case class RowWithJsonContent(entity_id: String, json_content: String,
> other_useful_id: String)
>
> val ds = Seq(
>   RowWithJsonUri("id-01f7pqqbxddb3b1an6ntyqx6mg",
> "gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json",
> "id-2-01g4he5cb4xqn6s1999k6y1vbd"),
>   RowWithJsonUri("id-01f7pqgbwms4ajmdtdedtwa3mf",
> "gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json",
> "id-2-01g4he5cbh52che104rwy603sr"),
>   RowWithJsonUri("id-01f7pqqbxejt3ef4ap9qcs78m5",
> "gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json",
> "id-2-01g4he5cbqmdv7dnx46sebs0gt"),
>   RowWithJsonUri("id-01f7pqqbynh895ptpjjfxvk6dc",
> "gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json",
> "id-2-01g4he5cbx1kwhgvdme1s560dw")
> ).toDS()
>
> ds.show(false)
>
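> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
> |entity_id                    |file_path                                                          |other_useful_id                |
> +-----------------------------+-------------------------------------------------------------------+-------------------------------+
> |id-01f7pqqbxddb3b1an6ntyqx6mg|gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
> |id-01f7pqgbwms4ajmdtdedtwa3mf|gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
> |id-01f7pqqbxejt3ef4ap9qcs78m5|gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
> |id-01f7pqqbynh895ptpjjfxvk6dc|gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
> +-----------------------------+-------------------------------------------------------------------+-------------------------------+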
>
>
> def readJson(uri: String): String = { s"content of $uri" }
>
> ds.map { row => RowWithJsonContent(row.entity_id, readJson(row.file_path),
> row.other_useful_id) }.show(false)
>
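> +-----------------------------+------------------------------------------------------------------------------+-------------------------------+
> |entity_id                    |json_content                                                                  |other_useful_id                |
> +-----------------------------+------------------------------------------------------------------------------+-------------------------------+
> |id-01f7pqqbxddb3b1an6ntyqx6mg|content of gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
> |id-01f7pqgbwms4ajmdtdedtwa3mf|content of gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
> |id-01f7pqqbxejt3ef4ap9qcs78m5|content of gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
> |id-01f7pqqbynh895ptpjjfxvk6dc|content of gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
> +-----------------------------+------------------------------------------------------------------------------+-------------------------------+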
>
> Cheers,
> Enrico

reading each JSON file from dataframe...

2022-07-10 Thread Muthu Jayakumar
Hello there,

I have a dataframe with the following...

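+-----------------------------+-------------------------------------------------------------------+-------------------------------+
|entity_id                    |file_path                                                          |other_useful_id                |
+-----------------------------+-------------------------------------------------------------------+-------------------------------+
|id-01f7pqqbxddb3b1an6ntyqx6mg|gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
|id-01f7pqgbwms4ajmdtdedtwa3mf|gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
|id-01f7pqqbxejt3ef4ap9qcs78m5|gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
|id-01f7pqqbynh895ptpjjfxvk6dc|gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
+-----------------------------+-------------------------------------------------------------------+-------------------------------+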

I would like to read the file referenced by `file_path` in each row and
write the result to another dataframe containing `entity_id`,
`other_useful_id`, `json_content`, `file_path`.
Assume that I already have the required HDFS url libraries in my classpath.

Please advise,
Muthu


Re: [spark-core] docker-image-tool.sh question...

2021-03-10 Thread Muthu Jayakumar
Hello Attila,

Thank you for verifying this for me. I was looking at
Step 1/18 : ARG java_image_tag=11-jre-slim
and presumed that the docker image is built using JRE 11.

I can confirm that,
(1) $ docker image history 3ef86250a35b
IMAGE          CREATED              CREATED BY                                      SIZE      COMMENT
3ef86250a35b   About a minute ago   /bin/sh -c #(nop)  USER 185                     0B
da69b1ceffae   About a minute ago   /bin/sh -c #(nop)  ENTRYPOINT ["/opt/entrypo…   0B
64a6705c5b0a   About a minute ago   |1 spark_uid=185 /bin/sh -c chmod a+x /opt/d…   1.28kB
31998314a897   About a minute ago   |1 spark_uid=185 /bin/sh -c chmod g+w /opt/s…   0B
80e5a5efb145   About a minute ago   /bin/sh -c #(nop) WORKDIR /opt/spark/work-dir   0B
0679f96869ca   About a minute ago   /bin/sh -c #(nop)  ENV SPARK_HOME=/opt/spark    0B
924e4145db72   About a minute ago   /bin/sh -c #(nop) COPY dir:bbafc09edfdb15f8a…   969kB
2d0c1f6457e1   About a minute ago   /bin/sh -c #(nop) COPY dir:5bfd4e5c910d2d60b…   11.3kB
22faf9d2953c   About a minute ago   /bin/sh -c #(nop) COPY dir:fe08a455f93331449…   3.05MB
86105d076b70   About a minute ago   /bin/sh -c #(nop) COPY file:35ddca176fca5de0…   1.28kB
79328c9a5f9f   About a minute ago   /bin/sh -c #(nop) COPY file:effc73fa9a595c48…   3.49kB
6691778d46d9   About a minute ago   /bin/sh -c #(nop) COPY dir:95444b482efa0f5f8…   45.1kB
6e9b0eb8dd36   About a minute ago   /bin/sh -c #(nop) COPY dir:a67a7095c35356a11…   53.8kB
a9d17543e0cb   About a minute ago   /bin/sh -c #(nop) COPY dir:087058b48f1aced38…   447MB
12ecbd1bb3bc   2 minutes ago        |1 spark_uid=185 /bin/sh -c set -ex && s…       79.1MB
1f4a72f317c2   2 months ago         /bin/sh -c #(nop)  ARG spark_uid=185            0B
cd08b38dfcae   2 months ago         /bin/sh -c set -eux;   arch="$(dpkg --print-…   109MB
<missing>      2 months ago         /bin/sh -c #(nop)  ENV JAVA_VERSION=8u275       0B
<missing>      2 months ago         /bin/sh -c { echo '#/bin/sh'; echo 'echo "$J…   27B
<missing>      2 months ago         /bin/sh -c #(nop)  ENV PATH=/usr/local/openj…   0B
<missing>      2 months ago         /bin/sh -c #(nop)  ENV JAVA_HOME=/usr/local/…   0B
<missing>      2 months ago         /bin/sh -c #(nop)  ENV LANG=C.UTF-8             0B
<missing>      2 months ago         /bin/sh -c set -eux;  apt-get update;  apt-g…   8.78MB
<missing>      3 months ago         /bin/sh -c #(nop)  CMD ["bash"]                 0B
<missing>      3 months ago         /bin/sh -c #(nop) ADD file:3a7bff4e139bcacc5…   69.2MB

(2) $ docker run --entrypoint "/usr/local/openjdk-8/bin/java" 3ef86250a35b '-version'
openjdk version "1.8.0_275"
OpenJDK Runtime Environment (build 1.8.0_275-b01)
OpenJDK 64-Bit Server VM (build 25.275-b01, mixed mode)

Thank you very much!

Thanks,
Muthu

On Wed, Mar 10, 2021 at 11:11 PM Attila Zsolt Piros <
piros.attila.zs...@gmail.com> wrote:

> Hi Muthu!
>
> I tried and at my side it is working just fine:
>
> $  ./bin/docker-image-tool.sh -r docker.io/sample-spark -b
> java_image_tag=8-jre-slim -t 3.1.1 build
> Sending build context to Docker daemon  228.3MB
> Step 1/18 : ARG java_image_tag=11-jre-slim
> Step 2/18 : FROM openjdk:${java_image_tag}
> *8-jre-slim*: Pulling from library/openjdk
> 45b42c59be33: Pull complete
> c3f1fbf102b7: Pull complete
> 262868e4544c: Pull complete
> 1c0fec43ba3f: Pull complete
> Digest:
> sha256:412c52d88d77ea078c50ed4cf8d8656d6448b1c92829128e1c6aab6687ce0998
> *Status: Downloaded newer image for openjdk:8-jre-slim*
>  ---> 8f867fdbd02f
>
> What do you see on your side?
>
> Best regards,
> Attila
>
> On Wed, Mar 10, 2021 at 5:44 AM Muthu Jayakumar 
> wrote:
>
>> Hello there,
>>
>> While using docker-image-tool (for Spark 3.1.1), it does not seem to accept
>> the `java_image_tag` property. The Docker image defaults to JRE 11. Here is
>> what I am running from the command line.
>>
>> $ spark/bin/docker-image-tool.sh -r docker.io/sample-spark -b
>> java_image_tag=8-jre-slim -t 3.1.1 build
>>
>> Please advise,
>> Muthu
>>
>


[spark-core] docker-image-tool.sh question...

2021-03-09 Thread Muthu Jayakumar
Hello there,

While using docker-image-tool (for Spark 3.1.1), it does not seem to accept
the `java_image_tag` property. The Docker image defaults to JRE 11. Here is
what I am running from the command line.

$ spark/bin/docker-image-tool.sh -r docker.io/sample-spark -b
java_image_tag=8-jre-slim -t 3.1.1 build

Please advise,
Muthu


Re: Spark reading from Hbase throws java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods

2020-02-17 Thread Muthu Jayakumar
Hello Mich,

Thank you for the mail. From what I can understand of the json4s history,
Spark, and the versions you have:
1. Apache Spark 2.4.3 uses json4s 3.5.3 (to be specific, it uses
json4s-jackson).
2. json4s 3.2.11 and 3.2.10 are not compatible with each other (ref:
https://github.com/json4s/json4s/issues/212).
3. I notice that you are using a mix of Scala 2.10 and Scala 2.11 builds of
the jars. I believe Spark 2.4.3 supports Scala 2.11 or 2.12 only.

I would suggest using json4s-jackson, json4s and json4s-native all at
version 3.5.3 (built for Scala 2.11 or 2.12, depending on your Spark build).
If you want to use an older version, make sure that, at the least, all of
them are older than 3.2.11.

Hope it helps.

Thanks,
Muthu
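
To see which json4s jar actually wins on the classpath, one option is to ask the JVM where a class was loaded from. The helper below is a hypothetical diagnostic (the name ClasspathProbe is not from any library); it relies only on standard java.lang.Class calls. It returns None for classes that cannot be found and for classes loaded by the bootstrap class loader, which have no code source.

```scala
import scala.util.Try

// Hypothetical diagnostic helper; not part of Spark or json4s.
object ClasspathProbe {
  /** Returns the location (usually a jar URL) a class was loaded from, if known. */
  def locationOf(className: String): Option[String] =
    Try(Class.forName(className)).toOption.flatMap { cls =>
      // getCodeSource is null for bootstrap classes, so wrap each step in Option.
      Option(cls.getProtectionDomain.getCodeSource)
        .flatMap(cs => Option(cs.getLocation))
        .map(_.toString)
    }
}
```

In spark-shell, ClasspathProbe.locationOf("org.json4s.jackson.JsonMethods$") should then print the jar that provides the class named in the NoSuchMethodError, which can be compared against the jars passed via --jars.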


On Mon, Feb 17, 2020 at 1:15 PM Mich Talebzadeh 
wrote:

> Thanks Muthu,
>
>
> I am using the following jar files for now in local mode i.e.  
> spark-shell_local
> --jars …..
>
> json4s-jackson_2.10-3.2.10.jar
> json4s_2.11-3.2.11.jar
> json4s-native_2.10-3.4.0.jar
>
> Which one is the incorrect one, please?
>
> Regards,
>
> Mich
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 17 Feb 2020 at 20:28, Muthu Jayakumar  wrote:
>
>> I suspect the Spark job somehow has an incorrect (newer) version of
>> json4s on the classpath. json4s 3.5.3 is the highest version that can be
>> used.
>>
>> Thanks,
>> Muthu
>>


Re: Spark reading from Hbase throws java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods

2020-02-17 Thread Muthu Jayakumar
I suspect the Spark job somehow has an incorrect (newer) version of
json4s on the classpath. json4s 3.5.3 is the highest version that can be
used.

Thanks,
Muthu

On Mon, Feb 17, 2020, 06:43 Mich Talebzadeh 
wrote:

> Hi,
>
> Spark version 2.4.3
> Hbase 1.2.7
>
> Data is stored in Hbase as Json. example of a row shown below
> [image: image.png]
> I am trying to read this table in Spark Scala
>
> import org.apache.spark.sql.{SQLContext, _}
> import org.apache.spark.sql.execution.datasources.hbase._
> import org.apache.spark.{SparkConf, SparkContext}
> import spark.sqlContext.implicits._
> import org.json4s._
> import org.json4s.jackson.JsonMethods._
> import org.json4s.jackson.Serialization.{read => JsonRead}
> import org.json4s.jackson.Serialization.{read, write}
> // Catalog that maps the HBase table and its column family to DataFrame columns.
> // (The closing brace of the "table" object was missing and has been restored.)
> def catalog = s"""{
>      |"table":{"namespace":"trading", "name":"MARKETDATAHBASEBATCH"},
>      |"rowkey":"key",
>      |"columns":{
>      |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
>      |"ticker":{"cf":"PRICE_INFO", "col":"ticker", "type":"string"},
>      |"timeissued":{"cf":"PRICE_INFO", "col":"timeissued", "type":"string"},
>      |"price":{"cf":"PRICE_INFO", "col":"price", "type":"string"}
>      |}
>      |}""".stripMargin
>
> // Load the HBase table described by the catalog as a DataFrame.
> def withCatalog(cat: String): DataFrame = {
>   spark.sqlContext
>     .read
>     .options(Map(HBaseTableCatalog.tableCatalog -> cat))
>     .format("org.apache.spark.sql.execution.datasources.hbase")
>     .load()
> }
> val df = withCatalog(catalog)
>
>
> However, I am getting this error
>
> Spark session available as 'spark'.
> java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods$.parse(Lorg/json4s/JsonInput;Z)Lorg/json4s/JsonAST$JValue;
>   at org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:257)
>   at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.<init>(HBaseRelation.scala:80)
>   at org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:51)
>   at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
>   at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
>   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
>   at withCatalog(testme.scala:49)
>   ... 65 elided
>
> I have Googled it but with little luck!
>
> Thanks,
> Mich
>
> http://talebzadehmich.wordpress.com
>
>
>
>
>


Re: Using Percentile in Spark SQL

2019-11-11 Thread Muthu Jayakumar
If you require higher precision, you may have to write a custom UDAF.
In my case, I ended up storing the data as a key-value ordered list of
histograms.

Thanks
Muthu
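
To illustrate the histogram idea in plain Scala: if each partial aggregate is kept as a value -> count map, partial results can be merged by adding counts, and an exact nearest-rank percentile can be read off the sorted keys at the end. This is only a sketch of the general technique, not the UDAF from my job; the names HistogramPercentile, merge and percentile are hypothetical, and q is a fraction in (0, 1] as in Spark's percentile functions.

```scala
// Hypothetical sketch of percentile computation over mergeable histograms.
object HistogramPercentile {
  type Histogram = Map[Double, Long] // value -> number of occurrences

  /** Combine two partial histograms by adding counts (what a UDAF merge step would do). */
  def merge(a: Histogram, b: Histogram): Histogram =
    (a.keySet ++ b.keySet).iterator
      .map(k => k -> (a.getOrElse(k, 0L) + b.getOrElse(k, 0L)))
      .toMap

  /** Nearest-rank percentile for a fraction q in (0, 1]; None for an empty histogram. */
  def percentile(hist: Histogram, q: Double): Option[Double] = {
    require(q > 0.0 && q <= 1.0, "q must be in (0, 1]")
    val total = hist.values.sum
    if (total <= 0L) None
    else {
      val rank = math.ceil(q * total).toLong       // 1-based rank of the wanted element
      val sorted = hist.toSeq.sortBy(_._1)          // order by value
      val cumulative = sorted.scanLeft(0L)(_ + _._2).tail // running counts
      sorted.zip(cumulative).collectFirst { case ((value, _), seen) if seen >= rank => value }
    }
  }
}
```

The memory cost grows with the number of distinct values, so this fits data with limited cardinality; for wide-ranging continuous values, percentile_approx is likely the more practical choice.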

On Mon, Nov 11, 2019, 20:46 Patrick McCarthy
 wrote:

> Depending on your tolerance for error you could also use
> percentile_approx().
>
> On Mon, Nov 11, 2019 at 10:14 AM Jerry Vinokurov 
> wrote:
>
>> Do you mean that you are trying to compute the percent rank of some data?
>> You can use the SparkSQL percent_rank function for that, but I don't think
>> that's going to give you any improvement over calling the percentRank
>> function on the data frame. Are you currently using a user-defined function
>> for this task? Because I bet that's what's slowing you down.
>>
>> On Mon, Nov 11, 2019 at 9:46 AM Tzahi File 
>> wrote:
>>
>>> Hi,
>>>
>>> Currently, I'm using a huge Hive cluster (m5.24xl * 40 workers) to run a
>>> percentile function. I'm trying to improve this job by moving it to run
>>> with Spark SQL.
>>>
>>> Any suggestions on how to use a percentile function in Spark?
>>>
>>>
>>> Thanks,
>>> --
>>> Tzahi File
>>> Data Engineer
>>>
>>
>>
>> --
>> http://www.google.com/profiles/grapesmoker
>>
>
>
> --
>
>
> *Patrick McCarthy  *
>
> Senior Data Scientist, Machine Learning Engineering
>
> Dstillery
>
> 470 Park Ave South, 17th Floor, NYC 10016
>


Re: Core allocation is scattered

2019-07-31 Thread Muthu Jayakumar
>I am running a Spark job with 20 cores, but I do not understand why my
application gets 1-2 cores on several machines. Why does it not just run on
two nodes, like node1=16 cores and node2=4 cores? Instead, cores are
allocated like node1=2 node =1-node 14=1 like that.

I believe that's the intended behavior for Spark. Please refer to
https://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts
and look for the 'spark.deploy.spreadOut' setting. If I understand
correctly, you may want "spark.deploy.spreadOut false".

Hope it helps!

Happy Spark(ing).
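
The difference between the two modes can be pictured with a toy model. This is not Spark's scheduler code, only a simplified sketch under assumptions: cores are handed out one at a time in round-robin order when spreading out, and workers are filled one after another otherwise. The object and method names are hypothetical.

```scala
// Toy model of spread-out vs. consolidated core allocation; not Spark's actual scheduler.
object CoreAllocationModel {
  /** Returns how many of the `requested` cores each worker receives, given its free cores. */
  def allocate(free: Vector[Int], requested: Int, spreadOut: Boolean): Vector[Int] = {
    val assigned = Array.fill(free.length)(0)
    var remaining = math.min(requested, free.sum) // never hand out more than exists
    if (spreadOut) {
      // Round-robin: one core per worker per pass until the request is satisfied.
      while (remaining > 0) {
        for (i <- free.indices if remaining > 0 && assigned(i) < free(i)) {
          assigned(i) += 1
          remaining -= 1
        }
      }
    } else {
      // Consolidate: fill each worker completely before moving to the next one.
      for (i <- free.indices if remaining > 0) {
        val take = math.min(free(i), remaining)
        assigned(i) = take
        remaining -= take
      }
    }
    assigned.toVector
  }
}
```

With 26 workers of 16 free cores each and 20 requested cores, the model gives one core on each of 20 workers when spreading out, and 16 + 4 cores on the first two workers otherwise. As far as I understand, spark.deploy.spreadOut is a setting of the standalone master rather than of the individual application, so it would have to be set on the master side.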

On Thu, Jul 25, 2019 at 7:22 PM Srikanth Sriram <
sriramsrikanth1...@gmail.com> wrote:

> Hello,
>
> Below is my understanding.
>
> The following default configuration parameters are used by the Spark job
> if they are not set to the required values when the job is submitted.
>
> # - SPARK_EXECUTOR_INSTANCES, Number of workers to start (Default: 2)
> # - SPARK_EXECUTOR_CORES, Number of cores for the workers (Default: 1).
> # - SPARK_EXECUTOR_MEMORY, Memory per Worker (e.g. 1000M, 2G) (Default: 1G)
>
> SPARK_EXECUTOR_INSTANCES -> indicates the number of workers to be started,
> it means for a job maximum this many number of executors it can ask/take
> from the cluster resource manager.
>
> SPARK_EXECUTOR_CORES -> indicates the number of cores in each executor, it
> means the spark TaskScheduler will ask this many cores to be
> allocated/blocked in each of the executor machine.
>
> SPARK_EXECUTOR_MEMORY -> indicates the maximum amount of RAM/MEMORY it
> requires in each executor.
>
> All these details are requested by the TaskScheduler from the cluster
> manager (which may be Spark standalone, YARN, Mesos, or Kubernetes, which is
> supported starting from Spark 2.3) before the job execution actually
> starts.
>
> Also, please note that the initial number of executor instances depends on
> "--num-executors", but when there is more data to process and
> "spark.dynamicAllocation.enabled" is set to true, Spark will dynamically
> add more executors based on "spark.dynamicAllocation.initialExecutors".
>
> Note: Always "spark.dynamicAllocation.initialExecutors" should be
> configured greater than "--num-executors".
> spark.dynamicAllocation.initialExecutors
>   Default: spark.dynamicAllocation.minExecutors
>   Meaning: Initial number of executors to run if dynamic allocation is
>   enabled. If `--num-executors` (or `spark.executor.instances`) is set and
>   larger than this value, it will be used as the initial number of
>   executors.
>
> spark.executor.memory
>   Default: 1g
>   Meaning: Amount of memory to use per executor process, in the same format
>   as JVM memory strings with a size unit suffix ("k", "m", "g" or "t")
>   (e.g. 512m, 2g).
>
> spark.executor.cores
>   Default: 1 in YARN mode, all the available cores on the worker in
>   standalone and Mesos coarse-grained modes.
>   Meaning: The number of cores to use on each executor. In standalone and
>   Mesos coarse-grained modes, for more detail, see this description.
> On Thu, Jul 25, 2019 at 5:54 PM Amit Sharma  wrote:
>
>> I have a cluster with 26 nodes, each having 16 cores. I am running a
>> Spark job with 20 cores, but I do not understand why my application gets
>> 1-2 cores on several machines. Why does it not just run on two nodes, like
>> node1=16 cores and node2=4 cores? Instead, cores are allocated like
>> node1=2 node =1-node 14=1 like that. Is there any conf property I need to
>> change? I know that with dynamic allocation we can use the setting below,
>> but is there anything without dynamic allocation?
>> --conf "spark.dynamicAllocation.maxExecutors=2"
>>
>>
>> Thanks
>> Amit
>>
>
>
> --
> Regards,
> Srikanth Sriram
>


Number of tasks...

2019-07-29 Thread Muthu Jayakumar
Hello there,

I have a basic question about how the number of tasks is determined per
Spark job.
Let's say the scope of this discussion is Parquet and Spark 2.x.
1. I thought that the number of tasks is proportional to the number of part
files that exist. Is this correct?
2. I noticed that for a 25-core job, the number of tasks scheduled was
around 250. But the same job, when executed with 75 cores, had around 460
tasks. Is the number of tasks proportional to the number of cores used?
Note: the number of tasks I refer to here is the task count during the
`spark.read.parquet("")` operation. I do understand that, during a join /
reduce operation, the shuffle determines the number of tasks for the next
stage (from "spark.sql.shuffle.partitions", which defaults to 200 --
https://spark.apache.org/docs/latest/sql-performance-tuning.html).
3. Also, for "spark.sql.shuffle.partitions", is there any way I can provide
a computed value based on the input data / join / UDAF functions used? If
the number of tasks stays at around 200, I might see OutOfMemory issues
(depending on the data size). Too large a number would create many small
tasks. The right balance may depend on the input data size and the shuffle
operation.

Please advise,
Muthu
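
Regarding question 2, one possible explanation is the way Spark sizes read splits for file-based sources. To my understanding, the target split size is derived from spark.sql.files.maxPartitionBytes (default 128 MB), spark.sql.files.openCostInBytes (default 4 MB) and the default parallelism, which grows with the number of cores. The sketch below is a simplified model of that calculation as I understand it, not code copied from Spark; the object and method names are hypothetical.

```scala
// Simplified model of the read split size for file sources; names are hypothetical.
object SplitSizeModel {
  val DefaultMaxPartitionBytes: Long = 128L * 1024 * 1024 // spark.sql.files.maxPartitionBytes
  val DefaultOpenCostInBytes: Long = 4L * 1024 * 1024     // spark.sql.files.openCostInBytes

  /** Target bytes per read task for the given input files and default parallelism. */
  def maxSplitBytes(fileSizes: Seq[Long],
                    defaultParallelism: Int,
                    maxPartitionBytes: Long = DefaultMaxPartitionBytes,
                    openCostInBytes: Long = DefaultOpenCostInBytes): Long = {
    require(defaultParallelism > 0, "defaultParallelism must be positive")
    // Every file is charged its size plus a fixed cost for opening it.
    val totalBytes = fileSizes.map(_ + openCostInBytes).sum
    val bytesPerCore = totalBytes / defaultParallelism
    math.min(maxPartitionBytes, math.max(openCostInBytes, bytesPerCore))
  }
}
```

In this model, more cores mean a smaller bytesPerCore and therefore smaller splits and more read tasks, until the input is large enough that the 128 MB cap applies regardless of the core count. Files are then cut and packed into partitions of roughly that size, so the task count is not simply the number of part files.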


Re: Re: Can an UDF return a custom class other than case class?

2019-01-07 Thread Muthu Jayakumar
Perhaps using a generic StructType would work in your situation, since it is
language agnostic? Case classes are backed by implicits that provide the
type conversions into the columnar representation.
My 2 cents.

Thanks,
Muthu
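
A minimal sketch of the StructType idea, assuming the Java-style UDF1 interface and an explicit return schema are acceptable for the use case: the UDF returns a generic Row, and the schema is stated by hand instead of being derived from a case class. The names StructUdfSketch, personSchema and the registered function name toPerson are hypothetical.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.api.java.UDF1
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical sketch: a UDF that returns a struct described by an explicit schema.
object StructUdfSketch {
  // Hand-written schema for the struct the UDF returns.
  val personSchema: StructType =
    StructType(Seq(StructField("name", StringType, nullable = true)))

  /** Registers a SQL function `toPerson` that wraps a string into a one-field struct. */
  def register(spark: SparkSession): Unit =
    spark.udf.register("toPerson", new UDF1[String, Row] {
      override def call(name: String): Row = Row(name)
    }, personSchema)
}
```

After StructUdfSketch.register(spark), the function can be used as dataset.withColumn("person", expr("toPerson(name)")) with expr imported from org.apache.spark.sql.functions, and the field is reachable as person.name. The same interface and schema types are available from Java, which is why this route does not depend on Scala case classes.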


On Mon, Jan 7, 2019 at 4:13 AM yeikel valdes  wrote:

>
>
>  Forwarded Message 
> From : em...@yeikel.com
> To : kfehl...@gmail.com
> Date : Mon, 07 Jan 2019 04:11:22 -0800
> Subject : Re: Can an UDF return a custom class other than case class?
>
>
> In this case I am just curious because I'd like to know if it is possible.
>
> At the same time I will be interacting with external Java class files if
> that's allowed.
>
> Also, what are the equivalents for other languages like Java? I am not
> aware of anything similar to the case class in Java.
>
> I am currently using Scala but I might use PySpark or the Java apis in the
> future.
>
> Thank you
>
>  On Sun, 06 Jan 2019 22:06:28 -0800 * kfehl...@gmail.com
>  * wrote 
>
> Is there a reason why case classes won't work for your use case?
>
> On Sun, Jan 6, 2019 at 10:43 PM  wrote:
>
> Hi,
>
> Is it possible to return a custom class from an UDF other than a case
> class?
>
> If so, how can we avoid this exception?
> java.lang.UnsupportedOperationException: Schema for type {custom type} is
> not supported
>
> Full Example:
>
> import spark.implicits._
> import org.apache.spark.sql.functions.udf
>
> class Person(val name: String)
>
> val toPerson = (s1: String) => new Person(s1)
>
> val dataset = Seq("John Smith").toDF("name")
>
> val personUDF = udf(toPerson)
>
> java.lang.UnsupportedOperationException: Schema for type Person is not supported
>   at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:780)
>   at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:715)
>   at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
>   at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
>   at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
>   at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:714)
>   at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:711)
>   at org.apache.spark.sql.functions$.udf(functions.scala:3340)
>
> dataset.withColumn("person", personUDF($"name"))
>
> Thank you.
>


Re: Spark job on dataproc failing with Exception in thread "main" java.lang.NoSuchMethodError: com.googl

2018-12-20 Thread Muthu Jayakumar
The error indicates that Preconditions.checkArgument() is being called with
a parameter signature that does not exist in the class that was loaded.
Could you check how many of the jars (that go into the uber jar) actually
contain this method signature?
I suspect a jar version conflict or something similar.

Thanks
Muthu
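
A first step for that check could be to list which of the dependency jars ship the Guava class at all; more than one hit is a hint that different Guava versions are competing. The helper below is a hypothetical sketch using only java.util.jar from the JDK. It finds jars that contain the class entry; it does not inspect individual method signatures.

```scala
import java.io.File
import java.util.jar.JarFile
import scala.util.Try

// Hypothetical helper for spotting duplicate copies of a class across jars.
object JarScanner {
  /** Returns the jars that contain `classEntry`, e.g. "com/google/common/base/Preconditions.class". */
  def jarsContaining(jars: Seq[File], classEntry: String): Seq[File] =
    jars.filter { file =>
      Try {
        val jar = new JarFile(file)
        try jar.getEntry(classEntry) != null finally jar.close()
      }.getOrElse(false) // unreadable or non-jar files are skipped
    }
}
```

For example, passing the jar files from the dependency cache or from the cluster's Spark jars directory together with "com/google/common/base/Preconditions.class" lists every jar that bundles Guava, including jars that repackage it.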

On Thu, Dec 20, 2018, 02:40 Mich Talebzadeh 
wrote:

> Has anyone in the Spark user group seen this error, by any chance?
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
>
>
>
> On Thu, 20 Dec 2018 at 09:38,  wrote:
>
>> Hi,
>>
>> I am trying a basic Spark job in a Scala program. I compile it with SBT
>> with the following dependencies:
>>
>> libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.0" % "provided"
>> libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.0.0" % "provided"
>> libraryDependencies += "org.apache.spark" %% "spark-hive" % "2.0.0" % "provided"
>> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.0.0" % "provided"
>> libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.1" % "provided"
>> libraryDependencies += "org.apache.phoenix" % "phoenix-spark" % "4.6.0-HBase-1.0"
>> libraryDependencies += "org.apache.hbase" % "hbase" % "1.2.6"
>> libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.6"
>> libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.6"
>> libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.6"
>> libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "2.2.0"
>> libraryDependencies += "org.mongodb" % "mongo-java-driver" % "3.8.1"
>> libraryDependencies += "org.apache.spark" %% "spark-streaming-twitter" % "1.6.3"
>> libraryDependencies += "com.google.cloud.bigdataoss" % "bigquery-connector" % "0.13.4-hadoop3"
>> libraryDependencies += "com.google.cloud.bigdataoss" % "gcs-connector" % "1.9.4-hadoop3"
>> libraryDependencies += "com.google.code.gson" % "gson" % "2.8.5"
>> libraryDependencies += "com.google.guava" % "guava" % "27.0.1-jre"
>> libraryDependencies += "org.apache.httpcomponents" % "httpcore" % "4.4.8"
>>
>> It compiles fine and creates the uber jar file. But when I run it I get
>> the following error.
>>
>> Exception in thread "main" java.lang.NoSuchMethodError:
>> com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;Ljava/lang/Object;)V
>> at
>> com.google.cloud.hadoop.io.bigquery.BigQueryStrings.parseTableReference(BigQueryStrings.java:68)
>> at
>> com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration.configureBigQueryInput(BigQueryConfiguration.java:260)
>> at simple$.main(simple.scala:150)
>> at simple.main(simple.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
>> at
>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>> Sounds like there is an incompatibility in Guava versions between compile
>> time and run time? These are the versions that are used:
>>
>> - Java openjdk version "1.8.0_181"
>> - Spark version 2.3.2
>> - Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
>>
>>
>> Appreciate any feedback.
>>
>> Thanks,
>>
>> Mich
>>
>
>


Re: error in job

2018-10-06 Thread Muthu Jayakumar
The error means that you are missing the commons-configuration jar
(commons-configuration-<version>.jar) on the classpath of the driver/worker.
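
A quick way to confirm this before resubmitting is to ask the class loader directly. This is only a sketch; the helper name is made up, and it uses nothing beyond the JDK and the Scala standard library.

```scala
import scala.util.Try

// Hypothetical helper for checking whether a class can be resolved at runtime.
object ClassCheck {
  /** True if `className` can be resolved by the current thread's class loader. */
  def isOnClasspath(className: String): Boolean = {
    val loader = Option(Thread.currentThread().getContextClassLoader)
      .getOrElse(getClass.getClassLoader)
    // initialize = false: only locate the class, do not run its static initialisers
    Try(Class.forName(className, false, loader)).isSuccess
  }
}
```

Calling ClassCheck.isOnClasspath("org.apache.commons.configuration.ConfigurationException") on the driver should return false in the failing setup. The org.apache.commons.configuration package belongs to the 1.x line of commons-configuration, so that is the jar to bundle or to pass with --jars on spark-submit.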

Thanks,
Muthu

On Sat, Sep 29, 2018 at 11:55 PM yuvraj singh <19yuvrajsing...@gmail.com>
wrote:

> Hi, I am getting this error, please help me.
>
>
> 18/09/30 05:14:44 INFO Client:
>  client token: N/A
>  diagnostics: User class threw exception: java.lang.NoClassDefFoundError:
> org/apache/commons/configuration/ConfigurationException
> at
> org.apache.spark.sql.cassandra.DefaultSource$.(DefaultSource.scala:135)
> at
> org.apache.spark.sql.cassandra.DefaultSource$.(DefaultSource.scala)
> at
> org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:55)
> at
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:340)
> at
> org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
> at com.ola.ss.lhf.LoginHourFetcher.main(LoginHourFetcher.java:39)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:721)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.commons.configuration.ConfigurationException
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 13 more
>
>  ApplicationMaster host: 10.14.58.163
>  ApplicationMaster RPC port: 0
>  queue: signals
>  start time: 1538284434296
>  final status: FAILED
>  tracking URL:
> http://as-data3.prod-ambari16.olacabs.net:8088/proxy/application_1537943933870_0088/
>  user: yubrajsingh
> Exception in thread "main" org.apache.spark.SparkException: Application
> application_1537943933870_0088 finished with failed status
> at org.apache.spark.deploy.yarn.Client.run(Client.scala:1269)
> at
> org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1627)
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:904)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>


Re: Encoder for JValue

2018-09-19 Thread Muthu Jayakumar
A naive workaround may be to transform the json4s JValue to a String (using
something like compact()) and process it as a String? Once you are done with
the last action, you could convert it back to a JValue (using something like
parse()).
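
A rough sketch of that round trip, assuming json4s with the Jackson backend (the one Spark itself bundles). SomeClassText and the object name are hypothetical stand-ins for the case class in the question.

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods.{compact, parse, render}

// Hypothetical mirror of the case class in the question, with the JSON held as text
case class SomeClassText(a: String, b: String)

object JValueAsString {
  /** JValue -> compact JSON text, which Spark can encode as a plain String column. */
  def toText(json: JValue): String = compact(render(json))

  /** JSON text -> JValue, for use after the data has left the Dataset. */
  def fromText(text: String): JValue = parse(text)
}
```

With this shape, the Dataset would hold SomeClassText (for which a product encoder exists, since both fields are Strings), and JValueAsString.fromText would be applied only after collecting or inside plain Scala code.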

Thanks,
Muthu

On Wed, Sep 19, 2018 at 6:35 AM Arko Provo Mukherjee <
arkoprovomukher...@gmail.com> wrote:

> Hello Spark Gurus,
>
> I am running into an issue with Encoding and wanted your help.
>
> I have a case class with a JObject in it. Ex:
> *case class SomeClass(a: String, b: JObject)*
>
> I also have an encoder for this case class:
> *val encoder = Encoders.product[SomeClass]*
>
> Now I am creating a DataFrame with the tuple (a, b) from my
> transformations and converting it into a Dataset:
> *df.as[SomeClass](encoder)*
>
> When I do this, I get the following error:
> *java.lang.UnsupportedOperationException: No Encoder found for
> org.json4s.JsonAST.JValue*
>
> Appreciate any help regarding this issue.
>
> Many thanks in advance!
> Warm regards
> Arko
>
>
>


Re: Parquet

2018-07-20 Thread Muthu Jayakumar
I generally write to Parquet when I need to read the same data repeatedly
and run different operations on it each time. This saves database time for
me.
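
As a sketch of that pattern (the object name and the path are placeholders, not anything from this thread): write the expensive result once, then let later jobs read the Parquet copy instead of going back to the database.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

// Hypothetical helper: stage a DataFrame as Parquet and read it back.
object ParquetStaging {
  /** Writes `df` to `path` as Parquet (replacing earlier output) and returns a DataFrame that reads it back. */
  def stage(spark: SparkSession, df: DataFrame, path: String): DataFrame = {
    df.write.mode(SaveMode.Overwrite).parquet(path)
    spark.read.parquet(path)
  }
}
```

Whether sorting or partitioning before the write pays off depends on the join keys and on how often the data is re-read, so treat this as a starting point only.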

Thanks
Muthu

On Thu, Jul 19, 2018, 18:34 amin mohebbi 
wrote:

> We have two big tables, each with 5 billion rows, so my question is:
> should we partition / sort the data and convert it to Parquet before
> doing any join?
>
> Best Regards ... Amin
> Mohebbi PhD candidate in Software Engineering   at university of Malaysia
> Tel : +60 18 2040 017 E-Mail : tp025...@ex.apiit.edu.my
> amin_...@me.com
>


Spark + CDB (Cockroach DB) support...

2018-06-15 Thread Muthu Jayakumar
Hello there,

I am trying to find out whether CDB support is available for Apache Spark.
I can currently use CDB through the Postgres driver. But I would like to
know if there are any specialized drivers that optimize for predicate
push-down and other optimizations pertaining to data locality.

Thanks
Muthu


Re: Does Spark run on Java 10?

2018-04-01 Thread Muthu Jayakumar
It is supported, with some limitations around JSR 376 (JPMS) that can cause
linker errors.

Thanks,
Muthu

On Sun, Apr 1, 2018 at 11:15 AM, kant kodali <kanth...@gmail.com> wrote:

> Hi Muthu,
>
> "On a side note, if some coming version of Scala 2.11 becomes full Java
> 9/10 compliant it could work."
>
> From the links, you pointed out. It looks like Scala 2.11.12 is compliant
> with Java 9/10?
>
> Thanks!
>
>
>
> On Sun, Apr 1, 2018 at 7:50 AM, Muthu Jayakumar <bablo...@gmail.com>
> wrote:
>
>> Short answer may be no. Spark runs on Scala 2.11. Even Scala 2.12 is not
>> fully Java 9 compliant. For more info...
>> http://docs.scala-lang.org/overviews/jdk-compatibility/overview.html ---
>> check the last section.
>> https://issues.apache.org/jira/browse/SPARK-14220
>>
>> On a side note, if some coming version of Scala 2.11 becomes full Java
>> 9/10 compliant it could work.
>>
>> Hope this helps.
>>
>> Thanks,
>> Muthu
>>
>> On Sun, Apr 1, 2018 at 6:57 AM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> Has anybody got Spark running on Java 10?
>>>
>>> Thanks!
>>>
>>>
>>>
>>
>


Re: Does Spark run on Java 10?

2018-04-01 Thread Muthu Jayakumar
Short answer may be no. Spark runs on Scala 2.11. Even Scala 2.12 is not
fully Java 9 compliant. For more info...
http://docs.scala-lang.org/overviews/jdk-compatibility/overview.html ---
check the last section.
https://issues.apache.org/jira/browse/SPARK-14220

On a side note, if some coming version of Scala 2.11 becomes full Java 9/10
compliant it could work.

Hope this helps.

Thanks,
Muthu

On Sun, Apr 1, 2018 at 6:57 AM, kant kodali  wrote:

> Hi All,
>
> Has anybody got Spark running on Java 10?
>
> Thanks!
>
>
>


Re: DataFrame --- join / groupBy-agg question...

2017-07-19 Thread Muthu Jayakumar
The problem with 'spark.sql.shuffle.partitions' is that it needs to be set
before the Spark session is created (I guess?). But ideally, I want to
partition by column during a join / group-by (something roughly like
repartitionBy(partitionExpression: Column*) from
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset).
This way I can vary the partition count with the data.
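
For what it is worth, my understanding is that 'spark.sql.shuffle.partitions' is a runtime SQL setting and can be changed on a live session, and that Dataset.repartition(numPartitions, exprs) gives per-query control by column. A minimal sketch of both follows; the object and method names are made up.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Hypothetical helpers for adjusting shuffle parallelism per query.
object ShuffleTuning {
  /** Changes the shuffle partition count on a live session; affects subsequent joins and aggregations. */
  def setShufflePartitions(spark: SparkSession, n: Int): Unit =
    spark.conf.set("spark.sql.shuffle.partitions", n.toString)

  /** Hash-partitions `df` by `column` into exactly `n` partitions before a join or groupBy. */
  def partitionBy(df: DataFrame, n: Int, column: String): DataFrame =
    df.repartition(n, col(column))
}
```

When the later groupBy or join uses the same key, the planner can often reuse that partitioning instead of shuffling again, but checking the physical plan is the only way to be sure for a given query.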

Thanks,
Muthu

On Wed, Jul 19, 2017 at 8:23 AM, ayan guha  wrote:

> You can use spark.sql.shuffle.partitions to adjust amount of parallelism.
>
> On Wed, Jul 19, 2017 at 11:41 PM, muthu  wrote:
>
>> Hello there,
>>
>> Thank you for looking into the question.
>>
>> >Is the partition count of df depending on fields of groupby?
>> Absolute partition number or by column value to determine the partition
>> count would be fine for me (which is similar to repartition() I suppose)
>>
>> >Also is the performance of groupby-agg comparable to
>> reducebykey/aggbykey?
>> In theory the DF/ DS APIs are supposed to be better as they would
>> optimize the execution order and so on by building an effective Query Plan.
>>
>> Currently I am hacking to spin up a new spark-submit per query request by
>> setting 'spark.sql.shuffle.partitions'. In ideal situations, we have a
>> long running application that uses the same spark-session and runs one or
>> more query using FAIR mode.
>>
>> Thanks,
>> Muthu
>>
>>
>>
>> On Wed, Jul 19, 2017 at 6:03 AM, qihuagao [via Apache Spark User List]
>> <[hidden email]> wrote:
>>
>>> also interested in this.
>>> Is the partition count of df depending on fields of groupby?
>>> Also is the performance of groupby-agg comparable to
>>> reducebykey/aggbykey?
>>>
>
>
>
> --
> Best Regards,
> Ayan Guha
>


DataFrame --- join / groupBy-agg question...

2017-07-11 Thread Muthu Jayakumar
Hello there,

I may have a naive question on join / groupBy-agg. In the days of RDD,
whenever I wanted to perform
a. a groupBy-agg, I used reduceByKey (of PairRDDFunctions) with an
optional partitioning strategy (either a number of partitions or a
Partitioner)
b. a join (of PairRDDFunctions) or one of its variants, I had a way to
provide the number of partitions

In DataFrame, how do I specify the number of partitions during these
operations? I could use repartition() after the fact, but that would be
another Stage in the Job.

One workaround to increase the number of partitions / tasks during a join
is to set 'spark.sql.shuffle.partitions' to some desired number during
spark-submit. I am trying to see if there is a way to provide this
programmatically for every step of a groupBy-agg / join.

Please advise,
Muthu


Re: What is the easiest way for an application to Query parquet data on HDFS?

2017-06-05 Thread Muthu Jayakumar
I run a spark-submit
(https://spark.apache.org/docs/latest/spark-standalone.html#launching-spark-applications)
in client mode that starts the micro-service. If you keep the event loop
going, the Spark context remains active.

Thanks,
Muthu

On Mon, Jun 5, 2017 at 2:44 PM, kant kodali <kanth...@gmail.com> wrote:

> Are you launching SparkSession from a MicroService or through spark-submit
> ?
>
> On Sun, Jun 4, 2017 at 11:52 PM, Muthu Jayakumar <bablo...@gmail.com>
> wrote:
>
>> Hello Kant,
>>
>> >I still don't understand How SparkSession can use Akka to communicate
>> with SparkCluster?
>> Let me use your initial requirement to illustrate what I mean, i.e. "I
>> want my Micro service app to be able to query and access data on HDFS".
>> To run a query, say a DataFrame query (equally possible with SQL), you
>> need a SparkSession to build the query, right? If you can have your main
>> thread launched in client mode (
>> https://spark.apache.org/docs/latest/spark-standalone.html#launching-spark-applications)
>> then you can use a play/akka based microservice as you are used to.
>> Here is what one of my applications does...
>> a. An akka-http micro-service takes a query-like JSON request (parsed
>> with a simple Scala parser combinator), runs a Spark job using
>> DataFrame/Dataset, and sends back JSON responses (synchronously and
>> asynchronously).
>> b. Another akka actor takes an object request to generate parquet(s).
>> c. Another akka-http endpoint (based on web-sockets) performs a similar
>> operation to (a).
>> d. Another akka-http endpoint reports progress on a running query /
>> parquet generation (based on SparkContext / SparkSQL internal APIs,
>> similar to https://spark.apache.org/docs/latest/monitoring.html).
>> The idea is to have only one SparkSession per JVM. But you can set the
>> scheduler mode to FAIR (the default is FIFO) to be able to run multiple
>> queries in parallel. The application I use runs on Spark Standalone with
>> a 32 node cluster.
>>
>> Hope this gives a better idea.
>>
>> Thanks,
>> Muthu
>>
>>
>> On Sun, Jun 4, 2017 at 10:33 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi Muthu,
>>>
>>> I am actually using Play framework for my Micro service which uses Akka
>>> but I still don't understand How SparkSession can use Akka to communicate
>>> with SparkCluster? SparkPi or SparkPl? any link?
>>>
>>> Thanks!
>>>
>>
>>
>


Re: What is the easiest way for an application to Query parquet data on HDFS?

2017-06-05 Thread Muthu Jayakumar
Hello Kant,

>I still don't understand How SparkSession can use Akka to communicate with
SparkCluster?
Let me use your initial requirement to illustrate what I mean, i.e. "I want
my Micro service app to be able to query and access data on HDFS".
To run a query, say a DataFrame query (equally possible with SQL), you need
a SparkSession to build the query, right? If you can have your main thread
launched in client mode (
https://spark.apache.org/docs/latest/spark-standalone.html#launching-spark-applications)
then you can use a play/akka based microservice as you are used to.
Here is what one of my applications does...
a. An akka-http micro-service takes a query-like JSON request (parsed with
a simple Scala parser combinator), runs a Spark job using
DataFrame/Dataset, and sends back JSON responses (synchronously and
asynchronously).
b. Another akka actor takes an object request to generate parquet(s).
c. Another akka-http endpoint (based on web-sockets) performs a similar
operation to (a).
d. Another akka-http endpoint reports progress on a running query / parquet
generation (based on SparkContext / SparkSQL internal APIs, similar to
https://spark.apache.org/docs/latest/monitoring.html).
The idea is to have only one SparkSession per JVM. But you can set the
scheduler mode to FAIR (the default is FIFO) to be able to run multiple
queries in parallel. The application I use runs on Spark Standalone with a
32 node cluster.
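
A minimal sketch of the "one SparkSession per JVM, FAIR scheduling" idea. The object name, application name and pool names are placeholders; the master URL is expected to come from spark-submit.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical holder for the single SparkSession shared by all request handlers.
object SharedSpark {
  /** One SparkSession per JVM, created lazily on first use, with FAIR scheduling between jobs. */
  lazy val spark: SparkSession = SparkSession.builder()
    .appName("query-service") // hypothetical application name
    .config("spark.scheduler.mode", "FAIR")
    .getOrCreate()

  /** Runs `body` with the calling thread's jobs assigned to the named scheduler pool. */
  def inPool[T](pool: String)(body: SparkSession => T): T = {
    spark.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
    try body(spark)
    finally spark.sparkContext.setLocalProperty("spark.scheduler.pool", null) // back to the default pool
  }
}
```

Each request handler thread can then call something like SharedSpark.inPool("interactive")(_.sql("...").collect()), so that a long parquet-generation job in another pool does not starve short queries. Pool weights, if needed, go into the fair scheduler allocation file.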

Hope this gives a better idea.

Thanks,
Muthu


On Sun, Jun 4, 2017 at 10:33 PM, kant kodali  wrote:

> Hi Muthu,
>
> I am actually using Play framework for my Micro service which uses Akka
> but I still don't understand How SparkSession can use Akka to communicate
> with SparkCluster? SparkPi or SparkPl? any link?
>
> Thanks!
>


Re: What is the easiest way for an application to Query parquet data on HDFS?

2017-06-04 Thread Muthu Jayakumar
One drastic suggestion can be to write a simple microservice using Akka,
create a SparkSession at JVM startup and pass it around. You can look at
SparkPi for sample source code to start writing your microservice.
In my case, I used akka http to wrap my business requests, transform them
into Parquet reads and respond with the results.
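
The Spark side of such a handler can be quite small. Below is a sketch with a hypothetical request shape; the HTTP layer (akka http or anything else) is deliberately left out and would simply call run with the shared SparkSession.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Hypothetical request shape; a real service would define its own
final case class ParquetQuery(path: String, columns: Seq[String], limit: Int)

object ParquetQueryHandler {
  /** Reads the Parquet data named in the request and returns up to `limit` rows as JSON strings. */
  def run(spark: SparkSession, q: ParquetQuery): Seq[String] =
    spark.read.parquet(q.path)
      .select(q.columns.map(col): _*)
      .limit(q.limit)
      .toJSON
      .collect()
      .toSeq
}
```

Collecting to the driver is only reasonable for small, bounded responses, which is why the sketch forces a limit.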
Hope this helps

Thanks
Muthu


On Mon, Jun 5, 2017, 01:01 Sandeep Nemuri  wrote:

> Well if you are using Hortonworks distribution there is Livy2 which is
> compatible with Spark2 and scala 2.11.
>
>
> https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.0/bk_command-line-installation/content/install_configure_livy2.html
>
>
> On Sun, Jun 4, 2017 at 1:55 PM, kant kodali  wrote:
>
>> Hi,
>>
>> Thanks for this but here is what the documentation says:
>>
>> "To run the Livy server, you will also need an Apache Spark
>> installation. You can get Spark releases at
>> https://spark.apache.org/downloads.html. Livy requires at least Spark
>> 1.4 and currently only supports Scala 2.10 builds of Spark. To run Livy
>> with local sessions, first export these variables:"
>>
>> I am using spark 2.1.1 and scala 2.11.8 and I would like to use
>> Dataframes and Dataset API so it sounds like this is not an option for me?
>>
>> Thanks!
>>
>> On Sun, Jun 4, 2017 at 12:23 AM, Sandeep Nemuri 
>> wrote:
>>
>>> Check out http://livy.io/
>>>
>>>
>>> On Sun, Jun 4, 2017 at 11:59 AM, kant kodali  wrote:
>>>
>>>> Hi All,
>>>>
>>>> I am wondering what is the easiest way for a Micro service to query
>>>> data on HDFS? By easiest way I mean using minimal number of tools.
>>>>
>>>> Currently I use spark structured streaming to do some real time
>>>> aggregations and store it in HDFS. But now, I want my Micro service app to
>>>> be able to query and access data on HDFS. It looks like SparkSession can
>>>> only be accessed through CLI but not through a JDBC like API or whatever.
>>>> Any suggestions?
>>>>
>>>> Thanks!
>>>>
>>>
>>>
>>>
>>> --
>>> *  Regards*
>>> *  Sandeep Nemuri*
>>>
>>
>>
>
>
> --
> *  Regards*
> *  Sandeep Nemuri*
>


Spark repartition question...

2017-04-30 Thread Muthu Jayakumar
Hello there,

I am trying to understand the difference between the following
repartition() variants...
a. def repartition(partitionExprs: Column*): Dataset[T]
b. def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]
c. def repartition(numPartitions: Int): Dataset[T]

My understanding is that (c) is a simpler hash based partitioner where the
records are split equally into numPartitions.
(a) is more like (c), except that numPartitions depends on the distinct
column values from the expression. Right?
(b) is similar to (a), but what does numPartitions mean here?

On a side note, from the source code, it seems like (a) & (b) use
RepartitionByExpression. And my guess is that (a) would default
numPartitions to 200 (which is the default shuffle partition count).

Reason for my question...
say df.repartition(50, col("cat_col"))
and the df has about 20 distinct `cat_col` values. Would the effective
number of partitions still be 50? And if it is 50, would the 20 distinct
values most likely each get their own partition, with some of the values
possibly repeating into the remaining 30 partitions? Is this loosely correct?
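
One way to check this empirically is sketched below (the helper name is made up). As far as I understand the hash partitioning behind RepartitionByExpression, all rows with the same `cat_col` value land in the same partition, so with 20 distinct values at most 20 of the 50 partitions would hold data and the rest would stay empty; two values can also collide into one partition.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, spark_partition_id}

// Hypothetical probe for inspecting the effect of repartition(n, col(...)).
object RepartitionProbe {
  /** (total partitions, partitions that actually hold rows) after repartition(n, col(column)). */
  def probe(df: DataFrame, n: Int, column: String): (Int, Long) = {
    val parts = df.repartition(n, col(column))
    val nonEmpty = parts.select(spark_partition_id().as("pid")).distinct().count()
    (parts.rdd.getNumPartitions, nonEmpty)
  }
}
```

Running RepartitionProbe.probe(df, 50, "cat_col") on the real data shows how skewed the layout is before committing to it for a large join.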

The reason for my question is that I am trying to process a large amount of
data that would not fit in memory across all the workers in the cluster.
But if I repartition the data in some logical manner, then I would be able
to fit the data in the heap to perform some useful joins and write the
result back into parquet (or another useful) datastore.

Please advise,
Muthu


Re: Fast write datastore...

2017-03-15 Thread Muthu Jayakumar
>Reading your original question again, it seems to me probably you don't
need a fast data store
Shiva, you are right. I only asked about fast writes and never mentioned
reads :). For us, Cassandra may not be a good choice for reads because of
a. its limitations on server-side pagination support
b. the richness of its filters when compared to Elasticsearch... but this
can be worked around by using a Spark DataFrame.
c. a possibly larger limitation for me, which is the mandate to create a
partition key column beforehand. I may not be able to determine this
beforehand.
But 'materialized views' and 'SSTable Attached Secondary Index (SASI)' can
help alleviate this to some extent.
>what performance do you expect from subsequent queries?
Uladzimir, here is what we do now...
Step 1: Run an aggregate query over a large number of parquet files
(generally ranging from a few MBs to a few GBs) using Spark DataFrame.
Step 2: Attempt to store these query results in a 'fast datastore' (I have
asked for recommendations in this post). The data usually ranges from
250K to 600 million rows... Also, the schema from Step 1 is not known
beforehand and is usually deduced from the DataFrame schema or so. In most
cases the fields are simple and non-structural.
Step 3: Run one or more queries on the results stored in Step 2... These
are as simple as pagination, filters (think of simple string contains,
regex, number in range, ...) and sort. For any operation more complex than
this, I have been planning to run it through a DataFrame.
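
To make the Step 3 operations concrete, here is how the same filters and sort look when expressed on a DataFrame. The column names ("name", "score") and the literal values are invented for the example.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical Step 3 queries over the stored results.
object ResultQueries {
  /** String-contains, regex and numeric-range filters followed by a sort; the column names are hypothetical. */
  def filtered(results: DataFrame): DataFrame =
    results
      .filter(col("name").contains("spark"))
      .filter(col("name").rlike("^[a-z]+-\\d+$"))
      .filter(col("score").between(10, 20))
      .orderBy(col("score").desc)

  /** Naive first page: the top `pageSize` rows after sorting. Deeper pages need a different strategy. */
  def firstPage(results: DataFrame, pageSize: Int): DataFrame =
    filtered(results).limit(pageSize)
}
```

Pagination is the weak spot of doing this in Spark alone, which is part of why a datastore with server-side paging is attractive for this step.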

Koert makes valid points on the issues with Elastic Search.

On a side note, we do use Cassandra for Spark Streaming use-cases where we
sink the data into Cassandra (for efficient upsert capabilities) and
eventually write into parquet for long-term storage and trend analysis with
full-table-scan scenarios.

But I am thankful for many ideas and perspectives on how this could be
looked at.

Thanks,
Muthu


On Wed, Mar 15, 2017 at 7:25 PM, Shiva Ramagopal <tr.s...@gmail.com> wrote:

> Hi,
>
> The choice of ES vs Cassandra should really be made depending on your
> query use-cases. ES and Cassandra have their own strengths which should be
> matched to what you want to do rather than making a choice based on their
> respective feature sets.
>
> Reading your original question again, it seems to me probably you don't
> need a fast data store since you are doing a batch-like processing (reading
> from Parquet files) and it is possible to control this part fully. And it
> also seems like you want to use ES. You can try to reduce the number of
> Spark executors to throttle the writes to ES.
>
> -Shiva
>
> On Wed, Mar 15, 2017 at 11:32 PM, Muthu Jayakumar <bablo...@gmail.com>
> wrote:
>
>> Hello Uladzimir / Shiva,
>>
>> From the ElasticSearch documentation (I have to see the logical plan of a
>> query to confirm), the richness of filters (like regex, ...) is pretty
>> good compared to Cassandra. As for aggregates, I think Spark DataFrames
>> are rich enough to tackle them.
>> Let me know your thoughts.
>>
>> Thanks,
>> Muthu
>>
>>
>> On Wed, Mar 15, 2017 at 10:55 AM, vvshvv <vvs...@gmail.com> wrote:
>>
>>> Hi muthu,
>>>
>>> I agree with Shiva, Cassandra also supports SASI indexes, which can
>>> partially replace Elasticsearch functionality.
>>>
>>> Regards,
>>> Uladzimir
>>>
>>>
>>>
>>> On Mar 15, 2017 5:57 PM, Shiva Ramagopal <tr.s...@gmail.com> wrote:
>>>
>>> Probably Cassandra is a good choice if you are mainly looking for a
>>> datastore that supports fast writes. You can ingest the data into a table
>>> and define one or more materialized views on top of it to support your
>>> queries. Since you mention that your queries are going to be simple you can
>>> define your indexes in the materialized views according to how you want to
>>> query the data.
>>>
>>> Thanks,
>>> Shiva
>>>
>>>
>>>
>>> On Wed, Mar 15, 2017 at 7:58 PM, Muthu Jayakumar <bablo...@gmail.com>
>>> wrote:
>>>
>>>> Hello Vincent,
>>>>
>>>> Cassandra may not fit my bill if I need to define my partition and
>>>> other indexes upfront. Is this right?
>>>>
>>>> Hello Richard,
>>>>
>>>> Let me evaluate Apache Ignite. I did evaluate it 3 months back and back
>>>> then the connector to Apache Spark did not support Spark 2.0.
>>>>
>>>> Another drastic thought may be repartition the result count to 1 (but
>>>> have to be cautions on making sure I don't run into Heap issues i

Re: Fast write datastore...

2017-03-15 Thread Muthu Jayakumar
Hello Uladzimir / Shiva,

From the ElasticSearch documentation (I have to see the logical plan of a
query to confirm), the richness of filters (like regex, ...) is pretty good
compared to Cassandra. As for aggregates, I think Spark DataFrames are rich
enough to tackle them.
Let me know your thoughts.

Thanks,
Muthu


On Wed, Mar 15, 2017 at 10:55 AM, vvshvv <vvs...@gmail.com> wrote:

> Hi muthu,
>
> I agree with Shiva, Cassandra also supports SASI indexes, which can
> partially replace Elasticsearch functionality.
>
> Regards,
> Uladzimir
>
>
>
> On Mar 15, 2017 5:57 PM, Shiva Ramagopal <tr.s...@gmail.com> wrote:
>
> Probably Cassandra is a good choice if you are mainly looking for a
> datastore that supports fast writes. You can ingest the data into a table
> and define one or more materialized views on top of it to support your
> queries. Since you mention that your queries are going to be simple you can
> define your indexes in the materialized views according to how you want to
> query the data.
>
> Thanks,
> Shiva
>
>
>
> On Wed, Mar 15, 2017 at 7:58 PM, Muthu Jayakumar <bablo...@gmail.com>
> wrote:
>
>> Hello Vincent,
>>
>> Cassandra may not fit my bill if I need to define my partition and other
>> indexes upfront. Is this right?
>>
>> Hello Richard,
>>
>> Let me evaluate Apache Ignite. I did evaluate it 3 months back and back
>> then the connector to Apache Spark did not support Spark 2.0.
>>
>> Another drastic thought may be to repartition the result to 1 partition
>> (but I have to be cautious to make sure I don't run into heap issues if
>> the result is too large to fit into an executor) and write to a
>> relational database like mysql / postgres. But, I believe I can do the
>> same using ElasticSearch too.
>>
>> A slightly over-kill solution may be Spark to Kafka to ElasticSearch?
>>
>> More thoughts welcome please.
>>
>> Thanks,
>> Muthu
>>
>> On Wed, Mar 15, 2017 at 4:53 AM, Richard Siebeling <rsiebel...@gmail.com>
>> wrote:
>>
>>> maybe Apache Ignite does fit your requirements
>>>
>>> On 15 March 2017 at 08:44, vincent gromakowski <
>>> vincent.gromakow...@gmail.com> wrote:
>>>
>>>> Hi
>>>> If queries are static and filters are on the same columns, Cassandra
>>>> is a good option.
>>>>
>>>> On 15 March 2017 at 7:04 AM, "muthu" <bablo...@gmail.com> wrote:
>>>>
>>>> Hello there,
>>>>
>>>> I have one or more parquet files to read and perform some aggregate
>>>> queries using Spark Dataframe. I would like to find a reasonably fast
>>>> datastore that allows me to write the results for subsequent (simpler)
>>>> queries.
>>>> I did attempt to use ElasticSearch to write the query results using the
>>>> ElasticSearch Hadoop connector. But I am running into connector write
>>>> issues if there are too many Spark executors for ElasticSearch to
>>>> handle.
>>>> But in the schema sense, this seems a great fit as ElasticSearch has
>>>> smarts in place to discover the schema. Also in the query sense, I can
>>>> perform simple filters and sort using ElasticSearch, and for more
>>>> complex aggregates, Spark Dataframe can come back to the rescue :).
>>>> Please advise on other possible data-stores I could use?
>>>>
>>>> Thanks,
>>>> Muthu
>>>>
>>>>
>>>>
>>>
>>
>


Re: Fast write datastore...

2017-03-15 Thread Muthu Jayakumar
Hello Vincent,

Cassandra may not fit my bill if I need to define my partition and other
indexes upfront. Is this right?

Hello Richard,

Let me evaluate Apache Ignite. I did evaluate it 3 months back and back
then the connector to Apache Spark did not support Spark 2.0.

Another drastic thought may be to repartition the result to 1 partition
(but I have to be cautious to make sure I don't run into heap issues if the
result is too large to fit into an executor) and write to a relational
database like mysql / postgres. But, I believe I can do the same using
ElasticSearch too.
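
A sketch of the single-partition JDBC write, assuming a PostgreSQL target. The object name is made up, and the URL, table and credentials are whatever the caller supplies.

```scala
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SaveMode}

// Hypothetical sink for writing query results through one JDBC connection.
object JdbcSink {
  /** Connection properties for the JDBC writer; the driver class is the stock PostgreSQL one. */
  def props(user: String, password: String): Properties = {
    val p = new Properties()
    p.setProperty("user", user)
    p.setProperty("password", password)
    p.setProperty("driver", "org.postgresql.Driver")
    p
  }

  /** Shuffles `df` into a single partition (so a single connection) and appends it to `table`. */
  def writeSinglePartition(df: DataFrame, url: String, table: String, p: Properties): Unit =
    df.repartition(1).write.mode(SaveMode.Append).jdbc(url, table, p)
}
```

A small number of partitions greater than 1 is usually a better trade-off than exactly 1, since it bounds the number of concurrent connections without funnelling every row through one task.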

A slightly over-kill solution may be Spark to Kafka to ElasticSearch?

More thoughts welcome please.

Thanks,
Muthu

On Wed, Mar 15, 2017 at 4:53 AM, Richard Siebeling 
wrote:

> maybe Apache Ignite does fit your requirements
>
> On 15 March 2017 at 08:44, vincent gromakowski <
> vincent.gromakow...@gmail.com> wrote:
>
>> Hi
>> If queries are static and filters are on the same columns, Cassandra is
>> a good option.
>>
>> On 15 March 2017 at 7:04 AM, "muthu" wrote:
>>
>> Hello there,
>>
>> I have one or more parquet files to read and perform some aggregate
>> queries using Spark Dataframe. I would like to find a reasonably fast
>> datastore that allows me to write the results for subsequent (simpler)
>> queries.
>> I did attempt to use ElasticSearch to write the query results using the
>> ElasticSearch Hadoop connector. But I am running into connector write
>> issues if there are too many Spark executors for ElasticSearch to handle.
>> But in the schema sense, this seems a great fit as ElasticSearch has
>> smarts in place to discover the schema. Also in the query sense, I can
>> perform simple filters and sort using ElasticSearch, and for more complex
>> aggregates, Spark Dataframe can come back to the rescue :).
>> Please advise on other possible data-stores I could use?
>>
>> Thanks,
>> Muthu
>>
>>
>>
>


Re: Pretty print a dataframe...

2017-02-16 Thread Muthu Jayakumar
This worked. Thanks for the tip Michael.
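
For anyone finding this thread later, the tip quoted below boils down to something like the following sketch. The object name is made up, and the logging function can be any String => Unit, for example an Akka LoggingAdapter's info method.

```scala
import org.apache.spark.sql.Dataset

// Hypothetical helper for logging a Dataset's plans.
object PlanLogging {
  /** The parsed, analyzed, optimized and physical plans of `ds` as one multi-line string. */
  def planText(ds: Dataset[_]): String = ds.queryExecution.toString

  /** Sends the plan text to any logging function. */
  def logPlan(ds: Dataset[_])(log: String => Unit): Unit = log(planText(ds))
}
```

Usage would be along the lines of PlanLogging.logPlan(df)(msg => log.info(msg)), which avoids touching sessionState at all.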

Thanks,
Muthu

On Thu, Feb 16, 2017 at 12:41 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> The toString method of Dataset.queryExecution includes the various plans.
> I usually just log that directly.
>
> On Thu, Feb 16, 2017 at 8:26 AM, Muthu Jayakumar <bablo...@gmail.com>
> wrote:
>
>> Hello there,
>>
>> I am trying to write a dataframe/dataset queryExecution and/or its
>> logical plan to a log line. The current code...
>>
>> def explain(extended: Boolean): Unit = {
>>   val explain = ExplainCommand(queryExecution.logical, extended = extended)
>>   sparkSession.sessionState.executePlan(explain).executedPlan.executeCollect().foreach {
>>     // scalastyle:off println
>>     r => println(r.getString(0))
>>     // scalastyle:on println
>>   }
>> }
>>
>> sessionState is not accessible if I were to write my own explain(log:
>> LoggingAdapter).
>>
>> Please advise,
>> Muthu
>>
>
>


Pretty print a dataframe...

2017-02-16 Thread Muthu Jayakumar
Hello there,

I am trying to write a dataframe/dataset queryExecution and/or its logical
plan to a log line. The current code...

def explain(extended: Boolean): Unit = {
  val explain = ExplainCommand(queryExecution.logical, extended = extended)
  sparkSession.sessionState.executePlan(explain).executedPlan.executeCollect().foreach {
    // scalastyle:off println
    r => println(r.getString(0))
    // scalastyle:on println
  }
}

sessionState is not accessible if I were to write my own explain(log:
LoggingAdapter).

Please advise,
Muthu


Re: Dataframe caching

2017-01-20 Thread Muthu Jayakumar
I guess this may help in your case?

https://spark.apache.org/docs/latest/sql-programming-guide.html#global-temporary-view

Thanks,
Muthu
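
A minimal sketch of what the linked section describes, assuming Spark 2.1 or later. The view name "shared_records" and the generated sample data are made up for illustration. Note that a global temporary view is shared between sessions of the same Spark application only, so this covers the "another session" case but not two separate applications.

```scala
import org.apache.spark.sql.SparkSession

object SharedViewSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SharedViewSketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Stand-in for the expensive query result fetched by the first user.
    val records = (1 to 1000).toDF("id").cache()

    // Register it once; global temp views live in the global_temp database.
    records.createGlobalTempView("shared_records")

    // A second session of the same application can reuse the view.
    val otherSession = spark.newSession()
    val reused = otherSession.sql(
      "SELECT count(*) AS n FROM global_temp.shared_records")
    reused.show()

    spark.stop()
  }
}
```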

On Fri, Jan 20, 2017 at 6:27 AM, ☼ R Nair (रविशंकर नायर) <
ravishankar.n...@gmail.com> wrote:

> Dear all,
>
> Here is a requirement I am thinking of implementing in Spark core. Please
> let me know if this is possible, and kindly provide your thoughts.
>
> A user executes a query to fetch 1 million records from, let's say, a
> database. We let the user store this as a dataframe, partitioned across
> the cluster.
>
> Another user executes the same query from another session. Is there any
> way that we can let the second user reuse the dataframe created by the
> first user?
>
> Can we have a master dataframe (or RDD) which stores the information about
> the current dataframes loaded and matches against any queries that are
> coming from other users?
>
> In this way, we will have a wonderful system which never allows the same
> query to be executed and loaded again into the cluster memory.
>
> Best, Ravion
>


Re: Dependency Injection and Microservice development with Spark

2016-12-30 Thread Muthu Jayakumar
Adding to what Lars Albertsson and Miguel Morales said: I am hoping to see
how well scalameta develops its support for macros, which could take away a
sizable share of the DI problems. For the remainder, passing a class type as
an argument, as Miguel Morales mentioned, should work.

Thanks,
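
A rough sketch of the "class type as an argument with a default value" idea Miguel describes, in plain Scala without Spark so it stays self-contained. The Greeter trait, DefaultGreeter and the process function are hypothetical names, and the reflection call assumes a public no-argument constructor.

```scala
// Hypothetical dependency; in a Spark job it would be created on the executor.
trait Greeter { def greet(name: String): String }

class DefaultGreeter extends Greeter {
  def greet(name: String): String = s"Hello, $name"
}

object ClassArgExample {
  // Only the Class object (in effect the class name) travels with the
  // closure; the instance is created where the code runs.
  def process(
      names: Seq[String],
      greeterClass: Class[_ <: Greeter] = classOf[DefaultGreeter]): Seq[String] = {
    val greeter = greeterClass.getDeclaredConstructor().newInstance()
    names.map(greeter.greet)
  }
}
```

A test can pass a different class, for example ClassArgExample.process(names, classOf[MyFakeGreeter]), without pulling in a DI container.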


On Wed, Dec 28, 2016 at 6:41 PM, Miguel Morales 
wrote:

> Hi
>
> Not sure about Spring Boot, but when trying to use DI libraries you'll run
> into serialization issues. I've had luck using an old version of Scaldi.
> Recently, though, I've been passing the class types as arguments with
> default values. Then in the Spark code it gets instantiated. So you're
> basically passing and serializing a class name.
>
> > On Dec 28, 2016, at 1:55 PM, Lars Albertsson  wrote:
> >
> > Do you really need dependency injection?
> >
> > DI is often used for testing purposes. Data processing jobs are easy
> > to test without DI, however, due to their functional and synchronous
> > nature. Hence, DI is often unnecessary for testing data processing
> > jobs, whether they are batch or streaming jobs.
> >
> > Or do you want to use DI for other reasons?
> >
> >
> > Lars Albertsson
> > Data engineering consultant
> > www.mapflat.com
> > https://twitter.com/lalleal
> > +46 70 7687109
> > Calendar: https://goo.gl/6FBtlS, https://freebusy.io/la...@mapflat.com
> >
> >
> > On Fri, Dec 23, 2016 at 11:56 AM, Chetan Khatri
> >  wrote:
> >> Hello Community,
> >>
> >> The current approach I am using for Spark job development is Scala + SBT
> >> and an uber JAR, with a yml properties file to pass configuration
> >> parameters. But if I would like to use dependency injection and
> >> microservice development, like the Spring Boot features, in Scala, then
> >> what would be the standard approach?
> >>
> >> Thanks
> >>
> >> Chetan
> >
>


Re: DataFrame select non-existing column

2016-11-18 Thread Muthu Jayakumar
Depending on your use case, 'df.withColumn("my_existing_or_new_col",
lit(0L))' could work?
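
Since a code snippet was asked for, here is a small sketch of that suggestion, assuming Spark 2.x. The helper name withDefault and the sample column names are invented for illustration. It only handles top-level columns; the nested struct fields mentioned in the original question would need a different approach.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.LongType

object AddMissingColumn {
  // Adds the column as a typed null only when the dataframe lacks it.
  def withDefault(df: DataFrame, name: String): DataFrame =
    if (df.columns.contains(name)) df
    else df.withColumn(name, lit(null).cast(LongType))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("AddMissingColumn")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val oldData = Seq(("a", 1L)).toDF("key", "v1")          // older layout
    val newData = Seq(("b", 2L, 3L)).toDF("key", "v1", "v2") // evolved layout

    // After preprocessing, both can be selected from in the same way.
    val unified = withDefault(oldData, "v2").union(withDefault(newData, "v2"))
    unified.select("key", "v2").show()

    spark.stop()
  }
}
```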

On Fri, Nov 18, 2016 at 11:18 AM, Kristoffer Sjögren 
wrote:

> Thanks for your answer. I have been searching the API for doing that
> but I could not find how to do it?
>
> Could you give me a code snippet?
>
> On Fri, Nov 18, 2016 at 8:03 PM, Mendelson, Assaf
>  wrote:
> > You can always add the columns to old dataframes, giving them null (or
> > some literal), as a preprocessing step.
> >
> > -Original Message-
> > From: Kristoffer Sjögren [mailto:sto...@gmail.com]
> > Sent: Friday, November 18, 2016 4:32 PM
> > To: user
> > Subject: DataFrame select non-existing column
> >
> > Hi
> >
> > We have evolved a DataFrame by adding a few columns but cannot write
> > select statements on these columns for older data that doesn't have them
> > since they fail with an AnalysisException with message "No such struct
> > field".
> >
> > We also tried dropping columns but this doesn't work for nested columns.
> >
> > Any non-hacky ways to get around this?
> >
> > Cheers,
> > -Kristoffer
> >
>


Re: Dataframe schema...

2016-10-21 Thread Muthu Jayakumar
Thanks Cheng Lian for opening the JIRA. I found this with Spark 2.0.0.

Thanks,
Muthu

On Fri, Oct 21, 2016 at 3:30 PM, Cheng Lian <l...@databricks.com> wrote:

> Yea, confirmed. While analyzing unions, we treat StructTypes with
> different field nullabilities as incompatible types and throw this error.
>
> Opened https://issues.apache.org/jira/browse/SPARK-18058 to track this
> issue. Thanks for reporting!
>
> Cheng
>
> On 10/21/16 3:15 PM, Cheng Lian wrote:
>
> Hi Muthu,
>
> Which version of Spark are you using? This seems to be a bug in the
> analysis phase.
>
> Cheng
>
> On 10/21/16 12:50 PM, Muthu Jayakumar wrote:
>
> Sorry for the late response. Here is what I am seeing...
>
>
> Schema from parquet file.
>
> d1.printSchema()
>
> root
>  |-- task_id: string (nullable = true)
>  |-- task_name: string (nullable = true)
>  |-- some_histogram: struct (nullable = true)
>  |    |-- values: array (nullable = true)
>  |    |    |-- element: double (containsNull = true)
>  |    |-- freq: array (nullable = true)
>  |    |    |-- element: long (containsNull = true)
>
> d2.printSchema() // Data created using dataframe and/or processed before
> // writing to parquet file.
>
> root
>  |-- task_id: string (nullable = true)
>  |-- task_name: string (nullable = true)
>  |-- some_histogram: struct (nullable = true)
>  |    |-- values: array (nullable = true)
>  |    |    |-- element: double (containsNull = false)
>  |    |-- freq: array (nullable = true)
>  |    |    |-- element: long (containsNull = false)
>
> d1.union(d2).printSchema()
>
> Exception in thread "main" org.apache.spark.sql.AnalysisException:
> unresolved operator 'Union;
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
> at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:58)
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:361)
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
> at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
> at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:58)
> at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
> at org.apache.spark.sql.Dataset.<init>(Dataset.scala:161)
> at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
> at org.apache.spark.sql.Dataset$.apply(Dataset.scala:59)
> at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2594)
> at org.apache.spark.sql.Dataset.union(Dataset.scala:1459)
>
> Please advise,
> Muthu
>
> On Thu, Oct 20, 2016 at 1:46 AM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> What is the issue you see when unioning?
>>
>> On Wed, Oct 19, 2016 at 6:39 PM, Muthu Jayakumar <bablo...@gmail.com>
>> wrote:
>>
>>> Hello Michael,
>>>
>>> Thank you for looking into this query. In my case there seems to be an
>>> issue when I union a parquet file read from disk with another dataframe
>>> that I construct in-memory. The only difference I see is in containsNull
>>> (true for the file, false for the in-memory dataframe). In fact, I do not
>>> see any errors with union on the simple schema of "col1 thru col4" above.
>>> The problem seems to exist only on the "some_histogram" column, which
>>> mixes containsNull = true/false.
>>> Let me know if this helps.
>>>
>>> Thanks,
>>> Muthu
>>>
>>>
>>>
>>> On Wed, Oct 19, 2016 at 6:21 PM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
>>>> Nullable is just a hint to the optimizer that it's impossible for there
>>>> to be a null value in this column, so that it can avoid generating code for
>>>> null-checks.  When in doubt, we set nullable=true since it is always safer
>>>> to check.
>>>>
>>>> Why in particular are you trying to change the nullability of the
>>>> column?
>>>>
>>>> On Wed, Oct 19, 2016 at 6:07 PM, Muthu Jayakumar <bablo...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello there,
>>>>>
>>>>> I am trying to understand how and when a DataFrame (or Dataset)
>>>>> sets nullable = true vs. false in a schema.
>>>>>
>>>>> Here is my observation from a sample code I

Re: Dataframe schema...

2016-10-21 Thread Muthu Jayakumar
Sorry for the late response. Here is what I am seeing...


Schema from parquet file.

d1.printSchema()

root
 |-- task_id: string (nullable = true)
 |-- task_name: string (nullable = true)
 |-- some_histogram: struct (nullable = true)
 |    |-- values: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- freq: array (nullable = true)
 |    |    |-- element: long (containsNull = true)

d2.printSchema() // Data created using dataframe and/or processed
// before writing to parquet file.

root
 |-- task_id: string (nullable = true)
 |-- task_name: string (nullable = true)
 |-- some_histogram: struct (nullable = true)
 |    |-- values: array (nullable = true)
 |    |    |-- element: double (containsNull = false)
 |    |-- freq: array (nullable = true)
 |    |    |-- element: long (containsNull = false)

d1.union(d2).printSchema()

Exception in thread "main" org.apache.spark.sql.AnalysisException:
unresolved operator 'Union;
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:58)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:361)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:58)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:161)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
at org.apache.spark.sql.Dataset$.apply(Dataset.scala:59)
at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2594)
at org.apache.spark.sql.Dataset.union(Dataset.scala:1459)

Please advise,
Muthu
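
One possible workaround until the bug is fixed, sketched under the assumption that relaxing every field to nullable is acceptable: rebuild both dataframes with a schema in which all nullable / containsNull flags are true, so the two sides of the union carry identical types. The object and method names below are made up, and the round trip through df.rdd has a cost, so treat this as a sketch rather than a recommended fix.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types._

object RelaxNullability {
  // Recursively marks every struct field, array element and map value
  // as nullable. Going from "not null" to "nullable" is always safe.
  def relax(dt: DataType): DataType = dt match {
    case s: StructType =>
      StructType(s.fields.map(f =>
        f.copy(dataType = relax(f.dataType), nullable = true)))
    case ArrayType(elementType, _) =>
      ArrayType(relax(elementType), containsNull = true)
    case MapType(keyType, valueType, _) =>
      MapType(relax(keyType), relax(valueType), valueContainsNull = true)
    case other => other
  }

  // Re-creates the dataframe over the same rows with the relaxed schema.
  def relaxed(spark: SparkSession, df: DataFrame): DataFrame =
    spark.createDataFrame(df.rdd, relax(df.schema).asInstanceOf[StructType])
}
```

With this in place, RelaxNullability.relaxed(spark, d1).union(RelaxNullability.relaxed(spark, d2)) should present matching schemas to the analyzer. The opposite direction (forcing containsNull to false) is only safe when the data really contains no nulls.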

On Thu, Oct 20, 2016 at 1:46 AM, Michael Armbrust <mich...@databricks.com>
wrote:

> What is the issue you see when unioning?
>
> On Wed, Oct 19, 2016 at 6:39 PM, Muthu Jayakumar <bablo...@gmail.com>
> wrote:
>
>> Hello Michael,
>>
>> Thank you for looking into this query. In my case there seems to be an
>> issue when I union a parquet file read from disk with another dataframe
>> that I construct in-memory. The only difference I see is in containsNull
>> (true for the file, false for the in-memory dataframe). In fact, I do not
>> see any errors with union on the simple schema of "col1 thru col4" above.
>> The problem seems to exist only on the "some_histogram" column, which
>> mixes containsNull = true/false.
>> Let me know if this helps.
>>
>> Thanks,
>> Muthu
>>
>>
>>
>> On Wed, Oct 19, 2016 at 6:21 PM, Michael Armbrust <mich...@databricks.com
>> > wrote:
>>
>>> Nullable is just a hint to the optimizer that it's impossible for there
>>> to be a null value in this column, so that it can avoid generating code for
>>> null-checks.  When in doubt, we set nullable=true since it is always safer
>>> to check.
>>>
>>> Why in particular are you trying to change the nullability of the column?
>>>
>>> On Wed, Oct 19, 2016 at 6:07 PM, Muthu Jayakumar <bablo...@gmail.com>
>>> wrote:
>>>
>>>> Hello there,
>>>>
>>>> I am trying to understand how and when a DataFrame (or Dataset) sets
>>>> nullable = true vs. false in a schema.
>>>>
>>>> Here is my observation from some sample code I tried...
>>>>
>>>>
>>>> scala> spark.createDataset(Seq((1, "a", 2.0d), (2, "b", 2.0d), (3,
>>>> "c", 2.0d))).toDF("col1", "col2", "col3").withColumn("col4",
>>>> lit("bla")).printSchema()
>>>> root
>>>>  |-- col1: integer (nullable = false)
>>>>  |-- col2: string (nullable = true)
>>>>  |-- col3: double (nullable = false)
>>>>  |-- col4: string (nullable = false)
>>>>
>>>>
>>>> scala> spark.createDataset(Seq((1, "a", 2.0d), (2, "b", 2.0d), (3,
>>>> "c", 2.0d))).toDF("col1", "col2", "col3").withColumn("col4",
>>>> lit("bla")).write.parquet("/tmp/sample.parquet")
>>>>
>>>> scala> spark.read.parquet("/tmp/sample.parquet").printSchema()
>>>> root
>>>>  |-- col1: integer (nullable = true)
>>>>  |-- col2: string (nullable = t

Re: Dataframe schema...

2016-10-19 Thread Muthu Jayakumar
Hello Michael,

Thank you for looking into this query. In my case there seems to be an issue
when I union a parquet file read from disk with another dataframe that I
construct in-memory. The only difference I see is in containsNull (true for
the file, false for the in-memory dataframe). In fact, I do not see any
errors with union on the simple schema of "col1 thru col4" above. The
problem seems to exist only on the "some_histogram" column, which mixes
containsNull = true/false.
Let me know if this helps.

Thanks,
Muthu



On Wed, Oct 19, 2016 at 6:21 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> Nullable is just a hint to the optimizer that it's impossible for there to
> be a null value in this column, so that it can avoid generating code for
> null-checks.  When in doubt, we set nullable=true since it is always safer
> to check.
>
> Why in particular are you trying to change the nullability of the column?
>
> On Wed, Oct 19, 2016 at 6:07 PM, Muthu Jayakumar <bablo...@gmail.com>
> wrote:
>
>> Hello there,
>>
>> I am trying to understand how and when a DataFrame (or Dataset) sets
>> nullable = true vs. false in a schema.
>>
>> Here is my observation from some sample code I tried...
>>
>>
>> scala> spark.createDataset(Seq((1, "a", 2.0d), (2, "b", 2.0d), (3, "c",
>> 2.0d))).toDF("col1", "col2", "col3").withColumn("col4",
>> lit("bla")).printSchema()
>> root
>>  |-- col1: integer (nullable = false)
>>  |-- col2: string (nullable = true)
>>  |-- col3: double (nullable = false)
>>  |-- col4: string (nullable = false)
>>
>>
>> scala> spark.createDataset(Seq((1, "a", 2.0d), (2, "b", 2.0d), (3, "c",
>> 2.0d))).toDF("col1", "col2", "col3").withColumn("col4",
>> lit("bla")).write.parquet("/tmp/sample.parquet")
>>
>> scala> spark.read.parquet("/tmp/sample.parquet").printSchema()
>> root
>>  |-- col1: integer (nullable = true)
>>  |-- col2: string (nullable = true)
>>  |-- col3: double (nullable = true)
>>  |-- col4: string (nullable = true)
>>
>>
>> Where this seems to get me into trouble is when I try to union one data
>> structure created in-memory (notice that the containsNull elements in the
>> schema below are 'false' in the schema created in-memory) with one read
>> from a file, which starts out with a schema like the one below...
>>
>>  |-- some_histogram: struct (nullable = true)
>>  |    |-- values: array (nullable = true)
>>  |    |    |-- element: double (containsNull = true)
>>  |    |-- freq: array (nullable = true)
>>  |    |    |-- element: long (containsNull = true)
>>
>> Is there a way to convert this attribute from true to false without
>> running any mapping / udf on that column?
>>
>> Please advise,
>> Muthu
>>
>
>


Dataframe schema...

2016-10-19 Thread Muthu Jayakumar
Hello there,

I am trying to understand how and when a DataFrame (or Dataset) sets
nullable = true vs. false in a schema.

Here is my observation from some sample code I tried...


scala> spark.createDataset(Seq((1, "a", 2.0d), (2, "b", 2.0d), (3, "c",
2.0d))).toDF("col1", "col2", "col3").withColumn("col4",
lit("bla")).printSchema()
root
 |-- col1: integer (nullable = false)
 |-- col2: string (nullable = true)
 |-- col3: double (nullable = false)
 |-- col4: string (nullable = false)


scala> spark.createDataset(Seq((1, "a", 2.0d), (2, "b", 2.0d), (3, "c",
2.0d))).toDF("col1", "col2", "col3").withColumn("col4",
lit("bla")).write.parquet("/tmp/sample.parquet")

scala> spark.read.parquet("/tmp/sample.parquet").printSchema()
root
 |-- col1: integer (nullable = true)
 |-- col2: string (nullable = true)
 |-- col3: double (nullable = true)
 |-- col4: string (nullable = true)


Where this seems to get me into trouble is when I try to union one data
structure created in-memory (notice that the containsNull elements in the
schema below are 'false' in the schema created in-memory) with one read
from a file, which starts out with a schema like the one below...

 |-- some_histogram: struct (nullable = true)
 |    |-- values: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- freq: array (nullable = true)
 |    |    |-- element: long (containsNull = true)

Is there a way to convert this attribute from true to false without running
any mapping / udf on that column?

Please advise,
Muthu


Re: [SPARK-2.0][SQL] UDF containing non-serializable object does not work as expected

2016-08-07 Thread Muthu Jayakumar
Hello Hao Ren,

Doesn't the code...

val add = udf {
  (a: Int) => a + notSer.value
}

define a UDF of type Int => Int?

Thanks,
Muthu
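
Independent of why only filter() triggers the exception, one way to keep the non-serializable object out of the UDF closure is to copy the needed value into a local val first. This is a sketch assuming Spark 2.x in local mode; the object name and the variable "increment" are introduced here for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

object UdfCaptureSketch {
  class A(val value: Int) // not serializable, as in the thread

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("UdfCaptureSketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val notSer = new A(2)

    // Copy the primitive out of the object. The closure below then
    // captures only this Int and never references notSer itself.
    val increment: Int = notSer.value
    val add = udf { (a: Int) => a + increment }

    val df = Seq((1, 2), (2, 2), (3, 2), (4, 2)).toDF("key", "value")
      .select($"key", add($"value").as("added"))

    // With only an Int captured, the filter should no longer hit
    // "Task not serializable".
    df.filter($"key" === 2).show()

    spark.stop()
  }
}
```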

On Sun, Aug 7, 2016 at 2:31 PM, Hao Ren  wrote:

> I am playing with spark 2.0
> What I tried to test is:
>
> Create a UDF in which there is a non-serializable object.
> What I expected is that when this UDF is called while materializing the
> dataFrame where the UDF is used in "select", a "Task not serializable"
> exception should be thrown.
> It also depends on which "action" is called on that dataframe.
>
> Here is the code for reproducing the problem:
>
> 
> import org.apache.spark.sql.SparkSession
>
> object DataFrameSerDeTest extends App {
>
>   class A(val value: Int) // It is not serializable
>
>   def run() = {
>     val spark = SparkSession
>       .builder()
>       .appName("DataFrameSerDeTest")
>       .master("local[*]")
>       .getOrCreate()
>
>     import org.apache.spark.sql.functions.udf
>     import spark.sqlContext.implicits._
>
>     val notSer = new A(2)
>     val add = udf {
>       (a: Int) => a + notSer.value
>     }
>     val df = spark.createDataFrame(Seq(
>       (1, 2),
>       (2, 2),
>       (3, 2),
>       (4, 2)
>     )).toDF("key", "value")
>       .select($"key", add($"value").as("added"))
>
>     // It should not work because the udf contains a
>     // non-serializable object, but it works
>     df.show()
>
>     // It does not work as expected
>     // (org.apache.spark.SparkException: Task not serializable)
>     df.filter($"key" === 2).show()
>   }
>
>   run()
> }
> 
>
> Also, I tried collect(), count(), first(), limit(). All of them worked
> without non-serializable exceptions.
> It seems only filter() throws the exception. (feature or bug ?)
>
> Any ideas ? Or I just messed things up ?
> Any help is highly appreciated.
>
> --
> Hao Ren
>
> Data Engineer @ leboncoin
>
> Paris, France
>


Re: Dataframe / Dataset partition size...

2016-08-06 Thread Muthu Jayakumar
Hello Dr Mich Talebzadeh,

>Can you kindly advise on your number of nodes, the cores for each node and
the RAM for each node.
I have a 32-node cluster (currently 1 executor per node). All nodes have
512 GB of memory. Most of them have either 16 or 20 physical cores (without
HT enabled). HDFS is configured to run on another set of nodes (but they
are all part of the same rack / subnet).

>Is this a parquet file?
Yes, it is a parquet directory.

>What I don't understand is why you end up with 220 files whereas your
partition count says 25
I do have some kind of hack ;) in place that roughly sizes each file to the
block size of my HDFS, so that the number of parts created is optimized for
HDFS storage. But I wanted to understand why a smaller number of cores is
allocated during a read cycle.

My current workaround for this problem is to run multiple parallel queries
of this kind :( (basically Scala Future - fork-join magic). But this seems
incorrect.

I do have some parquet files that use like 9 partitions (though there are
200 files).

Here is some sample code from the Spark 2.0.0 shell that I tried...

case class Customer(number: Int)
import org.apache.spark.sql._
import spark.implicits._
val parquetFile = "hdfs://myip:port/tmp/dummy.parquet"
spark.createDataset(1 to 1).map(Customer).repartition(200).write.mode(SaveMode.Overwrite).parquet(parquetFile)

scala> spark.read.parquet(parquetFile).toJavaRDD.partitions.size()
res1: Int = 23

scala> spark.read.parquet(parquetFile).toJavaRDD.partitions.size()
res2: Int = 20

Could dynamic allocation perhaps be the cause?

Please advise,
Muthu
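
For later readers, a sketch of one knob that affects the read-side partition count in Spark 2.x: spark.sql.files.maxPartitionBytes (default 128 MB), the target size of a read partition. Lowering it generally produces more partitions. The 16 MB value and the /tmp path are arbitrary choices for illustration, and for very small files spark.sql.files.openCostInBytes also plays a part, so the printed number will vary with your data and cluster.

```scala
import org.apache.spark.sql.SparkSession

object ReadParallelismSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ReadParallelismSketch")
      .master("local[*]")
      // Smaller target split size, so files are packed into more partitions.
      .config("spark.sql.files.maxPartitionBytes", (16L * 1024 * 1024).toString)
      .getOrCreate()

    val path = "/tmp/dummy.parquet" // hypothetical location

    // Write 200 part files, as in the experiment above.
    spark.range(0, 1000000).repartition(200)
      .write.mode("overwrite").parquet(path)

    // Inspect how many partitions the scan is planned with.
    println(spark.read.parquet(path).rdd.getNumPartitions)

    spark.stop()
  }
}
```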


On Sat, Aug 6, 2016 at 3:23 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> 720 cores. Wow, that is a hell of a lot of cores, Muthu :)
>
> OK, let us take a step back.
>
> Can you kindly advise on your number of nodes, the cores for each node and
> the RAM for each node?
>
> What I don't understand is why you end up with 220 files whereas your
> partition count says 25. Now you have 2.2GB of size, so each file only has
> 2.2GB/220 = 10MB. That is a lot of files for nothing. The app has to load
> each file.
> Is this a parquet file?
>
> HTH
>
> Dr Mich Talebzadeh
>
> On 6 August 2016 at 23:09, Muthu Jayakumar <bablo...@gmail.com> wrote:
>
>> Hello Dr Mich Talebzadeh,
>>
>> Thank you for looking into my question. W.r.t
>> >However, in reality the number of partitions should not exceed the
>> total number of cores in your cluster?
>> I do have 720 cores available in a cluster for this to run. It does run
>> in dynamic provisioning.
>>
>> On a side note, I was expecting my partition count to match up the way
>> yours does. But :( my numbers above tell me that I need to understand the
>> APIs better :).
>>
>> Please advise,
>> Muthu
>>
>>
>>
>> On Sat, Aug 6, 2016 at 1:54 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi Muthu.
>>>
>>> Interesting question.
>>>
>>> I have the following:
>>>
>>> scala> val s = HiveContext.table("dummy_parquet").toJavaRDD.partitions.size()
>>> s: Int = 256
>>>
>>> and on HDFS it has
>>>
>>> hdfs dfs -ls  /user/hive/warehouse/oraclehadoop.db/dummy_parquet|wc -l
>>> 16/08/06 21:50:45 WARN util.NativeCodeLoader: Unable to load
>>> native-hadoop library for your platform... using builtin-java classes where
>>> applicable
>>>  257
>>>
>>> Which is somewhat consistent. Its size:
>>>
>>> hdfs dfs -du -h -s  /user/hive/warehouse/oraclehadoop.db/dummy_parquet
>>> 16/08/06 21:51:50 WARN util.NativeCodeLoader: Unable to load
>>> native-hadoop library for your platform... using builtin-java classes where
>>> applicable
>>> 5.9 G  /user/hive/warehouse/oraclehadoop.db/dummy_parquet
>>>
>>> nearly 6GB
>>>
>>> sc.defaultParallelism
>>> res6: Int = 1
>>>
>>>
>>> However, in reality the number of partitions should not exceed the total
>>>

Dataframe / Dataset partition size...

2016-08-06 Thread Muthu Jayakumar
Hello there,

I am trying to understand how I could improve (or increase) the parallelism
of tasks that run for a particular spark job.
Here is my observation...

scala> spark.read.parquet("hdfs://somefile").toJavaRDD.partitions.size()
25

> hadoop fs -ls hdfs://somefile | grep 'part-r' | wc -l
200

> hadoop fs -du -h -s hdfs://somefile
2.2 G

I notice that the number of part files written to HDFS during the save
operation follows the repartition / coalesce value, meaning the number of
part files can be tuned through this parameter.

But how do I control 'partitions.size()'? I want this to be 200 (without
having to repartition during the read operation), so that more tasks can
run for this job.
This has a major impact on the time it takes to perform query operations
in this job.

On a side note, I do understand that 200 parquet part files for the above
2.2 G seems overkill for a 128 MB block size. Ideally it should be 18
parts or so.

Please advise,
Muthu
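
A possible explanation for the 25, offered as a simplified model rather than a statement of what Spark does in every version: as far as I understand the Spark 2.x file scan, files are cut into chunks of at most maxSplitBytes and then packed into partitions, with each file also charged an "open cost". The sketch below is plain Scala with no Spark dependency. The defaults of 128 MB (spark.sql.files.maxPartitionBytes) and 4 MB (spark.sql.files.openCostInBytes) are the documented ones; the function name, the parallelism of 16 and the round figure of 11 MB per file are assumptions.

```scala
object ScanPartitionEstimate {
  val MB: Long = 1024L * 1024L

  // Simplified model of how a file scan groups files into read partitions.
  def estimate(
      fileSizes: Seq[Long],
      parallelism: Int,
      maxPartitionBytes: Long = 128 * MB,
      openCostInBytes: Long = 4 * MB): Int = {
    val totalBytes = fileSizes.map(_ + openCostInBytes).sum
    val bytesPerCore = totalBytes / parallelism
    val maxSplitBytes =
      math.min(maxPartitionBytes, math.max(openCostInBytes, bytesPerCore))

    // Cut every file into chunks of at most maxSplitBytes, largest first.
    val chunks = fileSizes.flatMap { len =>
      (0L until len by maxSplitBytes)
        .map(offset => math.min(maxSplitBytes, len - offset))
    }.sortBy(-_)

    // Greedily pack chunks; start a new partition when the next chunk
    // would push the current one past maxSplitBytes.
    var partitions = 0
    var currentSize = 0L
    var chunksInCurrent = 0
    chunks.foreach { chunk =>
      if (currentSize + chunk > maxSplitBytes && chunksInCurrent > 0) {
        partitions += 1
        currentSize = 0L
        chunksInCurrent = 0
      }
      currentSize += chunk + openCostInBytes
      chunksInCurrent += 1
    }
    if (chunksInCurrent > 0) partitions += 1
    partitions
  }
}
```

With 200 files of roughly 11 MB each and a small parallelism, eight files fit into each 128 MB partition (8 x 15 MB = 120 MB), so ScanPartitionEstimate.estimate(Seq.fill(200)(11 * ScanPartitionEstimate.MB), parallelism = 16) gives 25. Because the model depends on the parallelism seen at planning time, the count can shift when the number of executors changes.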


Re: Question / issue while creating a parquet file using a text file with spark 2.0...

2016-07-28 Thread Muthu Jayakumar
Hello Dong Meng,

Thanks for the tip. But, I do have code in place that looks like this...

StructField(columnName, getSparkDataType(dataType), nullable = true)

Maybe I am missing something else. The same code works fine with Spark
1.6.2 though. On a side note, I could be using SparkSession, but I don't
know how to split and map the row elegantly. Hence I am using it as an RDD.

Thanks,
Muthu


On Thu, Jul 28, 2016 at 10:47 PM, Dong Meng <mengdong0...@gmail.com> wrote:

> you can specify nullable in StructField
>
> On Thu, Jul 28, 2016 at 9:14 PM, Muthu Jayakumar <bablo...@gmail.com>
> wrote:
>
>> Hello there,
>>
>> I am using Spark 2.0.0 to create a parquet file from a text file with
>> Scala. I am trying to read a text file with a bunch of values of type
>> string and long (mostly), and any of them can be null. In order to support
>> nulls, all the values are boxed with Option (e.g. Option[String],
>> Option[Long]).
>> The schema for the parquet file is based on some external metadata file,
>> so I use 'StructField' to create a schema programmatically and run a code
>> snippet like the one below...
>>
>> sc.textFile(flatFile).map(_.split(fileSeperator)).map { line =>
>>   convertToRawColumns(line, schemaSeq)
>> }
>>
>> ...
>>
>> val df = sqlContext.createDataFrame(rawFileRDD, schema) //Failed line.
>>
>> On a side note, the same code used to work fine with Spark 1.6.2.
>>
>> Here is the error from Spark 2.0.0.
>>
>> Jul 28, 2016 8:27:10 PM INFO:
>> org.apache.parquet.hadoop.codec.CodecConfig: Compression: SNAPPY
>> Jul 28, 2016 8:27:10 PM INFO:
>> org.apache.parquet.hadoop.ParquetOutputFormat: Parquet block size Jul-28
>> 20:27:10 [Executor task launch worker-483] ERROR   Utils Aborting task
>> java.lang.RuntimeException: Error while encoding:
>> java.lang.RuntimeException: scala.Some is not a valid external type for
>> schema of string
>> if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row
>> object).isNullAt) null else staticinvoke(class
>> org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
>> validateexternaltype(getexternalrowfield(assertnotnull(input[0,
>> org.apache.spark.sql.Row, true], top level row object), 0, host),
>> StringType), true) AS host#37315
>> +- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level
>> row object).isNullAt) null else staticinvoke(class
>> org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
>> validateexternaltype(getexternalrowfield(assertnotnull(input[0,
>> org.apache.spark.sql.Row, true], top level row object), 0, host),
>> StringType), true)
>>:- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level
>> row object).isNullAt
>>:  :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top
>> level row object)
>>:  :  +- input[0, org.apache.spark.sql.Row, true]
>>:  +- 0
>>:- null
>>+- staticinvoke(class org.apache.spark.unsafe.types.UTF8String,
>> StringType, fromString,
>> validateexternaltype(getexternalrowfield(assertnotnull(input[0,
>> org.apache.spark.sql.Row, true], top level row object), 0, host),
>> StringType), true)
>>   +- validateexternaltype(getexternalrowfield(assertnotnull(input[0,
>> org.apache.spark.sql.Row, true], top level row object), 0, host),
>> StringType)
>>  +- getexternalrowfield(assertnotnull(input[0,
>> org.apache.spark.sql.Row, true], top level row object), 0, host)
>> +- assertnotnull(input[0, org.apache.spark.sql.Row, true],
>> top level row object)
>>+- input[0, org.apache.spark.sql.Row, true]
>>
>>
>> Let me know if you would like me to try to create a more simplified
>> reproducer for this problem. Perhaps I should not be using Option[T] for
>> nullable schema values?
>>
>> Please advise,
>> Muthu
>>
>
>


Question / issue while creating a parquet file using a text file with spark 2.0...

2016-07-28 Thread Muthu Jayakumar
Hello there,

I am using Spark 2.0.0 to create a parquet file from a text file with
Scala. I am trying to read a text file with a bunch of values of type string
and long (mostly), and any of them can be null. In order to support
nulls, all the values are boxed with Option (e.g. Option[String],
Option[Long]).
The schema for the parquet file is based on some external metadata file, so
I use 'StructField' to create a schema programmatically and run a code
snippet like the one below...

sc.textFile(flatFile).map(_.split(fileSeperator)).map { line =>
  convertToRawColumns(line, schemaSeq)
}

...

val df = sqlContext.createDataFrame(rawFileRDD, schema) //Failed line.

On a side note, the same code used to work fine with Spark 1.6.2.

Here is the error from Spark 2.0.0.

Jul 28, 2016 8:27:10 PM INFO: org.apache.parquet.hadoop.codec.CodecConfig:
Compression: SNAPPY
Jul 28, 2016 8:27:10 PM INFO:
org.apache.parquet.hadoop.ParquetOutputFormat: Parquet block size Jul-28
20:27:10 [Executor task launch worker-483] ERROR   Utils Aborting task
java.lang.RuntimeException: Error while encoding:
java.lang.RuntimeException: scala.Some is not a valid external type for
schema of string
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row
object).isNullAt) null else staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
validateexternaltype(getexternalrowfield(assertnotnull(input[0,
org.apache.spark.sql.Row, true], top level row object), 0, host),
StringType), true) AS host#37315
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level
row object).isNullAt) null else staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
validateexternaltype(getexternalrowfield(assertnotnull(input[0,
org.apache.spark.sql.Row, true], top level row object), 0, host),
StringType), true)
   :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row
object).isNullAt
   :  :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level
row object)
   :  :  +- input[0, org.apache.spark.sql.Row, true]
   :  +- 0
   :- null
   +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String,
StringType, fromString,
validateexternaltype(getexternalrowfield(assertnotnull(input[0,
org.apache.spark.sql.Row, true], top level row object), 0, host),
StringType), true)
  +- validateexternaltype(getexternalrowfield(assertnotnull(input[0,
org.apache.spark.sql.Row, true], top level row object), 0, host),
StringType)
 +- getexternalrowfield(assertnotnull(input[0,
org.apache.spark.sql.Row, true], top level row object), 0, host)
+- assertnotnull(input[0, org.apache.spark.sql.Row, true], top
level row object)
   +- input[0, org.apache.spark.sql.Row, true]


Let me know if you would like me to try to create a simpler reproducer
for this problem. Perhaps I should not be using Option[T] for nullable
schema values?

Please advise,
Muthu
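
For anyone hitting the same "scala.Some is not a valid external type"
message: a minimal sketch of one possible workaround, assuming the raw
columns reach the Row as Option values. The helper names unwrapOption and
unwrapAll are hypothetical and not part of the original code. The idea is
to hand the Row either the plain value or null, never Some/None.

```scala
// Hypothetical helper: turn an Option-wrapped cell into the plain value or null,
// leaving cells that are not Options untouched.
def unwrapOption(cell: Any): Any = cell match {
  case Some(value) => value
  case None        => null
  case other       => other
}

// Apply the unwrapping to every cell of one raw record.
def unwrapAll(cells: Seq[Any]): Seq[Any] = cells.map(unwrapOption)
```

The result could then be passed to something like Row.fromSeq(unwrapAll(cells))
before calling createDataFrame, with the corresponding StructField marked
nullable.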


Re: 10hrs of Scheduler Delay

2016-01-22 Thread Muthu Jayakumar
Does increasing the number of partitions help? You could try something like
3 times what you currently have.
Another trick I used was to split the problem into multiple dataframes,
run them sequentially, persist each result, and then run a union on
the results.

Hope this helps.

On Fri, Jan 22, 2016, 3:48 AM Darren Govoni  wrote:

> Me too. I had to shrink my dataset to get it to work. For us at least
> Spark seems to have scaling issues.
>
>
>
> Sent from my Verizon Wireless 4G LTE smartphone
>
>
>  Original message 
> From: "Sanders, Isaac B" 
> Date: 01/21/2016 11:18 PM (GMT-05:00)
> To: Ted Yu 
> Cc: user@spark.apache.org
> Subject: Re: 10hrs of Scheduler Delay
>
> I have run the driver on a smaller dataset (k=2, n=5000) and it worked
> quickly and didn’t hang like this. This dataset is closer to k=10, n=4.4m,
> but I am using more resources on this one.
>
> - Isaac
>
> On Jan 21, 2016, at 11:06 PM, Ted Yu  wrote:
>
> You may have seen the following on github page:
>
> Latest commit 50fdf0e  on Feb 22, 2015
>
> That was 11 months ago.
>
> Can you search for similar algorithm which runs on Spark and is newer ?
>
> If nothing found, consider running the tests coming from the project to
> determine whether the delay is intrinsic.
>
> Cheers
>
> On Thu, Jan 21, 2016 at 7:46 PM, Sanders, Isaac B <
> sande...@rose-hulman.edu> wrote:
>
>> That thread seems to be moving, it oscillates between a few different
>> traces… Maybe it is working. It seems odd that it would take that long.
>>
>> This is 3rd party code, and after looking at some of it, I think it might
>> not be as Spark-y as it could be.
>>
>> I linked it below. I don’t know a lot about spark, so it might be fine,
>> but I have my suspicions.
>>
>>
>> https://github.com/alitouka/spark_dbscan/blob/master/src/src/main/scala/org/alitouka/spark/dbscan/exploratoryAnalysis/DistanceToNearestNeighborDriver.scala
>>
>> - Isaac
>>
>> On Jan 21, 2016, at 10:08 PM, Ted Yu  wrote:
>>
>> You may have noticed the following - did this indicate prolonged
>> computation in your code ?
>>
>> org.apache.commons.math3.util.MathArrays.distance(MathArrays.java:205)
>> org.apache.commons.math3.ml.distance.EuclideanDistance.compute(EuclideanDistance.java:34)
>> org.alitouka.spark.dbscan.spatial.DistanceCalculation$class.calculateDistance(DistanceCalculation.scala:15)
>> org.alitouka.spark.dbscan.exploratoryAnalysis.DistanceToNearestNeighborDriver$.calculateDistance(DistanceToNearestNeighborDriver.scala:16)
>>
>>
>> On Thu, Jan 21, 2016 at 5:13 PM, Sanders, Isaac B <
>> sande...@rose-hulman.edu> wrote:
>>
>>> Hadoop is: HDP 2.3.2.0-2950
>>>
>>> Here is a gist (pastebin) of my versions en masse and a stacktrace:
>>> https://gist.github.com/isaacsanders/2e59131758469097651b
>>>
>>> Thanks
>>>
>>> On Jan 21, 2016, at 7:44 PM, Ted Yu  wrote:
>>>
>>> Looks like you were running on YARN.
>>>
>>> What hadoop version are you using ?
>>>
>>> Can you capture a few stack traces of the AppMaster during the delay and
>>> pastebin them ?
>>>
>>> Thanks
>>>
>>> On Thu, Jan 21, 2016 at 8:08 AM, Sanders, Isaac B <
>>> sande...@rose-hulman.edu> wrote:
>>>
>>>> The Spark Version is 1.4.1
>>>>
>>>> The logs are full of standard fare, nothing like an exception or even
>>>> interesting [INFO] lines.
>>>>
>>>> Here is the script I am using:
>>>> https://gist.github.com/isaacsanders/660f480810fbc07d4df2
>>>>
>>>> Thanks
>>>> Isaac
>>>>
>>>> On Jan 21, 2016, at 11:03 AM, Ted Yu  wrote:
>>>>
>>>> Can you provide a bit more information ?
>>>>
>>>> command line for submitting Spark job
>>>> version of Spark
>>>> anything interesting from driver / executor logs ?
>>>>
>>>> Thanks
>>>>
>>>> On Thu, Jan 21, 2016 at 7:35 AM, Sanders, Isaac B <
>>>> sande...@rose-hulman.edu> wrote:
>>>>
>>>>> Hey all,
>>>>>
>>>>> I am a CS student in the United States working on my senior thesis.
>>>>>
>>>>> My thesis uses Spark, and I am encountering some trouble.
>>>>>
>>>>> I am using https://github.com/alitouka/spark_dbscan, and to determine
>>>>> parameters, I am using the utility class they supply,
>>>>> org.alitouka.spark.dbscan.exploratoryAnalysis.DistanceToNearestNeighborDriver.
>>>>>
>>>>> I am on a 10 node cluster with one machine with 8 cores and 32G of
>>>>> memory and nine machines with 6 cores and 16G of memory.
>>>>>
>>>>> I have 442M of data, which seems like it would be a joke, but the job
>>>>> stalls at the last stage.
>>>>>
>>>>> It was stuck in Scheduler Delay for 10 hours overnight, and I have
>>>>> tried a number of things for the last couple days, but nothing seems to be
>>>>> helping.
>>>>>
>>>>> I have tried:
>>>>> - Increasing heap sizes and numbers of cores
>>>>> - More/less executors with different amounts of resources.
>>>>> - Kryo Serialization
>>>>> - FAIR Scheduling
>>>>>
>>>>> It doesn’t seem like it should require 

Re: 10hrs of Scheduler Delay

2016-01-22 Thread Muthu Jayakumar
If you turn on GC logging (with options like "-XX:+PrintGCDetails
-XX:+PrintGCTimeStamps") you should be able to see why some jobs run for a
long time.
The tuning guide (http://spark.apache.org/docs/latest/tuning.html) provides
some insight on this. Setting up explicit partitioning helped in my case when
I was using RDDs.

Hope this helps.
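
In case it is useful, a minimal configuration sketch of how those flags
could be handed to the executors from application code. It assumes Spark
is on the classpath and a JVM that still accepts these flags (they are
pre-Java 9 options); the application name is a placeholder.
spark.executor.extraJavaOptions is the standard Spark setting for
executor JVM options.

```scala
import org.apache.spark.SparkConf

// Ask every executor JVM to log GC details to its stdout/stderr log.
val conf = new SparkConf()
  .setAppName("gc-logging-example") // placeholder name
  .set("spark.executor.extraJavaOptions",
       "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps")
```

The GC output should then show up in each executor's log, next to the
task output.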

On Fri, Jan 22, 2016 at 1:51 PM, Darren Govoni <dar...@ontrenet.com> wrote:

> Thanks for the tip. I will try it. But this is the kind of thing spark is
> supposed to figure out and handle. Or at least not get stuck forever.
>
> ---- Original message 
> From: Muthu Jayakumar <bablo...@gmail.com>
> Date: 01/22/2016 3:50 PM (GMT-05:00)
> To: Darren Govoni <dar...@ontrenet.com>, "Sanders, Isaac B" <
> sande...@rose-hulman.edu>, Ted Yu <yuzhih...@gmail.com>
> Cc: user@spark.apache.org
> Subject: Re: 10hrs of Scheduler Delay
>
> Does increasing the number of partition helps? You could try out something
> 3 times what you currently have.
> Another trick i used was to partition the problem into multiple dataframes
> and run them sequentially and persistent the result and then run a union on
> the results.
>
> Hope this helps.
>
> On Fri, Jan 22, 2016, 3:48 AM Darren Govoni <dar...@ontrenet.com> wrote:
>
>> Me too. I had to shrink my dataset to get it to work. For us at least
>> Spark seems to have scaling issues.
>>
>>  Original message 
>> From: "Sanders, Isaac B" <sande...@rose-hulman.edu>
>> Date: 01/21/2016 11:18 PM (GMT-05:00)
>> To: Ted Yu <yuzhih...@gmail.com>
>> Cc: user@spark.apache.org
>> Subject: Re: 10hrs of Scheduler Delay
>>
>> I have run the driver on a smaller dataset (k=2, n=5000) and it worked
>> quickly and didn’t hang like this. This dataset is closer to k=10, n=4.4m,
>> but I am using more resources on this one.
>>
>> - Isaac
>>
>> On Jan 21, 2016, at 11:06 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>> You may have seen the following on github page:
>>
>> Latest commit 50fdf0e  on Feb 22, 2015
>>
>> That was 11 months ago.
>>
>> Can you search for similar algorithm which runs on Spark and is newer ?
>>
>> If nothing found, consider running the tests coming from the project to
>> determine whether the delay is intrinsic.
>>
>> Cheers
>>
>> On Thu, Jan 21, 2016 at 7:46 PM, Sanders, Isaac B <
>> sande...@rose-hulman.edu> wrote:
>>
>>> That thread seems to be moving, it oscillates between a few different
>>> traces… Maybe it is working. It seems odd that it would take that long.
>>>
>>> This is 3rd party code, and after looking at some of it, I think it
>>> might not be as Spark-y as it could be.
>>>
>>> I linked it below. I don’t know a lot about spark, so it might be fine,
>>> but I have my suspicions.
>>>
>>>
>>> https://github.com/alitouka/spark_dbscan/blob/master/src/src/main/scala/org/alitouka/spark/dbscan/exploratoryAnalysis/DistanceToNearestNeighborDriver.scala
>>>
>>> - Isaac
>>>
>>> On Jan 21, 2016, at 10:08 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>> You may have noticed the following - did this indicate prolonged
>>> computation in your code ?
>>>
>>> org.apache.commons.math3.util.MathArrays.distance(MathArrays.java:205)
>>> org.apache.commons.math3.ml.distance.EuclideanDistance.compute(EuclideanDistance.java:34)
>>> org.alitouka.spark.dbscan.spatial.DistanceCalculation$class.calculateDistance(DistanceCalculation.scala:15)
>>> org.alitouka.spark.dbscan.exploratoryAnalysis.DistanceToNearestNeighborDriver$.calculateDistance(DistanceToNearestNeighborDriver.scala:16)
>>>
>>>
>>> On Thu, Jan 21, 2016 at 5:13 PM, Sanders, Isaac B <
>>> sande...@rose-hulman.edu> wrote:
>>>
>>>> Hadoop is: HDP 2.3.2.0-2950
>>>>
>>>> Here is a gist (pastebin) of my versions en masse and a stacktrace:
>>>> https://gist.github.com/isaacsanders/2e59131758469097651b
>>>>
>>>> Thanks
>>>>
>>>> On Jan 21, 2016, at 7:44 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>> Looks like you were running on YARN.
>>>>
>>>> What hadoop version are you using ?
>>>>
>>>> Can you capture a few stack traces of the AppMaster during the d

Re: cast column string -> timestamp in Parquet file

2016-01-21 Thread Muthu Jayakumar
DataFrame and udf. This may be more performant than doing an RDD
transformation, as you'll transform only the column that needs to be
changed.

Hope this helps.
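
A minimal sketch of the conversion function such a udf could wrap,
assuming the column holds strings in the "yyyy-MM-dd HH:mm:ss" form that
java.sql.Timestamp.valueOf accepts. The name toTimestamp is made up for
illustration. Values that do not parse (including null) become None
rather than failing the job.

```scala
import java.sql.Timestamp
import scala.util.Try

// Parse "yyyy-MM-dd HH:mm:ss[.fffffffff]" strings into a Timestamp.
// Any input that cannot be parsed, including null, yields None.
def toTimestamp(raw: String): Option[Timestamp] =
  Try(Timestamp.valueOf(raw.trim)).toOption
```

With Spark on the classpath this could be wrapped with
org.apache.spark.sql.functions.udf and applied through withColumn. If the
strings are already in a format Spark understands, a plain
cast("timestamp") on the column may be enough and avoids the udf
altogether.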


On Thu, Jan 21, 2016 at 6:17 AM, Eli Super  wrote:

> Hi
>
> I have a large size parquet file .
>
> I need to cast the whole column to timestamp format , then save
>
> What the right way to do it ?
>
> Thanks a lot
>
>


Re: Spark 1.6 udf/udaf alternatives in dataset?

2016-01-12 Thread Muthu Jayakumar
Thanks Michael. Let me test it with a recent master code branch.

Also, do I have to create a new case class for every mapping step? I
cannot use Tuple as I have ~130 columns to process. Earlier I had used a
Seq[Any] (actually Array[Any] to optimize on serialization) but processed
it using RDD (by building the Schema at runtime). Now I am attempting to
replace this using Dataset.

>the problem is that at compile time we don't know if its an inner or outer
join.
May I suggest having different methods for different kinds of joins
(similar to the RDD api)? This way type safety is enforced.

Here is the error message.

Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task not serializable: java.io.NotSerializableException:
scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1
Serialization stack: - object not serializable (class:
scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1,
value: package lang) - field (class: scala.reflect.internal.Types$ThisType,
name: sym, type: class scala.reflect.internal.Symbols$Symbol) - object
(class scala.reflect.internal.Types$UniqueThisType, java.lang.type) - field
(class: scala.reflect.internal.Types$TypeRef, name: pre, type: class
scala.reflect.internal.Types$Type) - object (class
scala.reflect.internal.Types$ClassNoArgsTypeRef, String) - field (class:
scala.reflect.internal.Types$TypeRef, name: normalized, type: class
scala.reflect.internal.Types$Type) - object (class
scala.reflect.internal.Types$AliasNoArgsTypeRef, String) - field (class:
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, name: keyType$1,
type: class scala.reflect.api.Types$TypeApi) - object (class
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, ) - field (class:
org.apache.spark.sql.catalyst.expressions.MapObjects, name: function, type:
interface scala.Function1) - object (class
org.apache.spark.sql.catalyst.expressions.MapObjects,
mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field
(class: "scala.collection.immutable.Map", name: "map"),- root class:
"collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType)) -
field (class: org.apache.spark.sql.catalyst.expressions.Invoke, name:
targetObject, type: class
org.apache.spark.sql.catalyst.expressions.Expression) - object (class
org.apache.spark.sql.catalyst.expressions.Invoke,
invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
field (class: "scala.collection.immutable.Map", name: "map"),- root class:
"collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
[Ljava.lang.Object;))) - writeObject data (class:
scala.collection.immutable.List$SerializationProxy) - object (class
scala.collection.immutable.List$SerializationProxy,
scala.collection.immutable.List$SerializationProxy@7e78c3cf) - writeReplace
data (class: scala.collection.immutable.List$SerializationProxy) - object
(class scala.collection.immutable.$colon$colon,
List(invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
field (class: "scala.collection.immutable.Map", name: "map"),- root class:
"collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
[Ljava.lang.Object;)),
invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
field (class: "scala.collection.immutable.Map", name: "map"),- root class:
"collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
[Ljava.lang.Object; - field (class:
org.apache.spark.sql.catalyst.expressions.StaticInvoke, name: arguments,
type: interface scala.collection.Seq) - object (class
org.apache.spark.sql.catalyst.expressions.StaticInvoke, staticinvoke(class
org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface
scala.collection.Map),toScalaMap,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
field (class: "scala.collection.immutable.Map", name: "map"),- root class:
"collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
[Ljava.lang.Object;)),invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
field (class: "scala.collection.immutable.Map", name: "map"),- root class:
"collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
[Ljava.lang.Object;)),true)) - writeObject data (class:
scala.collection.immutable.List$SerializationProxy) - object (class
scala.collection.immutable.List$SerializationProxy,
scala.collection.immutable.List$SerializationProxy@377795c5) - writeReplace
data (class: scala.collection.immutable.List$SerializationProxy) - object
(class scala.collection.immutable.$colon$colon, List(staticinvoke(class
org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface
scala.collection.Map),toScalaMap,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
field (class: 

Re: Lost tasks due to OutOfMemoryError (GC overhead limit exceeded)

2016-01-12 Thread Muthu Jayakumar
>export SPARK_WORKER_MEMORY=4g
Maybe you could increase the max heap size on the worker? If the
OutOfMemoryError is in the driver, then you may want to set the memory
explicitly for the driver.

Thanks,



On Tue, Jan 12, 2016 at 2:04 AM, Barak Yaish  wrote:

> Hello,
>
> I've a 5 nodes cluster which hosts both hdfs datanodes and spark workers.
> Each node has 8 cpu and 16G memory. Spark version is 1.5.2, spark-env.sh is
> as follow:
>
> export SPARK_MASTER_IP=10.52.39.92
>
> export SPARK_WORKER_INSTANCES=4
>
> export SPARK_WORKER_CORES=8
> export SPARK_WORKER_MEMORY=4g
>
> And more settings done in the application code:
>
>
> sparkConf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer");
>
> sparkConf.set("spark.kryo.registrator",InternalKryoRegistrator.class.getName());
> sparkConf.set("spark.kryo.registrationRequired","true");
> sparkConf.set("spark.kryoserializer.buffer.max.mb","512");
> sparkConf.set("spark.default.parallelism","300");
> sparkConf.set("spark.rpc.askTimeout","500");
>
> I'm trying to load data from hdfs and running some sqls on it (mostly
> groupby) using DataFrames. The logs keep saying that tasks are lost due to
> OutOfMemoryError (GC overhead limit exceeded).
>
> Can you advice what is the recommended settings (memory, cores,
> partitions, etc.) for the given hardware?
>
> Thanks!
>


Re: Spark 1.6 udf/udaf alternatives in dataset?

2016-01-12 Thread Muthu Jayakumar
/* 147 */ }
/* 148 */
/* 149 */ return mutableRow;
/* 150 */   }
/* 151 */ }
/* 152 */

Thanks.



On Tue, Jan 12, 2016 at 11:35 AM, Muthu Jayakumar <bablo...@gmail.com>
wrote:

> Thanks Micheal. Let me test it with a recent master code branch.
>
> Also for every mapping step should I have to create a new case class? I
> cannot use Tuple as I have ~130 columns to process. Earlier I had used a
> Seq[Any] (actually Array[Any] to optimize on serialization) but processed
> it using RDD (by building the Schema at runtime). Now I am attempting to
> replace this using Dataset.
>
> >the problem is that at compile time we don't know if its an inner or
> outer join.
> May I suggest to have different methods for different kind of joins
> (similar to RDD api)? This way the typesafety is enforced.
>
> Here is the error message.
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Task not serializable:
> java.io.NotSerializableException:
> scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1
> Serialization stack: - object not serializable (class:
> scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1,
> value: package lang) - field (class: scala.reflect.internal.Types$ThisType,
> name: sym, type: class scala.reflect.internal.Symbols$Symbol) - object
> (class scala.reflect.internal.Types$UniqueThisType, java.lang.type) - field
> (class: scala.reflect.internal.Types$TypeRef, name: pre, type: class
> scala.reflect.internal.Types$Type) - object (class
> scala.reflect.internal.Types$ClassNoArgsTypeRef, String) - field (class:
> scala.reflect.internal.Types$TypeRef, name: normalized, type: class
> scala.reflect.internal.Types$Type) - object (class
> scala.reflect.internal.Types$AliasNoArgsTypeRef, String) - field (class:
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, name: keyType$1,
> type: class scala.reflect.api.Types$TypeApi) - object (class
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, ) - field (class:
> org.apache.spark.sql.catalyst.expressions.MapObjects, name: function, type:
> interface scala.Function1) - object (class
> org.apache.spark.sql.catalyst.expressions.MapObjects,
> mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field
> (class: "scala.collection.immutable.Map", name: "map"),- root class:
> "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType)) -
> field (class: org.apache.spark.sql.catalyst.expressions.Invoke, name:
> targetObject, type: class
> org.apache.spark.sql.catalyst.expressions.Expression) - object (class
> org.apache.spark.sql.catalyst.expressions.Invoke,
> invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
> field (class: "scala.collection.immutable.Map", name: "map"),- root class:
> "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
> [Ljava.lang.Object;))) - writeObject data (class:
> scala.collection.immutable.List$SerializationProxy) - object (class
> scala.collection.immutable.List$SerializationProxy,
> scala.collection.immutable.List$SerializationProxy@7e78c3cf) -
> writeReplace data (class:
> scala.collection.immutable.List$SerializationProxy) - object (class
> scala.collection.immutable.$colon$colon,
> List(invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
> field (class: "scala.collection.immutable.Map", name: "map"),- root class:
> "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
> [Ljava.lang.Object;)),
> invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
> field (class: "scala.collection.immutable.Map", name: "map"),- root class:
> "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
> [Ljava.lang.Object; - field (class:
> org.apache.spark.sql.catalyst.expressions.StaticInvoke, name: arguments,
> type: interface scala.collection.Seq) - object (class
> org.apache.spark.sql.catalyst.expressions.StaticInvoke, staticinvoke(class
> org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface
> scala.collection.Map),toScalaMap,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
> field (class: "scala.collection.immutable.Map", name: "map"),- root class:
> "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
> [Ljava.lang.Object;)),invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
> field (class: "scala.collection.immutable.Map", name: "map"),- roo

Re: Spark 1.6 udf/udaf alternatives in dataset?

2016-01-11 Thread Muthu Jayakumar
Hello Michael,

Thank you for the suggestion. This should do the trick for column names.
But how could I transform a column's value type? Do I have to use a UDF?
If I do use a UDF, then my other question pertains to the
map step in Dataset, where I run into an error when I try to
transform the object into another type.

For example:

case class MyMap(map: Map[String, String])

case class TestCaseClass(a: String, b: String) {
  def toMyMap: MyMap = {
    MyMap(Map(a -> b))
  }

  def toStr: String = {
    a
  }
}

// Main method section below

import sqlContext.implicits._

val df1 = sqlContext.createDataset(Seq(
  TestCaseClass("2015-05-01", "data1"),
  TestCaseClass("2015-05-01", "data2")
)).toDF()

df1.as[TestCaseClass].map(_.toStr).show()   // works fine
df1.as[TestCaseClass].map(_.toMyMap).show() // fails

Please advise on what I may be missing here.


Also, for joins, may I suggest having a custom encoder / transformation to
say how 2 datasets can merge?
Also, when a join is made using something like 'left outer join', the right
side object should ideally be an Option (similar to what's seen in RDD).
I think this would make it strongly typed.

Thank you for looking into my email.

Thanks,
Muthu
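
To illustrate the join suggestion, here is a small sketch on plain Scala
collections (not the Dataset API) of the shape I have in mind. It mirrors
the result type of leftOuterJoin on pair RDDs, where the right side is
wrapped in an Option. The function below is an illustration only and not
an existing Spark method.

```scala
// Left outer join on in-memory pairs: every left row is kept, and the right
// side is Some(value) when the key matches and None when it does not.
def leftOuterJoin[K, A, B](left: Seq[(K, A)],
                           right: Seq[(K, B)]): Seq[(K, (A, Option[B]))] = {
  // Index the right side by key so each left row is matched with a lookup.
  val rightByKey: Map[K, Seq[B]] =
    right.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }

  left.flatMap { case (k, a) =>
    rightByKey.get(k) match {
      case Some(bs) => bs.map(b => (k, (a, Option(b))))
      case None     => Seq((k, (a, Option.empty[B])))
    }
  }
}
```

With a separate method per join kind, the compiler knows up front that an
unmatched right side is possible and forces the caller to handle it.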


On Mon, Jan 11, 2016 at 3:08 PM, Michael Armbrust 
wrote:

> Also, while extracting a value into Dataset using as[U] method, how could
>> I specify a custom encoder/translation to case class (where I don't have
>> the same column-name mapping or same data-type mapping)?
>>
>
> There is no public API yet for defining your own encoders.  You change the
> column names using select and as to make sure they line up correctly.
>
> df.select($"oldName".as("newName"))
>


Spark 1.6 udf/udaf alternatives in dataset?

2016-01-10 Thread Muthu Jayakumar
Hello there,

While looking at the features of Dataset, it seems to provide an alternative
to udf and udaf. Any documentation or sample code snippet showing how to
write this would be helpful in rewriting existing UDFs as Dataset mapping
steps.
Also, while extracting a value into a Dataset using the as[U] method, how could I
specify a custom encoder/translation to a case class (where I don't have the
same column-name mapping or the same data-type mapping)?

Please advise,
Muthu


Re: Out of memory issue

2016-01-06 Thread Muthu Jayakumar
Thanks Ewan Leith. This seems like a good start, as it seems to match up with
the symptoms I am seeing :).

But how do I specify "parquet.memory.pool.ratio"?
Parquet code seems to take this parameter in
ParquetOutputFormat.getRecordWriter()
(ref code: float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO,
MemoryManager.DEFAULT_MEMORY_POOL_RATIO);).
I wonder how this is provided through Apache Spark. I see that
'TaskAttemptContext' seems to be the way to provide this, but I am not
able to find a way to supply this configuration.

Please advise,
Muthu
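
One possible route, as an untested configuration sketch: Spark copies any
property prefixed with "spark.hadoop." into the Hadoop Configuration it
builds, so the Parquet key could ride along on the SparkConf. Whether the
Parquet writer in 1.6.0 picks the value up this way is an assumption that
still needs verifying. The values are the examples from the release
notes, and the application name is a placeholder.

```scala
import org.apache.spark.SparkConf

// Lower both Spark's and Parquet's memory share, per the SPARK-12546 workaround.
val conf = new SparkConf()
  .setAppName("parquet-memory-example") // placeholder name
  .set("spark.memory.fraction", "0.4")
  // "spark.hadoop." prefix: forwarded into the Hadoop Configuration as
  // parquet.memory.pool.ratio
  .set("spark.hadoop.parquet.memory.pool.ratio", "0.3")
```

Setting parquet.memory.pool.ratio in core-site.xml, as the release notes
suggest, remains the documented option.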

On Wed, Jan 6, 2016 at 1:57 AM, Ewan Leith 
wrote:

> Hi Muthu, this could be related to a known issue in the release notes
>
> http://spark.apache.org/releases/spark-release-1-6-0.html
>
> Known issues
>
> SPARK-12546 -  Save DataFrame/table as Parquet with dynamic partitions
> may cause OOM; this can be worked around by decreasing the memory used by
> both Spark and Parquet using spark.memory.fraction (for example, 0.4) and
> parquet.memory.pool.ratio (for example, 0.3, in Hadoop configuration, e.g.
> setting it in core-site.xml).
>
> It's definitely worth setting spark.memory.fraction and
> parquet.memory.pool.ratio and trying again.
>
> Ewan
>
> -Original Message-
> From: babloo80 [mailto:bablo...@gmail.com]
> Sent: 06 January 2016 03:44
> To: user@spark.apache.org
> Subject: Out of memory issue
>
> Hello there,
>
> I have a spark job reads 7 parquet files (8 GB, 3 x 16 GB, 3 x 14 GB) in
> different stages of execution and creates a result parquet of 9 GB (about
> 27 million rows containing 165 columns. some columns are map based
> containing utmost 200 value histograms). The stages involve,
> Step 1: Reading the data using dataframe api
> Step 2: Transform dataframe to RDD (as the some of the columns are
> transformed into histograms (using empirical distribution to cap the number
> of keys) and some of them run like UDAF during reduce-by-key step) to
> perform and perform some transformations
> Step 3: Reduce the result by key so that the resultant can be used in the
> next stage for join
> Step 4: Perform left outer join of this result which runs similar Steps 1
> thru 3.
> Step 5: The results are further reduced to be written to parquet
> With Apache Spark 1.5.2, I am able to run the job with no issues.
> Current env uses 8 nodes running a total of  320 cores, 100 GB executor
> memory per node with driver program using 32 GB. The approximate execution
> time is about 1.2 hrs. The parquet files are stored in another HDFS cluster
> for read and eventual write of the result.
>
> When the same job is executed using Apache 1.6.0, some of the executor
> node's JVM gets restarted (with a new executor id). On further turning-on
> GC stats on the executor, the perm-gen seem to get maxed out and ends up
> showing the symptom of out-of-memory.
>
> Please advice on where to start investigating this issue.
>
> Thanks,
> Muthu
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Out-of-memory-issue-tp25888.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
> commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark and Spring Integrations

2015-11-15 Thread Muthu Jayakumar
I have written Akka code only in Scala. Here is the Akka documentation
that should help you get started...
http://doc.akka.io/docs/akka/2.4.0/intro/getting-started.html

>JavaSparkContext(conf)
The idea is to create a SparkContext and pass it as props (a constructor
argument, in Java terms) to an Akka actor so that you can send interactive
Spark jobs to the actor system. The only caveat is that you'll have to run
this Spark application in client mode.

>sc.parallelize(list).foreach
>// here we will have db transaction as well.
The way I did the DB transaction was to make a synchronous call (an
awaitable call, in Akka terms) that performs the DB operation atomically for
the data being processed, using slick
(http://slick.typesafe.com/doc/3.1.0/gettingstarted.html).
In your case the following two links could shed some light...
-
http://stackoverflow.com/questions/24896233/how-to-save-apache-spark-schema-output-in-mysql-database
-
https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter3/save_an_rdd_to_a_database.html

On a side note, I noticed that you provide a custom serializer. In my case,
I have used case classes (a construct from Scala) that can use the default
serializer provided by Spark.

Hope this helps.

Thanks,
Muthu


On Sat, Nov 14, 2015 at 10:18 PM, Netai Biswas <mail2efo...@gmail.com>
wrote:

> Hi,
>
> Thanks for your response. I will give a try with akka also, if you have
> any sample code or useful link please do share with me. Anyway I am sharing
> one sample code of mine.
>
> Sample Code:
>
> @Autowired
> private SpringBean springBean;
>
> public void test() throws Exception {
>     SparkConf conf = new SparkConf().setAppName("APP").setMaster(masterURL);
>     conf.set("spark.serializer",
>         "de.paraplu.springspark.serialization.SpringAwareSerializer");
>     sc = new JavaSparkContext(conf);
>
>     sc.parallelize(list).foreach(new VoidFunction<String>() {
>         private static final long serialVersionUID = 1L;
>
>         @Override
>         public void call(String t) throws Exception {
>             springBean.someAPI(t); // here we will have db transaction as well.
>         }
>     });
> }
>
> Thanks,
> Netai
>
> On Sat, Nov 14, 2015 at 10:40 PM, Muthu Jayakumar <bablo...@gmail.com>
> wrote:
>
>> You could try to use akka actor system with apache spark, if you are
>> intending to use it in online / interactive job execution scenario.
>>
>> On Sat, Nov 14, 2015, 08:19 Sabarish Sasidharan <
>> sabarish.sasidha...@manthan.com> wrote:
>>
>>> You are probably trying to access the spring context from the executors
>>> after initializing it at the driver. And running into serialization issues.
>>>
>>> You could instead use mapPartitions() and initialize the spring context
>>> from within that.
>>>
>>> That said I don't think that will solve all of your issues because you
>>> won't be able to use the other rich transformations in Spark.
>>>
>>> I am afraid these two don't gel that well, unless and otherwise all your
>>> context lookups for beans happen in the driver.
>>>
>>> Regards
>>> Sab
>>> On 13-Nov-2015 4:17 pm, "Netai Biswas" <mail2efo...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am facing issue while integrating spark with spring.
>>>>
>>>> I am getting "java.lang.IllegalStateException: Cannot deserialize
>>>> BeanFactory with id" errors for all beans. I have tried few solutions
>>>> available in web. Please help me out to solve this issue.
>>>>
>>>> Few details:
>>>>
>>>> Java : 8
>>>> Spark : 1.5.1
>>>> Spring : 3.2.9.RELEASE
>>>>
>>>> Please let me know if you need more information or any sample code.
>>>>
>>>> Thanks,
>>>> Netai
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Spring-Integrations-tp25375.html
>


Re: Spark and Spring Integrations

2015-11-14 Thread Muthu Jayakumar
You could try using an Akka actor system with Apache Spark, if you
intend to use it in an online / interactive job execution scenario.

On Sat, Nov 14, 2015, 08:19 Sabarish Sasidharan <
sabarish.sasidha...@manthan.com> wrote:

> You are probably trying to access the spring context from the executors
> after initializing it at the driver. And running into serialization issues.
>
> You could instead use mapPartitions() and initialize the spring context
> from within that.
>
> That said I don't think that will solve all of your issues because you
> won't be able to use the other rich transformations in Spark.
>
> I am afraid these two don't gel that well, unless and otherwise all your
> context lookups for beans happen in the driver.
>
> Regards
> Sab
> On 13-Nov-2015 4:17 pm, "Netai Biswas"  wrote:
>
>> Hi,
>>
>> I am facing issue while integrating spark with spring.
>>
>> I am getting "java.lang.IllegalStateException: Cannot deserialize
>> BeanFactory with id" errors for all beans. I have tried few solutions
>> available in web. Please help me out to solve this issue.
>>
>> Few details:
>>
>> Java : 8
>> Spark : 1.5.1
>> Spring : 3.2.9.RELEASE
>>
>> Please let me know if you need more information or any sample code.
>>
>> Thanks,
>> Netai
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Spring-Integrations-tp25375.html