Re: Spark DataSets and multiple write(.) calls

2018-11-20 Thread Michael Shtelma
You can also cache the data frame on disk if it does not fit into memory.
An alternative would be to write out the data frame as parquet and then read
it back; you can check whether the whole pipeline works faster this way than
with the standard cache.
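
A rough sketch of both options (spark, df and the path are placeholders for
your own session, data frame and storage location):

import org.apache.spark.storage.StorageLevel

// Option 1: cache on disk instead of in memory
val cached = df.persist(StorageLevel.DISK_ONLY)
cached.count()   // materialize once so the later writes reuse it

// Option 2: write the intermediate result as parquet and read it back
df.write.mode("overwrite").parquet("/tmp/intermediate.parquet")
val reread = spark.read.parquet("/tmp/intermediate.parquet")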

Best,
Michael


On Tue, Nov 20, 2018 at 9:14 AM Dipl.-Inf. Rico Bergmann <
i...@ricobergmann.de> wrote:

> Hi!
>
> Thanks Vadim for your answer. But this would be like caching the dataset,
> right? Or is checkpointing faster than persisting to memory or disk?
>
> I attach a PDF of my dataflow program. If I could compute outputs 1-5 in
> parallel, the output of flatmap1 and groupBy could be reused, avoiding
> writing to disk (at least until the grouping).
>
> Any other ideas or proposals?
>
> Best,
>
> Rico.
>
> On 19.11.2018, at 19:12, Vadim Semenov wrote:
>
> You can use checkpointing; in this case Spark will write out the RDD to
> whatever destination you specify, and the RDD can then be reused from the
> checkpointed state, avoiding recomputation.
>
> On Mon, Nov 19, 2018 at 7:51 AM Dipl.-Inf. Rico Bergmann <
> i...@ricobergmann.de> wrote:
>
>> Thanks for your advice. But I'm using batch processing. Does anyone have
>> a solution for the batch processing case?
>>
>> Best,
>>
>> Rico.
>>
>> On 19.11.2018, at 09:43, Magnus Nilsson wrote:
>>
>>
>> I had the same requirements. As far as I know, the only way is to extend
>> ForeachWriter, cache the micro-batch result and write to each output.
>>
>> https://docs.databricks.com/spark/latest/structured-streaming/foreach.html
>>
>> Unfortunately it seems as if you have to make a new connection per batch
>> instead of creating one long-lasting connection for the pipeline as such,
>> i.e. you might have to implement some sort of connection pooling yourself,
>> depending on the sink.
>>
>> Regards,
>>
>> Magnus
>>
>>
>> On Mon, Nov 19, 2018 at 9:13 AM Dipl.-Inf. Rico Bergmann <
>> i...@ricobergmann.de> wrote:
>>
>>> Hi!
>>>
>>> I have a Spark SQL program with one input and 6 outputs (writes). When
>>> executing this program, every call to write(...) executes the plan. My
>>> problem is that I want all these writes to happen in parallel (inside one
>>> execution plan), because all writes share a common, compute-intensive
>>> subpart that could be reused across all plans. Is there a possibility to
>>> do this? (Caching is not a solution because the input dataset is way too
>>> large...)
>>>
>>> Hoping for advice ...
>>>
>>> Best, Rico B.
>>>
>>>
>>>
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>>
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
>
> --
> Sent from my iPhone
>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: Read Avro Data using Spark Streaming

2018-11-14 Thread Michael Shtelma
Hi,

You can use this project to read Avro with Spark Structured
Streaming:
https://github.com/AbsaOSS/ABRiS

Spark 2.4 also has built-in support for Avro, so you can use the
from_avro function there.
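
A rough sketch of the Spark 2.4 built-in route (topic, bootstrap servers and
the Avro schema are placeholders, and the value bytes are assumed to be plain
Avro; the external spark-avro module must be on the classpath, e.g. via
--packages org.apache.spark:spark-avro_2.11:2.4.0):

import org.apache.spark.sql.avro.from_avro
import org.apache.spark.sql.functions.col

// Avro schema of the Kafka message value, as a JSON string (placeholder)
val avroSchema =
  """{"type":"record","name":"User","fields":[{"name":"name","type":"string"}]}"""

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "my-topic")
  .load()
  .select(from_avro(col("value"), avroSchema).as("data"))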

Best,
Michael


On Sat, Nov 3, 2018 at 4:34 AM Divya Narayan 
wrote:

> Hi,
>
> I produced Avro data to a Kafka topic using the schema registry, and now I
> want to use Spark Streaming to read that data and do some computation in
> real time. Can someone please give sample code for doing that? I couldn't
> find any working code online. I am using Spark version 2.2.0 and
> spark-streaming-kafka-0-10_2.11.
>
> Thanks
> Divya
>


Re: How to increase the parallelism of Spark Streaming application?

2018-11-07 Thread Michael Shtelma
If you configure too many Kafka partitions, you can run into memory issues;
this will increase the memory requirements of the Spark job a lot.
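
For reference, a sketch of the repartition approach from the question quoted
below (topic, servers, group id and batch interval are placeholders; sc is an
existing SparkContext):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val ssc = new StreamingContext(sc, Seconds(10))
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-group")

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("my-topic"), kafkaParams))

// With 8 topic partitions, each micro-batch starts with 8 tasks;
// repartition(100) spreads the work further but adds a shuffle per batch.
val values = stream.map(_.value).repartition(100)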

Best,
Michael


On Wed, Nov 7, 2018 at 8:28 AM JF Chen  wrote:

> I have a Spark Streaming application which reads data from Kafka and saves
> the transformation result to HDFS.
> The original partition number of my Kafka topic is 8, and I repartition the
> data to 100 to increase the parallelism of the Spark job.
> Now I am wondering: if I increase the Kafka partition number to 100 instead
> of repartitioning to 100, will the performance be enhanced? (I know the
> repartition action costs a lot of CPU resources.)
> If I set the Kafka partition number to 100, does it have any negative
> effect on efficiency?
> I just have one production environment, so it's not convenient for me to
> run the test.
>
> Thanks!
>
> Regard,
> Junfeng Chen
>


Re: [Arrow][Dremio]

2018-05-14 Thread Michael Shtelma
Hi Xavier,

Dremio looks really interesting and has a nice UI. I think the idea of
replacing SSIS or similar tools with Dremio is not bad, but what about
complex scenarios with a lot of code and transformations?
Is it possible to use Dremio via an API and define your own transformations
and transformation workflows in Java or Scala?
I am not sure if that is supported at all.
I think the Dremio folks plan to give users access to the Sabot API so that
Dremio can be used in the same way you can use Spark, but I am not sure
whether that is possible now.
Have you also tried comparing performance with Spark? Are there any
benchmarks?

Best,
Michael

On Mon, May 14, 2018 at 6:53 AM, xmehaut  wrote:

> Hello,
> I have some questions about Spark and Apache Arrow. Up to now, Arrow is only
> used for sharing data between Python and Spark executors instead of
> transmitting it through sockets. I am currently studying Dremio as an
> interesting way to access multiple sources of data, and as a potential
> replacement for ETL tools, including Spark SQL.
> It seems, if the promises hold, that Arrow and Dremio may be game-changing
> for these two purposes (data source abstraction, ETL tasks), leaving Spark
> with the two following goals, i.e. ML/DL and graph processing, which could
> be a danger for Spark in the middle term given the rise of multiple
> frameworks in these areas.
> My questions are then:
> - Is there a means to use Arrow more broadly in Spark itself and not only
> for sharing data?
> - What are the strengths and weaknesses of Spark w.r.t. Arrow and
> consequently Dremio?
> - What is the difference, finally, between Databricks DBIO and Dremio/Arrow?
> - How do you see the future of Spark regarding these assumptions?
> regards
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


INSERT INTO TABLE_PARAMS fails during ANALYZE TABLE

2018-04-19 Thread Michael Shtelma
Hi everybody,

I wanted to test CBO with histograms enabled.
In order to do this, I have enabled the property
spark.sql.statistics.histogram.enabled.
In this test, Derby was used as the database for the Hive metastore.

The problem is that, in some cases, the values inserted into the table
TABLE_PARAMS exceed the maximum length of 4000 characters:

org.apache.spark.sql.AnalysisException:
org.apache.hadoop.hive.ql.metadata.HiveException: Unable to alter table.
Put request failed : INSERT INTO TABLE_PARAMS
(PARAM_VALUE,TBL_ID,PARAM_KEY) VALUES (?,?,?)

org.datanucleus.exceptions.NucleusDataStoreException: Put request failed :
INSERT INTO TABLE_PARAMS (PARAM_VALUE,TBL_ID,PARAM_KEY) VALUES (?,?,?)

and then

Caused by: java.sql.SQLDataException: A truncation error was encountered
trying to shrink VARCHAR
'TFo0QmxvY2smMQwAAOAXAABMl6MI8TlBBw+MWLFixgAAAP7Bn9+7oD1wpMEv&' to length
4000.
The detailed stack trace can be seen here:

https://gist.github.com/mshtelma/c5ee8206200533fc1d606964dd5a30e2

Is this a known issue?
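
For reference, a minimal sketch of the configuration and the statement that
triggers the histogram computation (table and column names are made up):

// histograms are only computed when this flag is enabled
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.statistics.histogram.enabled", "true")

// column-level statistics (incl. histograms) end up serialized into the
// metastore table TABLE_PARAMS, which is where the 4000-character limit bites
spark.sql("ANALYZE TABLE my_table COMPUTE STATISTICS FOR COLUMNS col_a, col_b")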

Best,
Michael


Re: Running out of space on /tmp file system while running spark job on yarn because of size of blockmgr folder

2018-03-28 Thread Michael Shtelma
Hi,

in YARN mode, this property is used only by the driver.
Executors will use the directories provided by YARN for storing temporary
files.
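
A small sketch for the driver side (the path is a placeholder); executor
scratch directories on YARN come from yarn.nodemanager.local-dirs / LOCAL_DIRS,
as the documentation quoted below notes:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("scratch-dir-example")                   // hypothetical app name
  .config("spark.local.dir", "/data/spark-scratch") // used by the driver only
  .getOrCreate()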


Best,
Michael

On Wed, Mar 28, 2018 at 7:37 AM, Gourav Sengupta <gourav.sengu...@gmail.com>
wrote:

> Hi,
>
>
> As per the documentation at https://spark.apache.org/docs/latest/configuration.html:
>
>
> spark.local.dir (default: /tmp): Directory to use for "scratch" space in
> Spark, including map output files and RDDs that get stored on disk. This
> should be on a fast, local disk in your system. It can also be a
> comma-separated list of multiple directories on different disks. NOTE: In
> Spark 1.0 and later this will be overridden by SPARK_LOCAL_DIRS (Standalone,
> Mesos) or LOCAL_DIRS (YARN) environment variables set by the cluster manager.
>
> Regards,
> Gourav Sengupta
>
>
>
>
>
> On Mon, Mar 26, 2018 at 8:28 PM, Michael Shtelma <mshte...@gmail.com>
> wrote:
>
>> Hi Keith,
>>
>> Thanks for the suggestion!
>> I have solved this already.
>> The problem was that the YARN process was not responding to
>> start/stop commands and had not applied my configuration changes.
>> I killed it and restarted my cluster, and after that YARN started
>> using the yarn.nodemanager.local-dirs parameter defined in
>> yarn-site.xml.
>> After this change, -Djava.io.tmpdir for the Spark executor was set
>> correctly, according to the yarn.nodemanager.local-dirs parameter.
>>
>> Best,
>> Michael
>>
>>
>> On Mon, Mar 26, 2018 at 9:15 PM, Keith Chapman <keithgchap...@gmail.com>
>> wrote:
>> > Hi Michael,
>> >
>> > sorry for the late reply. I guess you may have to set it through the
>> hdfs
>> > core-site.xml file. The property you need to set is "hadoop.tmp.dir"
>> which
>> > defaults to "/tmp/hadoop-${user.name}"
>> >
>> > Regards,
>> > Keith.
>> >
>> > http://keith-chapman.com
>> >
>> > On Mon, Mar 19, 2018 at 1:05 PM, Michael Shtelma <mshte...@gmail.com>
>> wrote:
>> >>
>> >> Hi Keith,
>> >>
>> >> Thank you for the idea!
>> >> I have tried it, so now the executor command is looking in the
>> following
>> >> way :
>> >>
>> >> /bin/bash -c /usr/java/latest//bin/java -server -Xmx51200m
>> >> '-Djava.io.tmpdir=my_prefered_path'
>> >>
>> >> -Djava.io.tmpdir=/tmp/hadoop-msh/nm-local-dir/usercache/msh/
>> appcache/application_1521110306769_0041/container_1521110306
>> 769_0041_01_04/tmp
>> >>
>> >> JVM is using the second Djava.io.tmpdir parameter and writing
>> >> everything to the same directory as before.
>> >>
>> >> Best,
>> >> Michael
>> >> Sincerely,
>> >> Michael Shtelma
>> >>
>> >>
>> >> On Mon, Mar 19, 2018 at 6:38 PM, Keith Chapman <
>> keithgchap...@gmail.com>
>> >> wrote:
>> >> > Can you try setting spark.executor.extraJavaOptions to have
>> >> > -Djava.io.tmpdir=someValue
>> >> >
>> >> > Regards,
>> >> > Keith.
>> >> >
>> >> > http://keith-chapman.com
>> >> >
>> >> > On Mon, Mar 19, 2018 at 10:29 AM, Michael Shtelma <
>> mshte...@gmail.com>
>> >> > wrote:
>> >> >>
>> >> >> Hi Keith,
>> >> >>
>> >> >> Thank you for your answer!
>> >> >> I have done this, and it is working for spark driver.
>> >> >> I would like to make something like this for the executors as well,
>> so
>> >> >> that the setting will be used on all the nodes, where I have
>> executors
>> >> >> running.
>> >> >>
>> >> >> Best,
>> >> >> Michael
>> >> >>
>> >> >>
>> >> >> On Mon, Mar 19, 2018 at 6:07 PM, Keith Chapman
>> >> >> <keithgchap...@gmail.com>
>> >> >> wrote:
>> >> >> > Hi Michael,
>> >> >> >
>> >> >> > You could either set spark.local.dir through spark conf or
>> >> >> > java.io.tmpdir
>> >> >> > system property.
>> >> >> >
>> >> >> > Regards

Re: Using CBO on Spark 2.3 with analyzed hive tables

2018-03-27 Thread Michael Shtelma
Hi,

The Jira bug is here: https://issues.apache.org/jira/browse/SPARK-23799
I have also created a PR for the issue:
https://github.com/apache/spark/pull/20913
With this fix, it is working really well for me.

Best,
Michael


On Sat, Mar 24, 2018 at 12:39 AM, Takeshi Yamamuro
<linguin@gmail.com> wrote:
> Can you file a jira if this is a bug?
> Thanks!
>
> On Sat, Mar 24, 2018 at 1:23 AM, Michael Shtelma <mshte...@gmail.com> wrote:
>>
>> Hi Maropu,
>>
>> the problem seems to be in FilterEstimation.scala on lines 50 and 52:
>>
>> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala?utf8=✓#L50-L52
>>
>> val filterSelectivity =
>> calculateFilterSelectivity(plan.condition).getOrElse(1.0)
>> val filteredRowCount: BigInt =
>> ceil(BigDecimal(childStats.rowCount.get) * filterSelectivity)
>>
>> The problem is that filterSelectivity gets a NaN value in my case, and
>> NaN cannot be converted to BigDecimal.
>> I can try adding a simple if that checks for the NaN value and see if this helps.
>> I will also try to understand why I am getting NaN in my case.
>>
>> Best,
>> Michael
>>
>>
>> On Fri, Mar 23, 2018 at 1:51 PM, Takeshi Yamamuro <linguin@gmail.com>
>> wrote:
>> > hi,
>> >
>> > What's a query to reproduce this?
>> > It seems when casting double to BigDecimal, it throws the exception.
>> >
>> > // maropu
>> >
>> > On Fri, Mar 23, 2018 at 6:20 PM, Michael Shtelma <mshte...@gmail.com>
>> > wrote:
>> >>
>> >> Hi all,
>> >>
>> >> I am using Spark 2.3 with activated cost-based optimizer and a couple
>> >> of hive tables, that were analyzed previously.
>> >>
>> >> I am getting the following exception for different queries:
>> >>
>> >> java.lang.NumberFormatException
>> >>
>> >> at java.math.BigDecimal.(BigDecimal.java:494)
>> >>
>> >> at java.math.BigDecimal.(BigDecimal.java:824)
>> >>
>> >> at scala.math.BigDecimal$.decimal(BigDecimal.scala:52)
>> >>
>> >> at scala.math.BigDecimal$.decimal(BigDecimal.scala:55)
>> >>
>> >> at scala.math.BigDecimal$.double2bigDecimal(BigDecimal.scala:343)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.FilterEstimation.estimate(FilterEstimation.scala:52)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visitFilter(BasicStatsPlanVisitor.scala:43)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visitFilter(BasicStatsPlanVisitor.scala:25)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.LogicalPlanVisitor$class.visit(LogicalPlanVisitor.scala:30)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visit(BasicStatsPlanVisitor.scala:25)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats$$anonfun$stats$1.apply(LogicalPlanStats.scala:35)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats$$anonfun$stats$1.apply(LogicalPlanStats.scala:33)
>> >>
>> >> at scala.Option.getOrElse(Option.scala:121)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats$class.stats(LogicalPlanStats.scala:33)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.stats(LogicalPlan.scala:30)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils$$anonfun$rowCountsExist$1.apply(EstimationUtils.scala:32)
>> >>
>> >> at
>> >>
>> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils$$anonfun$rowCountsExist$1.apply(EstimationUtils.scala:32)
>> >>
>> >> at
>> >>
>> >> scala.collection.IndexedSeqOptimized$class.prefixLengthImpl(IndexedSeqOp

Re: Running out of space on /tmp file system while running spark job on yarn because of size of blockmgr folder

2018-03-26 Thread Michael Shtelma
Hi Keith,

Thanks for the suggestion!
I have solved this already.
The problem was that the YARN process was not responding to
start/stop commands and had not applied my configuration changes.
I killed it and restarted my cluster, and after that YARN started
using the yarn.nodemanager.local-dirs parameter defined in
yarn-site.xml.
After this change, -Djava.io.tmpdir for the Spark executor was set
correctly, according to the yarn.nodemanager.local-dirs parameter.

Best,
Michael


On Mon, Mar 26, 2018 at 9:15 PM, Keith Chapman <keithgchap...@gmail.com> wrote:
> Hi Michael,
>
> sorry for the late reply. I guess you may have to set it through the hdfs
> core-site.xml file. The property you need to set is "hadoop.tmp.dir" which
> defaults to "/tmp/hadoop-${user.name}"
>
> Regards,
> Keith.
>
> http://keith-chapman.com
>
> On Mon, Mar 19, 2018 at 1:05 PM, Michael Shtelma <mshte...@gmail.com> wrote:
>>
>> Hi Keith,
>>
>> Thank you for the idea!
>> I have tried it, so now the executor command is looking in the following
>> way :
>>
>> /bin/bash -c /usr/java/latest//bin/java -server -Xmx51200m
>> '-Djava.io.tmpdir=my_prefered_path'
>>
>> -Djava.io.tmpdir=/tmp/hadoop-msh/nm-local-dir/usercache/msh/appcache/application_1521110306769_0041/container_1521110306769_0041_01_04/tmp
>>
>> JVM is using the second Djava.io.tmpdir parameter and writing
>> everything to the same directory as before.
>>
>> Best,
>> Michael
>> Sincerely,
>> Michael Shtelma
>>
>>
>> On Mon, Mar 19, 2018 at 6:38 PM, Keith Chapman <keithgchap...@gmail.com>
>> wrote:
>> > Can you try setting spark.executor.extraJavaOptions to have
>> > -Djava.io.tmpdir=someValue
>> >
>> > Regards,
>> > Keith.
>> >
>> > http://keith-chapman.com
>> >
>> > On Mon, Mar 19, 2018 at 10:29 AM, Michael Shtelma <mshte...@gmail.com>
>> > wrote:
>> >>
>> >> Hi Keith,
>> >>
>> >> Thank you for your answer!
>> >> I have done this, and it is working for spark driver.
>> >> I would like to make something like this for the executors as well, so
>> >> that the setting will be used on all the nodes, where I have executors
>> >> running.
>> >>
>> >> Best,
>> >> Michael
>> >>
>> >>
>> >> On Mon, Mar 19, 2018 at 6:07 PM, Keith Chapman
>> >> <keithgchap...@gmail.com>
>> >> wrote:
>> >> > Hi Michael,
>> >> >
>> >> > You could either set spark.local.dir through spark conf or
>> >> > java.io.tmpdir
>> >> > system property.
>> >> >
>> >> > Regards,
>> >> > Keith.
>> >> >
>> >> > http://keith-chapman.com
>> >> >
>> >> > On Mon, Mar 19, 2018 at 9:59 AM, Michael Shtelma <mshte...@gmail.com>
>> >> > wrote:
>> >> >>
>> >> >> Hi everybody,
>> >> >>
>> >> >> I am running spark job on yarn, and my problem is that the
>> >> >> blockmgr-*
>> >> >> folders are being created under
>> >> >> /tmp/hadoop-msh/nm-local-dir/usercache/msh/appcache/application_id/*
>> >> >> The size of this folder can grow to a significant size and does not
>> >> >> really fit into /tmp file system for one job, which makes a real
>> >> >> problem for my installation.
>> >> >> I have redefined hadoop.tmp.dir in core-site.xml and
>> >> >> yarn.nodemanager.local-dirs in yarn-site.xml pointing to other
>> >> >> location and expected that the block manager will create the files
>> >> >> there and not under /tmp, but this is not the case. The files are
>> >> >> created under /tmp.
>> >> >>
>> >> >> I am wondering if there is a way to make spark not use /tmp at all
>> >> >> and
>> >> >> configure it to create all the files somewhere else ?
>> >> >>
>> >> >> Any assistance would be greatly appreciated!
>> >> >>
>> >> >> Best,
>> >> >> Michael
>> >> >>
>> >> >>
>> >> >> -
>> >> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >> >>
>> >> >
>> >
>> >
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Using CBO on Spark 2.3 with analyzed hive tables

2018-03-23 Thread Michael Shtelma
Hi Maropu,

the problem seems to be in FilterEstimation.scala on lines 50 and 52:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala?utf8=✓#L50-L52

val filterSelectivity = calculateFilterSelectivity(plan.condition).getOrElse(1.0)
val filteredRowCount: BigInt = ceil(BigDecimal(childStats.rowCount.get) * filterSelectivity)

The problem is that filterSelectivity gets a NaN value in my case, and
NaN cannot be converted to BigDecimal.
I can try adding a simple if that checks for the NaN value and see if this helps.
I will also try to understand why I am getting NaN in my case.
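
A tiny sketch of the failure and of the kind of guard meant above (this is an
illustration, not the actual patch in the PR):

// scala.math.BigDecimal(Double.NaN) goes through java.math.BigDecimal("NaN"),
// which cannot be parsed and throws java.lang.NumberFormatException
// val boom = BigDecimal(Double.NaN)

// possible guard: fall back to "no filtering" when the selectivity is undefined
val selectivity = Double.NaN
val safeSelectivity = if (selectivity.isNaN) 1.0 else selectivity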

Best,
Michael


On Fri, Mar 23, 2018 at 1:51 PM, Takeshi Yamamuro <linguin@gmail.com> wrote:
> hi,
>
> What's a query to reproduce this?
> It seems when casting double to BigDecimal, it throws the exception.
>
> // maropu
>
> On Fri, Mar 23, 2018 at 6:20 PM, Michael Shtelma <mshte...@gmail.com> wrote:
>>
>> Hi all,
>>
>> I am using Spark 2.3 with activated cost-based optimizer and a couple
>> of hive tables, that were analyzed previously.
>>
>> I am getting the following exception for different queries:
>>
>> java.lang.NumberFormatException
>>
>> at java.math.BigDecimal.(BigDecimal.java:494)
>>
>> at java.math.BigDecimal.(BigDecimal.java:824)
>>
>> at scala.math.BigDecimal$.decimal(BigDecimal.scala:52)
>>
>> at scala.math.BigDecimal$.decimal(BigDecimal.scala:55)
>>
>> at scala.math.BigDecimal$.double2bigDecimal(BigDecimal.scala:343)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.FilterEstimation.estimate(FilterEstimation.scala:52)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visitFilter(BasicStatsPlanVisitor.scala:43)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visitFilter(BasicStatsPlanVisitor.scala:25)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.LogicalPlanVisitor$class.visit(LogicalPlanVisitor.scala:30)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visit(BasicStatsPlanVisitor.scala:25)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats$$anonfun$stats$1.apply(LogicalPlanStats.scala:35)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats$$anonfun$stats$1.apply(LogicalPlanStats.scala:33)
>>
>> at scala.Option.getOrElse(Option.scala:121)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats$class.stats(LogicalPlanStats.scala:33)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.stats(LogicalPlan.scala:30)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils$$anonfun$rowCountsExist$1.apply(EstimationUtils.scala:32)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils$$anonfun$rowCountsExist$1.apply(EstimationUtils.scala:32)
>>
>> at
>> scala.collection.IndexedSeqOptimized$class.prefixLengthImpl(IndexedSeqOptimized.scala:38)
>>
>> at
>> scala.collection.IndexedSeqOptimized$class.forall(IndexedSeqOptimized.scala:43)
>>
>> at scala.collection.mutable.WrappedArray.forall(WrappedArray.scala:35)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils$.rowCountsExist(EstimationUtils.scala:32)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.ProjectEstimation$.estimate(ProjectEstimation.scala:27)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visitProject(BasicStatsPlanVisitor.scala:63)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visitProject(BasicStatsPlanVisitor.scala:25)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.LogicalPlanVisitor$class.visit(LogicalPlanVisitor.scala:37)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visit(BasicStatsPlanVisitor.scala:25)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats$$anonfun$stats$1.apply(LogicalPlanStats.scala:35)
>>
>> at
>> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats$$anonfun$stats$1.apply(LogicalPlanStats.scala:33)
>>
>> at scala.Option.getOrElse(Option.s

Using CBO on Spark 2.3 with analyzed hive tables

2018-03-23 Thread Michael Shtelma
Hi all,

I am using Spark 2.3 with the cost-based optimizer activated and a couple
of Hive tables that were analyzed previously.

I am getting the following exception for different queries:

java.lang.NumberFormatException

at java.math.BigDecimal.<init>(BigDecimal.java:494)

at java.math.BigDecimal.<init>(BigDecimal.java:824)

at scala.math.BigDecimal$.decimal(BigDecimal.scala:52)

at scala.math.BigDecimal$.decimal(BigDecimal.scala:55)

at scala.math.BigDecimal$.double2bigDecimal(BigDecimal.scala:343)

at 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.FilterEstimation.estimate(FilterEstimation.scala:52)

at 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visitFilter(BasicStatsPlanVisitor.scala:43)

at 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visitFilter(BasicStatsPlanVisitor.scala:25)

at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlanVisitor$class.visit(LogicalPlanVisitor.scala:30)

at 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visit(BasicStatsPlanVisitor.scala:25)

at 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats$$anonfun$stats$1.apply(LogicalPlanStats.scala:35)

at 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats$$anonfun$stats$1.apply(LogicalPlanStats.scala:33)

at scala.Option.getOrElse(Option.scala:121)

at 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats$class.stats(LogicalPlanStats.scala:33)

at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.stats(LogicalPlan.scala:30)

at 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils$$anonfun$rowCountsExist$1.apply(EstimationUtils.scala:32)

at 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils$$anonfun$rowCountsExist$1.apply(EstimationUtils.scala:32)

at 
scala.collection.IndexedSeqOptimized$class.prefixLengthImpl(IndexedSeqOptimized.scala:38)

at 
scala.collection.IndexedSeqOptimized$class.forall(IndexedSeqOptimized.scala:43)

at scala.collection.mutable.WrappedArray.forall(WrappedArray.scala:35)

at 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils$.rowCountsExist(EstimationUtils.scala:32)

at 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.ProjectEstimation$.estimate(ProjectEstimation.scala:27)

at 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visitProject(BasicStatsPlanVisitor.scala:63)

at 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visitProject(BasicStatsPlanVisitor.scala:25)

at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlanVisitor$class.visit(LogicalPlanVisitor.scala:37)

at 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.BasicStatsPlanVisitor$.visit(BasicStatsPlanVisitor.scala:25)

at 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats$$anonfun$stats$1.apply(LogicalPlanStats.scala:35)

at 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats$$anonfun$stats$1.apply(LogicalPlanStats.scala:33)

at scala.Option.getOrElse(Option.scala:121)

at 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats$class.stats(LogicalPlanStats.scala:33)

at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.stats(LogicalPlan.scala:30)

at 
org.apache.spark.sql.catalyst.optimizer.CostBasedJoinReorder$$anonfun$2.apply(CostBasedJoinReorder.scala:64)

at 
org.apache.spark.sql.catalyst.optimizer.CostBasedJoinReorder$$anonfun$2.apply(CostBasedJoinReorder.scala:64)

at scala.collection.LinearSeqOptimized$class.forall(LinearSeqOptimized.scala:83)

at scala.collection.immutable.List.forall(List.scala:84)

at 
org.apache.spark.sql.catalyst.optimizer.CostBasedJoinReorder$.org$apache$spark$sql$catalyst$optimizer$CostBasedJoinReorder$$reorder(CostBasedJoinReorder.scala:64)

at 
org.apache.spark.sql.catalyst.optimizer.CostBasedJoinReorder$$anonfun$1.applyOrElse(CostBasedJoinReorder.scala:46)

at 
org.apache.spark.sql.catalyst.optimizer.CostBasedJoinReorder$$anonfun$1.applyOrElse(CostBasedJoinReorder.scala:43)

at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)

at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)

at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)

at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)

at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)

at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)

at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)

at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)

at 

Re: Running out of space on /tmp file system while running spark job on yarn because of size of blockmgr folder

2018-03-19 Thread Michael Shtelma
Hi Keith,

Thank you for the idea!
I have tried it, so now the executor command looks like this:

/bin/bash -c /usr/java/latest//bin/java -server -Xmx51200m
'-Djava.io.tmpdir=my_prefered_path'
-Djava.io.tmpdir=/tmp/hadoop-msh/nm-local-dir/usercache/msh/appcache/application_1521110306769_0041/container_1521110306769_0041_01_04/tmp

The JVM is using the second -Djava.io.tmpdir parameter and writing
everything to the same directory as before.
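
For completeness, a sketch of how the option was passed (the path is a
placeholder):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.executor.extraJavaOptions",
    "-Djava.io.tmpdir=my_prefered_path")  // the YARN container still appends its own
  .getOrCreate()                          // -Djava.io.tmpdir under the NodeManager local dirs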

Best,
Michael
Sincerely,
Michael Shtelma


On Mon, Mar 19, 2018 at 6:38 PM, Keith Chapman <keithgchap...@gmail.com> wrote:
> Can you try setting spark.executor.extraJavaOptions to have
> -Djava.io.tmpdir=someValue
>
> Regards,
> Keith.
>
> http://keith-chapman.com
>
> On Mon, Mar 19, 2018 at 10:29 AM, Michael Shtelma <mshte...@gmail.com>
> wrote:
>>
>> Hi Keith,
>>
>> Thank you for your answer!
>> I have done this, and it is working for spark driver.
>> I would like to make something like this for the executors as well, so
>> that the setting will be used on all the nodes, where I have executors
>> running.
>>
>> Best,
>> Michael
>>
>>
>> On Mon, Mar 19, 2018 at 6:07 PM, Keith Chapman <keithgchap...@gmail.com>
>> wrote:
>> > Hi Michael,
>> >
>> > You could either set spark.local.dir through spark conf or
>> > java.io.tmpdir
>> > system property.
>> >
>> > Regards,
>> > Keith.
>> >
>> > http://keith-chapman.com
>> >
>> > On Mon, Mar 19, 2018 at 9:59 AM, Michael Shtelma <mshte...@gmail.com>
>> > wrote:
>> >>
>> >> Hi everybody,
>> >>
>> >> I am running spark job on yarn, and my problem is that the blockmgr-*
>> >> folders are being created under
>> >> /tmp/hadoop-msh/nm-local-dir/usercache/msh/appcache/application_id/*
>> >> The size of this folder can grow to a significant size and does not
>> >> really fit into /tmp file system for one job, which makes a real
>> >> problem for my installation.
>> >> I have redefined hadoop.tmp.dir in core-site.xml and
>> >> yarn.nodemanager.local-dirs in yarn-site.xml pointing to other
>> >> location and expected that the block manager will create the files
>> >> there and not under /tmp, but this is not the case. The files are
>> >> created under /tmp.
>> >>
>> >> I am wondering if there is a way to make spark not use /tmp at all and
>> >> configure it to create all the files somewhere else ?
>> >>
>> >> Any assistance would be greatly appreciated!
>> >>
>> >> Best,
>> >> Michael
>> >>
>> >> -
>> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >>
>> >
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Running out of space on /tmp file system while running spark job on yarn because of size of blockmgr folder

2018-03-19 Thread Michael Shtelma
Hi Keith,

Thank you for your answer!
I have done this, and it is working for the Spark driver.
I would like to do something like this for the executors as well, so
that the setting is used on all the nodes where I have executors
running.

Best,
Michael


On Mon, Mar 19, 2018 at 6:07 PM, Keith Chapman <keithgchap...@gmail.com> wrote:
> Hi Michael,
>
> You could either set spark.local.dir through spark conf or java.io.tmpdir
> system property.
>
> Regards,
> Keith.
>
> http://keith-chapman.com
>
> On Mon, Mar 19, 2018 at 9:59 AM, Michael Shtelma <mshte...@gmail.com> wrote:
>>
>> Hi everybody,
>>
>> I am running spark job on yarn, and my problem is that the blockmgr-*
>> folders are being created under
>> /tmp/hadoop-msh/nm-local-dir/usercache/msh/appcache/application_id/*
>> The size of this folder can grow to a significant size and does not
>> really fit into /tmp file system for one job, which makes a real
>> problem for my installation.
>> I have redefined hadoop.tmp.dir in core-site.xml and
>> yarn.nodemanager.local-dirs in yarn-site.xml pointing to other
>> location and expected that the block manager will create the files
>> there and not under /tmp, but this is not the case. The files are
>> created under /tmp.
>>
>> I am wondering if there is a way to make spark not use /tmp at all and
>> configure it to create all the files somewhere else ?
>>
>> Any assistance would be greatly appreciated!
>>
>> Best,
>> Michael
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Running out of space on /tmp file system while running spark job on yarn because of size of blockmgr folder

2018-03-19 Thread Michael Shtelma
Hi everybody,

I am running a Spark job on YARN, and my problem is that the blockmgr-*
folders are being created under
/tmp/hadoop-msh/nm-local-dir/usercache/msh/appcache/application_id/*
These folders can grow to a significant size and do not
really fit into the /tmp file system for a single job, which is a real
problem for my installation.
I have redefined hadoop.tmp.dir in core-site.xml and
yarn.nodemanager.local-dirs in yarn-site.xml to point to another
location, expecting that the block manager would create the files
there and not under /tmp, but this is not the case. The files are
still created under /tmp.

I am wondering if there is a way to make Spark not use /tmp at all and
configure it to create all the files somewhere else.

Any assistance would be greatly appreciated!

Best,
Michael

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



spark.sql call takes far too long

2018-01-24 Thread Michael Shtelma
Hi all,

I have a problem with the performance of the sparkSession.sql call itself:
it currently takes up to a couple of seconds. I have a lot of generated
temporary tables registered within the session, and also a lot of temporary
data frames. Is it possible that the analysis/resolution phases take far
too long? Is there a way to figure out what exactly takes so long?
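
A rough sketch of how the individual phases could be timed (assuming the
statement text is in a variable q):

def time[T](label: String)(f: => T): T = {
  val start = System.nanoTime()
  val result = f
  println(s"$label took ${(System.nanoTime() - start) / 1e6} ms")
  result
}

val df = time("parse + analysis") { spark.sql(q) }  // spark.sql resolves the plan eagerly
time("optimization")      { df.queryExecution.optimizedPlan }
time("physical planning") { df.queryExecution.executedPlan }
time("execution")         { df.collect() }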

Does anybody have any ideas on this?
Any assistance would be greatly appreciated!

Thanks,
Michael

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Inner join with the table itself

2018-01-16 Thread Michael Shtelma
Hi Jacek,

Thank you for the workaround.
It really works this way:
pos.as("p1").join(pos.as("p2")).filter($"p1.POSITION_ID0" === $"p2.POSITION_ID")
I have checked that this way I get the same execution plan as for
the join with renamed columns.
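
For anyone hitting the same thing, a self-contained version of the workaround
with a toy stand-in for the POSITION table (two made-up rows):

import spark.implicits._

val pos = Seq((1L, 2L), (2L, 3L)).toDF("POSITION_ID", "POSITION_ID0")

val joined = pos.as("p1")
  .join(pos.as("p2"))
  .filter($"p1.POSITION_ID0" === $"p2.POSITION_ID")

joined.show()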

Best,
Michael


On Mon, Jan 15, 2018 at 10:33 PM, Jacek Laskowski <ja...@japila.pl> wrote:
> Hi Michael,
>
> scala> spark.version
> res0: String = 2.4.0-SNAPSHOT
>
> scala> val r1 = spark.range(1)
> r1: org.apache.spark.sql.Dataset[Long] = [id: bigint]
>
> scala> r1.as("left").join(r1.as("right")).filter($"left.id" ===
> $"right.id").show
> +---+---+
> | id| id|
> +---+---+
> |  0|  0|
> +---+---+
>
> Am I missing something? When aliasing a table, use the identifier in column
> refs (inside).
>
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://about.me/JacekLaskowski
> Mastering Spark SQL https://bit.ly/mastering-spark-sql
> Spark Structured Streaming https://bit.ly/spark-structured-streaming
> Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
> Follow me at https://twitter.com/jaceklaskowski
>
> On Mon, Jan 15, 2018 at 3:26 PM, Michael Shtelma <mshte...@gmail.com> wrote:
>>
>> Hi Jacek & Gengliang,
>>
>> let's take a look at the following query:
>>
>> val pos = spark.read.parquet(prefix + "POSITION.parquet")
>> pos.createOrReplaceTempView("POSITION")
>> spark.sql("SELECT  POSITION.POSITION_ID  FROM POSITION POSITION JOIN
>> POSITION POSITION1 ON POSITION.POSITION_ID0 = POSITION1.POSITION_ID
>> ").collect()
>>
>> This query is working for me right now using spark 2.2.
>>
>> Now we can try implementing the same logic with DataFrame API:
>>
>> pos.join(pos, pos("POSITION_ID0")===pos("POSITION_ID")).collect()
>>
>> I am getting the following error:
>>
>> "Join condition is missing or trivial.
>>
>> Use the CROSS JOIN syntax to allow cartesian products between these
>> relations.;"
>>
>> I have tried using alias function, but without success:
>>
>> val pos2 = pos.alias("P2")
>> pos.join(pos2, pos("POSITION_ID0")===pos2("POSITION_ID")).collect()
>>
>> This also leads us to the same error.
>> Am  I missing smth about the usage of alias?
>>
>> Now let's rename the columns:
>>
>> val pos3 = pos.toDF(pos.columns.map(_ + "_2"): _*)
>> pos.join(pos3, pos("POSITION_ID0")===pos3("POSITION_ID_2")).collect()
>>
>> It works!
>>
>> There is one more really odd thing about all this: a colleague of mine
>> has managed to get the same exception ("Join condition is missing or
>> trivial") also using original SQL query, but I think he has been using
>> empty tables.
>>
>> Thanks,
>> Michael
>>
>>
>> On Mon, Jan 15, 2018 at 11:27 AM, Gengliang Wang
>> <gengliang.w...@databricks.com> wrote:
>> > Hi Michael,
>> >
>> > You can use `Explain` to see how your query is optimized.
>> >
>> > https://docs.databricks.com/spark/latest/spark-sql/language-manual/explain.html
>> > I believe your query is an actual cross join, which is usually very slow
>> > in
>> > execution.
>> >
>> > To get rid of this, you can set `spark.sql.crossJoin.enabled` as true.
>> >
>> >
>> > On January 15, 2018, at 6:09 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>> >
>> > Hi Michael,
>> >
>> > -dev +user
>> >
>> > What's the query? How do you "fool spark"?
>> >
>> > Pozdrawiam,
>> > Jacek Laskowski
>> > 
>> > https://about.me/JacekLaskowski
>> > Mastering Spark SQL https://bit.ly/mastering-spark-sql
>> > Spark Structured Streaming https://bit.ly/spark-structured-streaming
>> > Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
>> > Follow me at https://twitter.com/jaceklaskowski
>> >
>> > On Mon, Jan 15, 2018 at 10:23 AM, Michael Shtelma <mshte...@gmail.com>
>> > wrote:
>> >>
>> >> Hi all,
>> >>
>> >> If I try joining the table with itself using join columns, I am
>> >> getting the following error:
>> >> "Join condition is missing or trivial. Use the CROSS JOIN syntax to
>> >> allow cartesian products between these relations.;"
>> >>
>> >> This is not true, and my join is not trivial and is not a real cross
>> >> join. I am providing join condition and expect to get maybe a couple
>> >> of joined rows for each row in the original table.
>> >>
>> >> There is a workaround for this, which implies renaming all the columns
>> >> in source data frame and only afterwards proceed with the join. This
>> >> allows us to fool spark.
>> >>
>> >> Now I am wondering if there is a way to get rid of this problem in a
>> >> better way? I do not like the idea of renaming the columns because
>> >> this makes it really difficult to keep track of the names in the
>> >> columns in result data frames.
>> >> Is it possible to deactivate this check?
>> >>
>> >> Thanks,
>> >> Michael
>> >>
>> >> -
>> >> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>> >>
>> >
>> >
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Inner join with the table itself

2018-01-15 Thread Michael Shtelma
Hi Jacek & Gengliang,

let's take a look at the following query:

val pos = spark.read.parquet(prefix + "POSITION.parquet")
pos.createOrReplaceTempView("POSITION")
spark.sql("SELECT  POSITION.POSITION_ID  FROM POSITION POSITION JOIN
POSITION POSITION1 ON POSITION.POSITION_ID0 = POSITION1.POSITION_ID
").collect()

This query is working for me right now using Spark 2.2.

Now we can try implementing the same logic with DataFrame API:

pos.join(pos, pos("POSITION_ID0")===pos("POSITION_ID")).collect()

I am getting the following error:

"Join condition is missing or trivial.

Use the CROSS JOIN syntax to allow cartesian products between these relations.;"

I have tried using alias function, but without success:

val pos2 = pos.alias("P2")
pos.join(pos2, pos("POSITION_ID0")===pos2("POSITION_ID")).collect()

This also leads to the same error.
Am I missing something about the usage of alias?

Now let's rename the columns:

val pos3 = pos.toDF(pos.columns.map(_ + "_2"): _*)
pos.join(pos3, pos("POSITION_ID0")===pos3("POSITION_ID_2")).collect()

It works!

There is one more really odd thing about all this: a colleague of mine
has managed to get the same exception ("Join condition is missing or
trivial") also with the original SQL query, but I think he was using
empty tables.

Thanks,
Michael


On Mon, Jan 15, 2018 at 11:27 AM, Gengliang Wang
<gengliang.w...@databricks.com> wrote:
> Hi Michael,
>
> You can use `Explain` to see how your query is optimized.
> https://docs.databricks.com/spark/latest/spark-sql/language-manual/explain.html
> I believe your query is an actual cross join, which is usually very slow in
> execution.
>
> To get rid of this, you can set `spark.sql.crossJoin.enabled` as true.
>
>
> On January 15, 2018, at 6:09 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>
> Hi Michael,
>
> -dev +user
>
> What's the query? How do you "fool spark"?
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://about.me/JacekLaskowski
> Mastering Spark SQL https://bit.ly/mastering-spark-sql
> Spark Structured Streaming https://bit.ly/spark-structured-streaming
> Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
> Follow me at https://twitter.com/jaceklaskowski
>
> On Mon, Jan 15, 2018 at 10:23 AM, Michael Shtelma <mshte...@gmail.com>
> wrote:
>>
>> Hi all,
>>
>> If I try joining the table with itself using join columns, I am
>> getting the following error:
>> "Join condition is missing or trivial. Use the CROSS JOIN syntax to
>> allow cartesian products between these relations.;"
>>
>> This is not true, and my join is not trivial and is not a real cross
>> join. I am providing a join condition and expect to get maybe a couple
>> of joined rows for each row in the original table.
>>
>> There is a workaround for this, which implies renaming all the columns
>> in the source data frame and only afterwards proceeding with the join.
>> This allows us to fool Spark.
>>
>> Now I am wondering if there is a way to get rid of this problem in a
>> better way? I do not like the idea of renaming the columns because
>> this makes it really difficult to keep track of the names in the
>> columns in result data frames.
>> Is it possible to deactivate this check?
>>
>> Thanks,
>> Michael
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Using UDF compiled with Janino in Spark

2017-12-15 Thread Michael Shtelma
Hi all,

I am trying to compile my UDF with the Janino compiler and then register
it in Spark and use it afterwards. Here is the code:

String s = " \n" +
"public class MyUDF implements
org.apache.spark.sql.api.java.UDF1 {\n" +
"@Override\n" +
"public String call(String s) throws Exception {\n" +
"return s+\"dd\";\n" +
"}\n" +
"};";


ISimpleCompiler sc =
CompilerFactoryFactory.getDefaultCompilerFactory().newSimpleCompiler();
sc.cook(s);
UDF1 udf1 = (UDF1) sc.getClassLoader().loadClass("MyUDF").newInstance();

sparkSession.udf().register("MyUDF", udf1, DataTypes.StringType);

sparkSession.sql("select MyUDF(id) from deal").show();

The problem is that during execution I am getting the following exception:

15.12.2017 14:32:46 [ERROR]: Exception in task 0.0 in stage 5.0 (TID 5)
java.lang.ClassCastException: cannot assign instance of
scala.collection.immutable.List$SerializationProxy to field
org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of
type scala.collection.Seq in instance of
org.apache.spark.rdd.MapPartitionsRDD
at 
java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2237)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at 
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15.12.2017 14:32:46 [ WARN]: Lost task 0.0 in stage 5.0 (TID 5,
localhost, executor driver): java.lang.ClassCastException: cannot
assign instance of scala.collection.immutable.List$SerializationProxy
to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_
of type scala.collection.Seq in instance of
org.apache.spark.rdd.MapPartitionsRDD
at 
java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2237)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at 
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Any ideas what could go wrong?

Best,
Michael

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org