Re: [spark.local.dir] comma separated list does not work

2024-01-12 Thread Koert Kuipers
try it without spaces?
export SPARK_LOCAL_DIRS="/tmp,/share/"
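the same list can also be passed per job as spark.local.dir (a sketch; the
directories are placeholders, and note that SPARK_LOCAL_DIRS, when set, takes
precedence over this config, as do the cluster manager's local dirs on YARN):

spark-submit --conf "spark.local.dir=/tmp/spark1,/share/spark" ...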

On Fri, Jan 12, 2024 at 5:00 PM Andrew Petersen 
wrote:

> Hello Spark community
>
> SPARK_LOCAL_DIRS or
> spark.local.dir
> is supposed to accept a list.
>
> I want to list one local (fast) drive, followed by a gpfs network drive,
> similar to what is done here:
>
> https://cug.org/proceedings/cug2016_proceedings/includes/files/pap129s2-file1.pdf
> "Thus it is preferable to bias the data towards faster storage by
> including multiple directories on the faster devices (e.g., SPARK LOCAL
> DIRS=/tmp/spark1, /tmp/spark2, /tmp/spark3, /lus/scratch/sparkscratch/)."
> The purpose of this is to get the benefit of speed while avoiding "out of
> space" errors.
>
> However, for me, Spark is only considering the 1st directory on the list:
> export SPARK_LOCAL_DIRS="/tmp, /share/"
>
> I am using Spark 3.4.1. Does anyone have any experience getting this to
> work? If so can you suggest a simple example I can try and tell me which
> version of Spark you are using?
>
> Regards
> Andrew
>
>
>
>
> I am trying to use 2 local drives
>
> --
> Andrew Petersen, PhD
> Advanced Computing, Office of Information Technology
> 2620 Hillsborough Street
> datascience.oit.ncsu.edu
>



Re: Does Spark support role-based authentication and access to Amazon S3? (Kubernetes cluster deployment)

2023-12-13 Thread Koert Kuipers
yes it does using IAM roles for service accounts.
see:
https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts.html

i wrote a little bit about this also here:
https://technotes.tresata.com/spark-on-k8s/
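as a minimal sketch (assuming the driver and executor pods run under a service
account annotated with an IAM role, so EKS injects AWS_ROLE_ARN and
AWS_WEB_IDENTITY_TOKEN_FILE), pointing s3a straight at the web-identity
provider is usually all that is needed; the bucket path below is a placeholder:

val hadoopConf = spark.sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3a.aws.credentials.provider",
  "com.amazonaws.auth.WebIdentityTokenCredentialsProvider")
spark.read.text("s3a://some-bucket/some-prefix/").show()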

On Wed, Dec 13, 2023 at 7:52 AM Atul Patil  wrote:

> Hello Team,
>
>
>
> Does Spark support role-based authentication and access to Amazon S3 for
> Kubernetes deployment?
>
> *Note: we have deployed our spark application in the Kubernetes cluster.*
>
>
>
> Below are the Hadoop-AWS dependencies we are using:
>
> <dependency>
>    <groupId>org.apache.hadoop</groupId>
>    <artifactId>hadoop-aws</artifactId>
>    <version>3.3.4</version>
> </dependency>
>
>
>
> We are using the following configuration when creating the spark session,
> but it is not working:
>
> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.aws.credentials.provider",
> "org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider");
>
> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.arn",
> System.getenv("AWS_ROLE_ARN"));
>
> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.credentials.provider",
> "com.amazonaws.auth.WebIdentityTokenCredentialsProvider");
>
> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.sts.endpoint",
> "s3.eu-central-1.amazonaws.com");
>
> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.sts.endpoint.region",
> Regions.EU_CENTRAL_1.getName());
>
> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.web.identity.token.file",
> System.getenv("AWS_WEB_IDENTITY_TOKEN_FILE"));
>
> sparkSession.sparkContext().hadoopConfiguration().set("fs.s3a.assumed.role.session.duration",
> "30m");
>
>
>
> Thank you!
>
>
>
> Regards,
>
> Atul
>



Re: Elasticsearch support for Spark 3.x

2023-09-01 Thread Koert Kuipers
could the provided scope be the issue?
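with provided scope the connector never ends up inside the application jar, so
es.DefaultSource is only found if the jar reaches the cluster some other way.
a quick check (a sketch, reusing the coordinates from the POM quoted below) is
to drop the provided scope, or pass the connector at submit time:

spark-submit --packages org.elasticsearch:elasticsearch-spark-30_2.12:7.12.0 ...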

On Sun, Aug 27, 2023 at 2:58 PM Dipayan Dev  wrote:

> Using the following dependency for Spark 3 in POM file (My Scala version
> is 2.12.14)
>
>
>
>
>
>
> <dependency>
>     <groupId>org.elasticsearch</groupId>
>     <artifactId>elasticsearch-spark-30_2.12</artifactId>
>     <version>7.12.0</version>
>     <scope>provided</scope>
> </dependency>
>
>
> The code throws an error at this line:
> df.write.format("es").mode("overwrite").options(elasticOptions).save("index_name")
> The same code is working with Spark 2.4.0 and the following dependency
>
>
>
>
>
> <dependency>
>     <groupId>org.elasticsearch</groupId>
>     <artifactId>elasticsearch-spark-20_2.12</artifactId>
>     <version>7.12.0</version>
> </dependency>
>
>
> On Mon, 28 Aug 2023 at 12:17 AM, Holden Karau 
> wrote:
>
>> What’s the version of the ES connector you are using?
>>
>> On Sat, Aug 26, 2023 at 10:17 AM Dipayan Dev 
>> wrote:
>>
>>> Hi All,
>>>
>>> We're using Spark 2.4.x to write dataframe into the Elasticsearch index.
> >>> As we're upgrading to Spark 3.3.0, it is throwing this error:
>>> Caused by: java.lang.ClassNotFoundException: es.DefaultSource
>>> at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
>>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>>>
> >>> Looking at a few responses from Stackoverflow, it seems this is not yet
> >>> supported by Elasticsearch-hadoop.
>>>
>>> Does anyone have experience with this? Or faced/resolved this issue in
>>> Spark 3?
>>>
>>> Thanks in advance!
>>>
>>> Regards
>>> Dipayan
>>>
>> --
>> Twitter: https://twitter.com/holdenkarau
>> Books (Learning Spark, High Performance Spark, etc.):
>> https://amzn.to/2MaRAG9  
>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>
>



Re: [EXTERNAL] spark re-use shuffle files not happening

2022-07-16 Thread Koert Kuipers
ok thanks. guess i am simply misremembering that i saw the shuffle files
getting re-used across jobs (actions). it was probably across stages for
the same job.

in structured streaming this is a pretty big deal. if you join a streaming
dataframe with a large static dataframe each microbatch becomes a job
(action), so the large static dataframe gets reshuffled for every
microbatch. observing this performance issue was actually why i did the
little basic experiment in this post.
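one mitigation, when the static side is small enough to fit in memory, is to
broadcast it so it is never shuffled at all. a sketch (the parquet path, the
"key" column and the rate source are placeholders for the real inputs):

import org.apache.spark.sql.functions.broadcast
val static = spark.read.parquet("/data/static")      // assumed to have a "key" column
val stream = spark.readStream.format("rate").load().withColumnRenamed("value", "key")
val joined = stream.join(broadcast(static), "key")
joined.writeStream.format("console").start()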


On Sat, Jul 16, 2022 at 12:33 PM Shay Elbaz  wrote:

> Spark can reuse shuffle stages *in the same job *(action), not cross jobs.
> --
> *From:* Koert Kuipers 
> *Sent:* Saturday, July 16, 2022 6:43 PM
> *To:* user 
> *Subject:* [EXTERNAL] spark re-use shuffle files not happening
>
>
> *ATTENTION:* This email originated from outside of GM.
>
>
> i have seen many jobs where spark re-uses shuffle files (and skips a stage
> of a job), which is an awesome feature given how expensive shuffles are,
> and i generally now assume this will happen.
>
> however i feel like i am going a little crazy today. i did the simplest
> test in spark 3.3.0, basically i run 2 jobs within same spark shell, so
> using same spark session, and broadcast join is disabled so we get
> shuffles:
> 1) job1 joins dataframe1 with dataframe0 and writes results out.
> 2) job2 joins dataframe2 with dataframe0 and writes results out.
>
> i would expect job2 to skip the stage where dataframe0 is getting
> shuffled, but its not skipping it! what am i missing?
> is shuffle re-use only enabled within same job/action? that goes against
> what i remember...
>
> code:
> $ spark-shell --conf spark.sql.autoBroadcastJoinThreshold=-1
> scala> val data0 = spark.read.format("csv").option("header",
> true).load("data0.csv")
> scala> val data1 = spark.read.format("csv").option("header",
> true).load("data1.csv")
> scala> val data2 = spark.read.format("csv").option("header",
> true).load("data2.csv")
> scala> data1.join(data0, "key").write.format("parquet").save("out1")
> scala> data2.join(data0, "key").write.format("parquet").save("out2") //
> should skip stage that scans csv for data0 and writes shuffle files... but
> it doesn't
>
>
>
>



spark re-use shuffle files not happening

2022-07-16 Thread Koert Kuipers
i have seen many jobs where spark re-uses shuffle files (and skips a stage
of a job), which is an awesome feature given how expensive shuffles are,
and i generally now assume this will happen.

however i feel like i am going a little crazy today. i did the simplest
test in spark 3.3.0, basically i run 2 jobs within same spark shell, so
using same spark session, and broadcast join is disabled so we get
shuffles:
1) job1 joins dataframe1 with dataframe0 and writes results out.
2) job2 joins dataframe2 with dataframe0 and writes results out.

i would expect job2 to skip the stage where dataframe0 is getting shuffled,
but its not skipping it! what am i missing?
is shuffle re-use only enabled within same job/action? that goes against
what i remember...

code:
$ spark-shell --conf spark.sql.autoBroadcastJoinThreshold=-1
scala> val data0 = spark.read.format("csv").option("header",
true).load("data0.csv")
scala> val data1 = spark.read.format("csv").option("header",
true).load("data1.csv")
scala> val data2 = spark.read.format("csv").option("header",
true).load("data2.csv")
scala> data1.join(data0, "key").write.format("parquet").save("out1")
scala> data2.join(data0, "key").write.format("parquet").save("out2") //
should skip stage that scans csv for data0 and writes shuffle files... but
it doesn't
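one partial workaround (a sketch) is caching data0 after reading it, so the
second job at least skips re-scanning the csv, even though the shuffle itself
still runs once per job:

scala> val data0 = spark.read.format("csv").option("header", true).load("data0.csv").cache()
scala> data0.count()   // materializes the cache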



Re: When should we cache / persist ? After or Before Actions?

2022-04-27 Thread Koert Kuipers
we have quite a few persist statements in our codebase whenever we are
reusing a dataframe.
we noticed that it slows things down quite a bit (sometimes doubles the
runtime), while providing little benefit, since spark already re-uses the
shuffle files underlying the dataframe efficiently even if you don't do the
persist.
so at this point i am considering removing those persist statements...
not sure what other people's experiences are on this
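for reference, the placement being debated below boils down to persisting
before the first action that reuses the dataframe, not after it. a sketch (the
query and output path are placeholders):

val df = spark.sql("some sql on a huge dataset")
df.persist()
val n = df.count()                               // action 1
df.write.mode("overwrite").parquet("/tmp/out")   // action 2 reuses the cached partitions
df.unpersist()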

On Thu, Apr 21, 2022 at 9:41 AM "Yuri Oleynikov (יורי אולייניקוב)" <
yur...@gmail.com> wrote:

> Hi Sean
>
> Persisting/caching is useful when you’re going to reuse a dataframe. So in
> your case no persisting/caching is required. This is regarding the “when”.
>
> The “where” usually belongs to the closest point of reusing
> calculations/transformations
>
> Btw, I’m not sure if caching is useful when you have a HUGE dataframe.
> Maybe persisting will be more useful
>
> Best regards
>
> On 21 Apr 2022, at 16:24, Sean Owen  wrote:
>
> 
> You persist before actions, not after, if you want the action's outputs to
> be persistent.
> If anything swap line 2 and 3. However, there's no point in the count()
> here, and because there is already only one action following to write, no
> caching is useful in that example.
>
> On Thu, Apr 21, 2022 at 2:26 AM Sid  wrote:
>
>> Hi Folks,
>>
>> I am working on Spark Dataframe API where I am doing following thing:
>>
>> 1) df = spark.sql("some sql on huge dataset").persist()
>> 2) df1 = df.count()
>> 3) df.repartition().write.mode().parquet("")
>>
>>
> >> AFAIK, persist should be used after the count statement, if it is needed
> >> at all, since spark is lazily evaluated; if I call any action it will
> >> recompute the above code, and hence there is no use in persisting it
> >> before the action.
>>
>> Therefore, it should be something like the below that should give better
>> performance.
>> 1) df= spark.sql("some sql on huge dataset")
>> 2) df1 = df.count()
>> 3) df.persist()
>> 4) df.repartition().write.mode().parquet("")
>>
> >> So please help me understand how it should be done exactly, and why, if I
> >> am not correct.
>>
>> Thanks,
>> Sid
>>
>>



Re: Current state of dataset api

2021-10-05 Thread Koert Kuipers
the encoder api remains a pain point due to its lack of composability.
serialization overhead is also still there i believe. i dont remember what
has happened to the predicate pushdown issues, i think they are mostly
resolved?
we tend to use dataset api on our methods/interfaces where its fitting but
then switch to dataframe for the actual work.
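a sketch of that pattern (the case class and column names are just
illustrative):

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.{col, lit}

case class Event(id: Long, value: Double)

// typed Dataset at the method boundary, DataFrame operations for the actual work
def normalize(events: Dataset[Event]): Dataset[Event] = {
  val spark = events.sparkSession
  import spark.implicits._
  events.toDF()
    .withColumn("value", col("value") / lit(100.0))
    .as[Event]
}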


On Mon, Oct 4, 2021 at 6:55 AM Magnus Nilsson  wrote:

> Hi,
>
> I tried using the (typed) Dataset API about three years ago. Then
> there were limitations with predicate pushdown, serialization overhead
> and maybe more things I've forgotten. Ultimately we chose the
> Dataframe API as the sweet spot.
>
> Does anyone know of a good overview of the current state of the
> Dataset API, pros/cons as of Spark 3?
>
> Is it fully usable, do you get the advantages of a strongly typed
> dataframe? Any known limitations or drawbacks to take into account?
>
> br,
>
> Magnus
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>



understanding spark shuffle file re-use better

2021-01-13 Thread Koert Kuipers
is shuffle file re-use based on identity or equality of the dataframe?

for example if run the exact same code twice to load data and do transforms
(joins, aggregations, etc.) but without re-using any actual dataframes,
will i still see skipped stages thanks to shuffle file re-use?

thanks!
koert


Re: Apache Spark 3.1 Preparation Status (Oct. 2020)

2020-10-07 Thread Koert Kuipers
it seems to me with SPARK-20202 we are no longer planning to support
hadoop2 + hive 1.2. is that correct?

so basically spark 3.1 will no longer run on say CDH 5.x or HDP2.x with
hive?

my use case is building spark 3.1 and launching on these existing clusters
that are not managed by me. e.g. i do not use the spark version provided by
cloudera.
however there are workarounds for me (using older spark version to extract
out of hive, then switch to newer spark version) so i am not too worried
about this. just making sure i understand.

thanks

On Sat, Oct 3, 2020 at 8:17 PM Dongjoon Hyun 
wrote:

> Hi, All.
>
> As of today, master branch (Apache Spark 3.1.0) resolved
> 852+ JIRA issues and 606+ issues are 3.1.0-only patches.
> According to the 3.1.0 release window, branch-3.1 will be
> created on November 1st and enters QA period.
>
> Here are some notable updates I've been monitoring.
>
> *Language*
> 01. SPARK-25075 Support Scala 2.13
>   - Since SPARK-32926, Scala 2.13 build test has
> become a part of GitHub Action jobs.
>   - After SPARK-33044, Scala 2.13 test will be
> a part of Jenkins jobs.
> 02. SPARK-29909 Drop Python 2 and Python 3.4 and 3.5
> 03. SPARK-32082 Project Zen: Improving Python usability
>   - 7 of 16 issues are resolved.
> 04. SPARK-32073 Drop R < 3.5 support
>   - This is done for Spark 3.0.1 and 3.1.0.
>
> *Dependency*
> 05. SPARK-32058 Use Apache Hadoop 3.2.0 dependency
>   - This changes the default dist. for better cloud support
> 06. SPARK-32981 Remove hive-1.2 distribution
> 07. SPARK-20202 Remove references to org.spark-project.hive
>   - This will remove Hive 1.2.1 from source code
> 08. SPARK-29250 Upgrade to Hadoop 3.2.1 (WIP)
>
> *Core*
> 09. SPARK-27495 Support Stage level resource conf and scheduling
>   - 11 of 15 issues are resolved
> 10. SPARK-25299 Use remote storage for persisting shuffle data
>   - 8 of 14 issues are resolved
>
> *Resource Manager*
> 11. SPARK-33005 Kubernetes GA preparation
>   - It is on the way and we are waiting for more feedback.
>
> *SQL*
> 12. SPARK-30648/SPARK-32346 Support filters pushdown
>   to JSON/Avro
> 13. SPARK-32948/SPARK-32958 Add Json expression optimizer
> 14. SPARK-12312 Support JDBC Kerberos w/ keytab
>   - 11 of 17 issues are resolved
> 15. SPARK-27589 DSv2 was mostly completed in 3.0
>   and added more features in 3.1 but still we missed
>   - All built-in DataSource v2 write paths are disabled
> and v1 write is used instead.
>   - Support partition pruning with subqueries
>   - Support bucketing
>
> We still have one month before the feature freeze
> and starting QA. If you are working for 3.1,
> please consider the timeline and share your schedule
> with the Apache Spark community. For the other stuff,
> we can put it into 3.2 release scheduled in June 2021.
>
> Last but not least, I want to emphasize (7) once again.
> We need to remove the forked unofficial Hive eventually.
> Please let us know your reasons if you need to build
> from Apache Spark 3.1 source code for Hive 1.2.
>
> https://github.com/apache/spark/pull/29936
>
> As I wrote in the above PR description, for old releases,
> Apache Spark 2.4(LTS) and 3.0 (~2021.12) will provide
> Hive 1.2-based distribution.
>
> Bests,
> Dongjoon.
>


Re: Spark Small file issue

2020-06-24 Thread Koert Kuipers
i second that. we have gotten bitten so many times by coalesce impacting
upstream in unintended ways that i avoid coalesce on write altogether.

i prefer to use repartition (and take the shuffle hit) before writing
(especially if you are writing out partitioned), or if possible use
adaptive query execution to avoid too many files to begin with

On Wed, Jun 24, 2020 at 9:09 AM Bobby Evans  wrote:

> First, you need to be careful with coalesce. It will impact upstream
> processing, so if you are doing a lot of computation in the last stage
> before the repartition then coalesce will make the problem worse because
> all of that computation will happen in a single thread instead of being
> spread out.
>
> My guess is that it has something to do with writing your output files.
> Writing orc and/or parquet is not cheap. It does a lot of compression and
> statistics calculations. I also am not sure why, but from what I have seen
> they do not scale very linearly with more data being put into a single
> file. You might also be doing the repartition too early.  There should be
> some statistics on the SQL page of the UI where you can look to see which
> stages took a long time it should point you in the right direction.
>
> On Tue, Jun 23, 2020 at 5:06 PM German SM 
> wrote:
>
>> Hi,
>>
>> When reducing partitions is better to use coalesce because it doesn't
>> need to shuffle the data.
>>
>> dataframe.coalesce(1)
>>
>> El mar., 23 jun. 2020 23:54, Hichki  escribió:
>>
>>> Hello Team,
>>>
>>>
>>>
> >>> I am new to the Spark environment. I have converted a Hive query to
> >>> Spark Scala.
> >>> Now I am loading data and doing performance testing. Below are details on
> >>> loading 3 weeks of data. The cluster-level small file avg size is set to
> >>> 128 MB.
>>>
>>>
>>>
> >>> 1. The new temp table where I am loading data is ORC formatted, as the
> >>> current Hive table is stored as ORC.
>>>
>>> 2. Hive table each partition folder size is 200 MB.
>>>
> >>> 3. I am using repartition(1) in the spark code so that it will create one
> >>> 200MB part file in each partition folder (to avoid the small file issue).
> >>> With this, the job completes in 23 to 26 mins.
>>>
> >>> 4. If I don't use repartition(), the job completes in 12 to 13 mins. But
> >>> the problem with this approach is that it creates 800 part files (size
> >>> <128MB) in each partition folder.
>>>
>>>
>>>
> >>> I am not quite sure how to reduce processing time and avoid creating small
> >>> files at the same time. Could anyone please help me in this situation?
>>>
>>>
>>>
>>>
>>>
>>> --
>>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>


Re: Scala version compatibility

2020-04-07 Thread Koert Kuipers
i think it will work then assuming the callsite hasnt changed between scala
versions

On Mon, Apr 6, 2020 at 5:09 PM Andrew Melo  wrote:

> Hello,
>
> On Mon, Apr 6, 2020 at 3:31 PM Koert Kuipers  wrote:
>
>> actually i might be wrong about this. did you declare scala to be a
>> provided dependency? so scala is not in your fat/uber jar? if so then maybe
>> it will work.
>>
>
> I declare spark to be a provided dependency, so Scala's not included in my
> artifact except for this single callsite.
>
> Thanks
> Andrew
>
>
>> On Mon, Apr 6, 2020 at 4:16 PM Andrew Melo  wrote:
>>
>>>
>>>
>>> On Mon, Apr 6, 2020 at 3:08 PM Koert Kuipers  wrote:
>>>
>>>> yes it will
>>>>
>>>>
>>> Ooof, I was hoping that wasn't the case. I guess I need to figure out
>>> how to get Maven to compile/publish jars with different
>>> dependencies/artifactIDs like how sbt does? (or re-implement the
>>> functionality in java)
>>>
>>> Thanks for your help,
>>> Andrew
>>>
>>>
>>>> On Mon, Apr 6, 2020 at 3:50 PM Andrew Melo 
>>>> wrote:
>>>>
>>>>> Hello all,
>>>>>
>>>>> I'm aware that Scala is not binary compatible between revisions. I
>>>>> have some Java code whose only Scala dependency is the transitive
>>>>> dependency through Spark. This code calls a Spark API which returns a
>>>>> Seq, which I then convert into a List with
>>>>> JavaConverters.seqAsJavaListConverter. Will this usage cause binary
>>>>> incompatibility if the jar is compiled in one Scala version and executed 
>>>>> in
>>>>> another?
>>>>>
>>>>> I tried grokking
>>>>> https://docs.scala-lang.org/overviews/core/binary-compatibility-of-scala-releases.html,
>>>>> and wasn't quite able to make heads or tails of this particular case.
>>>>>
>>>>> Thanks!
>>>>> Andrew
>>>>>
>>>>>
>>>>>


Re: Scala version compatibility

2020-04-06 Thread Koert Kuipers
actually i might be wrong about this. did you declare scala to be a
provided dependency? so scala is not in your fat/uber jar? if so then maybe
it will work.

On Mon, Apr 6, 2020 at 4:16 PM Andrew Melo  wrote:

>
>
> On Mon, Apr 6, 2020 at 3:08 PM Koert Kuipers  wrote:
>
>> yes it will
>>
>>
> Ooof, I was hoping that wasn't the case. I guess I need to figure out how
> to get Maven to compile/publish jars with different
> dependencies/artifactIDs like how sbt does? (or re-implement the
> functionality in java)
>
> Thanks for your help,
> Andrew
>
>
>> On Mon, Apr 6, 2020 at 3:50 PM Andrew Melo  wrote:
>>
>>> Hello all,
>>>
>>> I'm aware that Scala is not binary compatible between revisions. I have
>>> some Java code whose only Scala dependency is the transitive dependency
>>> through Spark. This code calls a Spark API which returns a Seq, which
>>> I then convert into a List with
>>> JavaConverters.seqAsJavaListConverter. Will this usage cause binary
>>> incompatibility if the jar is compiled in one Scala version and executed in
>>> another?
>>>
>>> I tried grokking
>>> https://docs.scala-lang.org/overviews/core/binary-compatibility-of-scala-releases.html,
>>> and wasn't quite able to make heads or tails of this particular case.
>>>
>>> Thanks!
>>> Andrew
>>>
>>>
>>>


Re: Scala version compatibility

2020-04-06 Thread Koert Kuipers
yes it will

On Mon, Apr 6, 2020 at 3:50 PM Andrew Melo  wrote:

> Hello all,
>
> I'm aware that Scala is not binary compatible between revisions. I have
> some Java code whose only Scala dependency is the transitive dependency
> through Spark. This code calls a Spark API which returns a Seq, which
> I then convert into a List with
> JavaConverters.seqAsJavaListConverter. Will this usage cause binary
> incompatibility if the jar is compiled in one Scala version and executed in
> another?
>
> I tried grokking
> https://docs.scala-lang.org/overviews/core/binary-compatibility-of-scala-releases.html,
> and wasn't quite able to make heads or tails of this particular case.
>
> Thanks!
> Andrew
>
>
>


find failed test

2020-03-06 Thread Koert Kuipers
i just ran:
mvn test -fae > log.txt

at the end of log.txt i find it says there are failures:
[INFO] Spark Project SQL .. FAILURE [47:55
min]

that is not very helpful. what tests failed?

i could go scroll up but the file has 21,517 lines. ok let's skip that.

so i figure there are test reports in sql/core/target. i was right! its
sql/core/target/surefire-reports. but it has 276 files, so thats still a bit
much to go through. i assume there is some nice summary that shows me the
failed tests... maybe SparkTestSuite.txt? its 2687 lines, so again a bit
much, but i do go through it and find nothing useful.

so... how do i quickly find out which test failed exactly?
there must be some maven trick here?

thanks!
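one low-tech option (a sketch; scalatest marks failed tests with "*** FAILED ***"
in its text output, and surefire marks them with "<<< FAILURE!"):

grep -rl -e "FAILED" -e "<<< FAILURE" sql/core/target/surefire-reports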


conflict with multiple jobs writing to different partitions but same baseDir

2019-05-25 Thread Koert Kuipers
lets say i have 2 dataframe jobs that write to /somedir/a=1 and
/somedir/a=2. these can run at the same time without issues.

but now i get excited about dynamic partitioning. so i add "a" as a column
to my 2 dataframes, set the option partitionOverwriteMode=dynamic, add
partitionBy("a": _*) to the writing, and write both to /somedir. this works
fine, and the result is the same, but i can no longer safely run both jobs
at same time, because they both try to create and delete
/somedir/_temporary, and i end up with exceptions about files not found in
/somedir/_temporary.

this makes me wonder why _temporary is hardcoded. why not _temporary_?
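for reference, the dynamic write described above looks roughly like this (a
sketch, using the paths and column from the example; dataframe1 stands in for
the first job's dataframe):

import org.apache.spark.sql.functions.lit
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
dataframe1.withColumn("a", lit(1)).write.mode("overwrite").partitionBy("a").parquet("/somedir")
// a second concurrent job doing the same for a=2 races on /somedir/_temporary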


Re: run new spark version on old spark cluster ?

2019-05-20 Thread Koert Kuipers
most likely have to set something in spark-defaults.conf like

spark.master yarn
spark.submit.deployMode client

On Mon, May 20, 2019 at 3:14 PM Nicolas Paris 
wrote:

> Finally that was easy to connect to both hive/hdfs. I just had to copy
> the hive-site.xml from the old spark version and that worked instantly
> after unzipping.
>
> Right now I am stuck on connecting to yarn.
>
>
> On Mon, May 20, 2019 at 02:50:44PM -0400, Koert Kuipers wrote:
> > we had very little issues with hdfs or hive, but then we use hive only
> for
> > basic reading and writing of tables.
> >
> > depending on your vendor you might have to add a few settings to your
> > spark-defaults.conf. i remember on hdp you had to set the hdp.version
> somehow.
> > we prefer to build spark with hadoop being provided, and then add hadoop
> > classpath to spark classpath. this works well on cdh, hdp, and also for
> cloud
> > providers.
> >
> > for example this is a typical build with hive for cdh 5 (which is based
> on
> > hadoop 2.6, you change hadoop version based on vendor):
> > dev/make-distribution.sh --name  --tgz -Phadoop-2.6
> -Dhadoop.version=
> > 2.6.0 -Pyarn -Phadoop-provided -Phive
> > add hadoop classpath to the spark classpath in spark-env.sh:
> > export SPARK_DIST_CLASSPATH=$(hadoop classpath)
> >
> > i think certain vendors support multiple "vendor supported" installs, so
> you
> > could also look into that if you are not comfortable with running your
> own
> > spark build.
> >
> > On Mon, May 20, 2019 at 2:24 PM Nicolas Paris 
> wrote:
> >
> > > correct. note that you only need to install spark on the node you
> launch
> > it
> > > from. spark doesnt need to be installed on cluster itself.
> >
> > That sound reasonably doable for me. My guess is I will have some
> > troubles to make that spark version work with both hive & hdfs
> installed
> > on the cluster - or maybe that's finally plug-&-play i don't know.
> >
> > thanks
> >
> > On Mon, May 20, 2019 at 02:16:43PM -0400, Koert Kuipers wrote:
> > > correct. note that you only need to install spark on the node you
> launch
> > it
> > > from. spark doesnt need to be installed on cluster itself.
> > >
> > > the shared components between spark jobs on yarn are only really
> > > spark-shuffle-service in yarn and spark-history-server. i have
> found
> > > compatibility for these to be good. its best if these run latest
> version.
> > >
> > > On Mon, May 20, 2019 at 2:02 PM Nicolas Paris <
> nicolas.pa...@riseup.net>
> > wrote:
> > >
> >     > > you will need the spark version you intend to launch with on
> the
> > machine
> > > you
> > > > launch from and point to the correct spark-submit
> > >
> > > does this mean to install a second spark version (2.4) on the
> cluster
> > ?
> > >
> > > thanks
> > >
> > > On Mon, May 20, 2019 at 01:58:11PM -0400, Koert Kuipers wrote:
> > > > yarn can happily run multiple spark versions side-by-side
> > > > you will need the spark version you intend to launch with on
> the
> > machine
> > > you
> > > > launch from and point to the correct spark-submit
> > > >
> > > > On Mon, May 20, 2019 at 1:50 PM Nicolas Paris <
> > nicolas.pa...@riseup.net>
> > > wrote:
> > > >
> > > > Hi
> > > >
> > > > I am wondering whether that's feasible to:
> > > > - build a spark application (with sbt/maven) based on
> spark2.4
> > > > - deploy that jar on yarn on a spark2.3 based
> installation
> > > >
> > > > thanks by advance,
> > > >
> > > >
> > > > --
> > > > nicolas
> > > >
> > > >
> >
>   -
> > > > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> > > >
> > > >
> > >
> > > --
> > > nicolas
> > >
> > >
>  -
> > > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> > >
> > >
> >
> > --
> > nicolas
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
> >
>
> --
> nicolas
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: run new spark version on old spark cluster ?

2019-05-20 Thread Koert Kuipers
we had very little issues with hdfs or hive, but then we use hive only for
basic reading and writing of tables.

depending on your vendor you might have to add a few settings to your
spark-defaults.conf. i remember on hdp you had to set the hdp.version
somehow.
we prefer to build spark with hadoop being provided, and then add hadoop
classpath to spark classpath. this works well on cdh, hdp, and also for
cloud providers.

for example this is a typical build with hive for cdh 5 (which is based on
hadoop 2.6, you change hadoop version based on vendor):
dev/make-distribution.sh --name  --tgz -Phadoop-2.6
-Dhadoop.version=2.6.0 -Pyarn -Phadoop-provided -Phive
add hadoop classpath to the spark classpath in spark-env.sh:
export SPARK_DIST_CLASSPATH=$(hadoop classpath)

i think certain vendors support multiple "vendor supported" installs, so
you could also look into that if you are not comfortable with running your
own spark build.

On Mon, May 20, 2019 at 2:24 PM Nicolas Paris 
wrote:

> > correct. note that you only need to install spark on the node you launch
> it
> > from. spark doesnt need to be installed on cluster itself.
>
> That sound reasonably doable for me. My guess is I will have some
> troubles to make that spark version work with both hive & hdfs installed
> on the cluster - or maybe that's finally plug-&-play i don't know.
>
> thanks
>
> On Mon, May 20, 2019 at 02:16:43PM -0400, Koert Kuipers wrote:
> > correct. note that you only need to install spark on the node you launch
> it
> > from. spark doesnt need to be installed on cluster itself.
> >
> > the shared components between spark jobs on yarn are only really
> > spark-shuffle-service in yarn and spark-history-server. i have found
> > compatibility for these to be good. its best if these run latest version.
> >
> > On Mon, May 20, 2019 at 2:02 PM Nicolas Paris 
> wrote:
> >
> > > you will need the spark version you intend to launch with on the
> machine
> > you
> > > launch from and point to the correct spark-submit
> >
> >     does this mean to install a second spark version (2.4) on the
> cluster ?
> >
> > thanks
> >
> > On Mon, May 20, 2019 at 01:58:11PM -0400, Koert Kuipers wrote:
> > > yarn can happily run multiple spark versions side-by-side
> > > you will need the spark version you intend to launch with on the
> machine
> > you
> > > launch from and point to the correct spark-submit
> > >
> > > On Mon, May 20, 2019 at 1:50 PM Nicolas Paris <
> nicolas.pa...@riseup.net>
> > wrote:
> > >
> > > Hi
> > >
> > > I am wondering whether that's feasible to:
> > > - build a spark application (with sbt/maven) based on spark2.4
> > > - deploy that jar on yarn on a spark2.3 based installation
> > >
> > > thanks by advance,
> > >
> > >
> > > --
> > > nicolas
> > >
> > >
>  -
> > > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> > >
> > >
> >
> > --
> > nicolas
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
> >
>
> --
> nicolas
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: run new spark version on old spark cluster ?

2019-05-20 Thread Koert Kuipers
correct. note that you only need to install spark on the node you launch it
from. spark doesnt need to be installed on cluster itself.

the shared components between spark jobs on yarn are only really
spark-shuffle-service in yarn and spark-history-server. i have found
compatibility for these to be good. its best if these run latest version.

On Mon, May 20, 2019 at 2:02 PM Nicolas Paris 
wrote:

> > you will need the spark version you intend to launch with on the machine
> you
> > launch from and point to the correct spark-submit
>
> does this mean to install a second spark version (2.4) on the cluster ?
>
> thanks
>
> On Mon, May 20, 2019 at 01:58:11PM -0400, Koert Kuipers wrote:
> > yarn can happily run multiple spark versions side-by-side
> > you will need the spark version you intend to launch with on the machine
> you
> > launch from and point to the correct spark-submit
> >
> > On Mon, May 20, 2019 at 1:50 PM Nicolas Paris 
> wrote:
> >
> > Hi
> >
> > I am wondering whether that's feasible to:
> > - build a spark application (with sbt/maven) based on spark2.4
> > - deploy that jar on yarn on a spark2.3 based installation
> >
> > thanks by advance,
> >
> >
> > --
> > nicolas
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
> >
>
> --
> nicolas
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: run new spark version on old spark cluster ?

2019-05-20 Thread Koert Kuipers
yarn can happily run multiple spark versions side-by-side
you will need the spark version you intend to launch with on the machine
you launch from and point to the correct spark-submit

On Mon, May 20, 2019 at 1:50 PM Nicolas Paris 
wrote:

> Hi
>
> I am wondering whether that's feasible to:
> - build a spark application (with sbt/maven) based on spark2.4
> - deploy that jar on yarn on a spark2.3 based installation
>
> thanks by advance,
>
>
> --
> nicolas
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: ml Pipeline read write

2019-05-10 Thread Koert Kuipers
i guess it simply is never set, in which case it is created in:

  protected final def sparkSession: SparkSession = {
    if (optionSparkSession.isEmpty) {
      optionSparkSession = Some(SparkSession.builder().getOrCreate())
    }
    optionSparkSession.get
  }
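so if session() is never called it falls back to getOrCreate(); it can also be
supplied explicitly when persisting, e.g. (a sketch; pipelineModel and the
path are placeholders):

import org.apache.spark.ml.PipelineModel
pipelineModel.write.session(spark).overwrite().save("/models/my-pipeline")
val reloaded = PipelineModel.load("/models/my-pipeline")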

On Fri, May 10, 2019 at 4:31 PM Koert Kuipers  wrote:

> i am trying to understand how ml persists pipelines. it seems a
> SparkSession or SparkContext is needed for this, to write to hdfs.
>
> MLWriter and MLReader both extend BaseReadWrite to have access to a
> SparkSession. but this is where it gets confusing... the only way to set
> the SparkSession seems to be in BaseReadWrite:
>
> def session(sparkSession: SparkSession): this.type
>
> and i can find no place this is actually used, except for in one unit
> test: org.apache.spark.ml.util.JavaDefaultReadWriteSuite
>
> i confirmed it is not used by simply adding a line inside that method that
> throws an error, and all unit tests pass except for
> JavaDefaultReadWriteSuite.
>
> how is the sparkSession set?
> thanks!
>
> koert
>
>
>


ml Pipeline read write

2019-05-10 Thread Koert Kuipers
i am trying to understand how ml persists pipelines. it seems a
SparkSession or SparkContext is needed for this, to write to hdfs.

MLWriter and MLReader both extend BaseReadWrite to have access to a
SparkSession. but this is where it gets confusing... the only way to set
the SparkSession seems to be in BaseReadWrite:

def session(sparkSession: SparkSession): this.type

and i can find no place this is actually used, except for in one unit test:
org.apache.spark.ml.util.JavaDefaultReadWriteSuite

i confirmed it is not used by simply adding a line inside that method that
throws an error, and all unit tests pass except for
JavaDefaultReadWriteSuite.

how is the sparkSession set?
thanks!

koert


spark 2.4.1 -> 3.0.0-SNAPSHOT mllib

2019-04-23 Thread Koert Kuipers
 we recently started compiling against spark 3.0.0-SNAPSHOT (build inhouse
from master branch) to uncover any breaking changes that might be an issue
for us.

we ran into some of our tests breaking where we use mllib. most of it is
immaterial: we had some magic numbers hard-coded and the results are
slightly different because spark changed its random number generation or
because spark fixed a genuine bug in a classifier, etc.

however we see somewhat significant changes in ALS factors and also in
resulting recommendations. all this while there seems to be no changes in
the ALS code between spark 2.4.1 and current master.

we cannot come up with a good explanation so far. any idea what is going on?
thanks!
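one thing that at least removes run-to-run variance when comparing the two
builds (a sketch; the column names and the ratings dataframe are placeholders)
is pinning the ALS seed:

import org.apache.spark.ml.recommendation.ALS
val als = new ALS().setSeed(42L).setUserCol("user").setItemCol("item").setRatingCol("rating")
val model = als.fit(ratings)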


Re: ClassCastException for SerializedLamba

2019-03-30 Thread Koert Kuipers
i found jira that seems related:
https://issues.apache.org/jira/browse/SPARK-25047

On Fri, Mar 29, 2019 at 4:01 PM Koert Kuipers  wrote:

> hi all,
> we are switching from scala 2.11 to 2.12 with a spark 2.4.1 release
> candidate and so far this has been going pretty smoothly.
>
> however we do see some new serialization errors related to Function1,
> Function2, etc.
>
> they look like this:
> ClassCastException: cannot assign instance of
> java.lang.invoke.SerializedLambda to field MyCaseClass.f of type
> scala.Function1 in instance of MyCaseClass
> at
> java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
>
> these are often simple case classes with inside a val for the function.
> like this:
> case class MyCaseClass[X, Y](...) {
>   val f: Function1[X, Y] = ...
> }
>
> we had no problems with these in scala 2.11. it does not look like these
> classes have members that are not serializable, and neither do the
> functions close over anything troublesome. since we get this for some
> classes but not for others i am not entirely sure what to make of it. we
> can work around the issue by changing the val f to a def, like this:
> case class MyCaseClass[X, Y](...) {
>   def f: Function1[X, Y] = ...
> }
>
> any idea what is causing this?
> thanks!
> koert
>
>
>


ClassCastException for SerializedLamba

2019-03-29 Thread Koert Kuipers
hi all,
we are switching from scala 2.11 to 2.12 with a spark 2.4.1 release
candidate and so far this has been going pretty smoothly.

however we do see some new serialization errors related to Function1,
Function2, etc.

they look like this:
ClassCastException: cannot assign instance of
java.lang.invoke.SerializedLambda to field MyCaseClass.f of type
scala.Function1 in instance of MyCaseClass
at
java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)

these are often simple case classes with inside a val for the function.
like this:
case class MyCaseClass[X, Y](...) {
  val f: Function1[X, Y] = ...
}

we had no problems with these in scala 2.11. it does not look like these
classes have members that are not serializable, and neither do the
functions close over anything troublesome. since we get this for some
classes but not for others i am not entirely sure what to make of it. we
can work around the issue by changing the val f to a def, like this:
case class MyCaseClass[X, Y](...) {
  def f: Function1[X, Y] = ...
}

any idea what is causing this?
thanks!
koert


Re: Difference between dataset and dataframe

2019-02-19 Thread Koert Kuipers
dataframe operations are expressed as transformations on columns, basically
on locations inside the row objects. this specificity can be exploited by
catalyst to optimize these operations. since catalyst knows exactly what
positions in the row object you modified or not at any point and often also
what operation you did on them it can reason about these and do
optimizations like re-ordering of operations, compiling operations, and
running operations on serialized/internal formats.

when you use case classes and lambda operations not as much information is
available and the operation cannot be performed on the internal
representation. so conversions and/or deserializations are necessary.
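a small sketch of the contrast (the column name is illustrative): the column
expression stays visible to catalyst, while the typed lambda is opaque to it
and forces deserialization into objects.

import spark.implicits._
import org.apache.spark.sql.functions.col
val df = spark.range(1000).toDF("id")
val viaColumns = df.select(col("id") * 5)   // catalyst sees the expression and can optimize it
val viaLambda  = df.as[Long].map(_ * 5)     // catalyst only sees an opaque function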

On Tue, Feb 19, 2019 at 12:59 AM Lunagariya, Dhaval <
dhaval.lunagar...@citi.com> wrote:

> It does for dataframe also. Please try example.
>
>
>
> df1 = spark.range(2, 1000, 2)
>
> df2 = spark.range(2, 1000, 4)
>
> step1 = df1.repartition(5)
>
> step12 = df2.repartition(6)
>
> step2 = step1.selectExpr("id * 5 as id")
>
> step3 = step2.join(step12, ["id"])
>
> step4 = step3.selectExpr("sum(id)")
>
> step4.collect()
>
>
>
> step4._jdf.queryExecution().debug().codegen()
>
>
>
> You will see the generated code.
>
>
>
> Regards,
>
> Dhaval
>
>
>
> *From:* [External] Akhilanand 
> *Sent:* Tuesday, February 19, 2019 10:29 AM
> *To:* Koert Kuipers 
> *Cc:* user 
> *Subject:* Re: Difference between dataset and dataframe
>
>
>
> Thanks for the reply. But can you please tell why dataframes are more
> performant than datasets? Any specifics would be helpful.
>
>
>
> Also, could you comment on the tungsten code gen part of my question?
>
>
> On Feb 18, 2019, at 10:47 PM, Koert Kuipers  wrote:
>
> in the api DataFrame is just Dataset[Row]. so this makes you think Dataset
> is the generic api. interestingly enough under the hood everything is
> really Dataset[Row], so DataFrame is really the "native" language for spark
> sql, not Dataset.
>
>
>
> i find DataFrame to be significantly more performant. in general if you
> use Dataset you miss out on some optimizations. also Encoders are not very
> pleasant to work with.
>
>
>
> On Mon, Feb 18, 2019 at 9:09 PM Akhilanand 
> wrote:
>
>
> Hello,
>
> I have been recently exploring about dataset and dataframes. I would
> really appreciate if someone could answer these questions:
>
> 1) Is there any difference in terms performance when we use datasets over
> dataframes? Is it significant to choose 1 over other. I do realise there
> would be some overhead due case classes but how significant is that? Are
> there any other implications.
>
> 2) Is the Tungsten code generation done only for datasets or is there any
> internal process to generate bytecode for dataframes as well? Since its
> related to jvm , I think its just for datasets but I couldn’t find anything
> that tells it specifically. If its just for datasets , does that mean we
> miss out on the project tungsten optimisation for dataframes?
>
>
>
> Regards,
> Akhilanand BV
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Difference between dataset and dataframe

2019-02-18 Thread Koert Kuipers
in the api DataFrame is just Dataset[Row]. so this makes you think Dataset
is the generic api. interestingly enough under the hood everything is
really Dataset[Row], so DataFrame is really the "native" language for spark
sql, not Dataset.

i find DataFrame to be significantly more performant. in general if you use
Dataset you miss out on some optimizations. also Encoders are not very
pleasant to work with.

On Mon, Feb 18, 2019 at 9:09 PM Akhilanand  wrote:

>
> Hello,
>
> I have been recently exploring about dataset and dataframes. I would
> really appreciate if someone could answer these questions:
>
> 1) Is there any difference in terms performance when we use datasets over
> dataframes? Is it significant to choose 1 over other. I do realise there
> would be some overhead due case classes but how significant is that? Are
> there any other implications.
>
> 2) Is the Tungsten code generation done only for datasets or is there any
> internal process to generate bytecode for dataframes as well? Since its
> related to jvm , I think its just for datasets but I couldn’t find anything
> that tells it specifically. If its just for datasets , does that mean we
> miss out on the project tungsten optimisation for dataframes?
>
>
>
> Regards,
> Akhilanand BV
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Caused by: java.io.NotSerializableException: com.softwaremill.sttp.FollowRedirectsBackend

2018-11-29 Thread Koert Kuipers
if you only use it in the executors sometimes using lazy works
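a sketch of that pattern (the sttp backend constructor used here is an
assumption, adjust to whatever backend the code actually builds): keep the
backend in an object so it is initialized lazily on each executor jvm instead
of being serialized from the driver.

object Backends {
  // touched for the first time inside the executor, never shipped over the wire
  lazy val backend = com.softwaremill.sttp.HttpURLConnectionBackend()
}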

On Thu, Nov 29, 2018 at 9:45 AM James Starks 
wrote:

> This is not problem directly caused by Spark, but it's related; thus
> asking here. I use spark to read data from parquet and processing some http
> call with sttp (https://github.com/softwaremill/sttp). However, spark
> throws
>
> Caused by: java.io.NotSerializableException:
> com.softwaremill.sttp.FollowRedirectsBackend
>
> It's understood why such exception is thrown because
> FollowRedirectsBackend is not seralizable. So I would like know in such
> case -  are there any ways to get around this problem without modifying,
> recompiling original code?
>
> Thanks
>
>


structured streaming bookkeeping formats

2018-10-27 Thread Koert Kuipers
i was reading this blog post from last year about structured streaming
run-once trigger:
https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html

its a nice idea to replace a batch job with structured streaming because it
does the bookkeeping (whats new, failure recovery, etc.) for you.

but that's also the part that scares me a bit. when its all done for me and
it breaks anyhow i am not sure i know how to recover. and i am unsure how
to upgrade.
so... are the formats that spark structured streaming uses for
"bookkeeping" easily readable (like say json) and stable? does it consist
of files i can go look at and understand and edit/manipulate myself if
needed? are there any references to the format used?

thank you!

best,
koert
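for what it is worth, a checkpoint location is mostly plain text and can be
inspected directly (a sketch; the exact layout can vary a bit by version):

ls /path/to/checkpoint            # typically: commits/ metadata offsets/ sources/ state/
cat /path/to/checkpoint/offsets/0 # a version header followed by json per source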


Re: Why repartitionAndSortWithinPartitions slower than MapReduce

2018-08-20 Thread Koert Kuipers
I assume you are using RDDs? What are you doing after the repartitioning +
sorting, if anything?
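for reference, the rdd call under discussion looks like this (a sketch; the
key/value data and the partitioner are illustrative):

import org.apache.spark.HashPartitioner
val rdd = sc.parallelize(Seq((3, "c"), (1, "a"), (2, "b")))
val sorted = rdd.repartitionAndSortWithinPartitions(new HashPartitioner(4))
sorted.saveAsTextFile("/tmp/sorted-out")   // placeholder path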


On Aug 20, 2018 11:22, "周浥尘"  wrote:

In addition to my previous email,
Environment: spark 2.1.2, hadoop 2.6.0-cdh5.11, Java 1.8, CentOS 6.6

周浥尘 wrote on Mon, Aug 20, 2018 at 8:52 PM:

> Hi team,
>
> I found the Spark method repartitionAndSortWithinPartitions spends
> twice as much time as using MapReduce in some cases.
> I want to repartition the dataset according to split keys and save them
> to files in ascending order. As the doc says, repartitionAndSortWithinPartitions
> “is more efficient than calling `repartition` and then sorting within each
> partition because it can push the sorting down into the shuffle machinery.”
> I thought it may be faster than MR, but actually, it is much slower. I
> also adjusted several configurations of spark, but that didn't work. (Both
> Spark and MapReduce run on a three-node cluster and share the same number
> of partitions.)
> Can this situation be explained or is there any approach to improve the
> performance of spark?
>
> Thanks & Regards,
> Yichen
>


something happened to MemoryStream after spark 2.3

2018-08-16 Thread Koert Kuipers
hi,
we just started testing internally with spark 2.4 snapshots, and it seems
our streaming tests are broken.

i believe it has to do with MemoryStream.

before we were able to create a MemoryStream, add data to it, convert it to
a streaming unbounded DataFrame and use it repeatedly. by using it
repeatedly i mean repeatedly doing: create a query (with a random uuid
name) from dataframe, process all available, stop the query. every time we
did this all the data in the MemoryStream would be processed.

now with spark 2.4.0-SNAPSHOT the second time we create a query no data is
processed at all. it is as if the MemoryStream is empty. is this expected?
should we refactor our tests?
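for reference, the pattern described above is roughly this (a sketch against
the 2.3-era api, run in a spark shell):

import org.apache.spark.sql.execution.streaming.MemoryStream
import spark.implicits._
implicit val sqlCtx = spark.sqlContext
val input = MemoryStream[Int]
input.addData(1, 2, 3)
val query = input.toDF().writeStream
  .format("memory")
  .queryName("q_" + java.util.UUID.randomUUID().toString.replace("-", ""))
  .start()
query.processAllAvailable()
query.stop()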


Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-09 Thread Koert Kuipers
thanks for that long reply jungtaek!

so when i set spark.sql.shuffle.partitions to 2048 i have 2048 data
partitions (or "partitions of state"). these are determined by a hashing
function. ok got it!

when i look at the application manager i also see 2048 "tasks" for the
relevant stage. so tasks here is not the same as parallelism, which is
defined by number of executors * number of cores. and i see these tasks are
being queued up to be processed. i have learned to watch the number of
tasks in shuffle carefully, since its the unit of work, and because when i
have memory issues (OOM) it usually means i need to split the data up more,
so increase the tasks.

so is it reasonable to assume for the shuffle that a task maps to a single
data partition being processed?

if so, then when i do a coalesce(100) after a shuffle i see only 100 tasks
for the stage of shuffle. what does this mean? does this mean a task no
longer maps to a single data partition being processed, and i still have
2048 data partitions? if so, does every task process multiple data
partitions sequentially? and does this not increase my chances of OOM
because the data partitions are processed sequentially within a task?


On Thu, Aug 9, 2018 at 3:10 AM, Jungtaek Lim  wrote:

> I could be wrong so welcome anyone to correct me if I'm missing here.
>
> You can expect Spark operators in narrow dependency as applying wrapped
> functions to an iterator (like "op3(op2(op1(iter)))"), and with such
> expectation there's no way to achieve adjusting partitions. Each partition
> is independent from others and there's no communication between tasks.
>
> So if you have 1000 partitions (in terms of parallelism, not data
> partitions) and willing to reduce down (or scale out) to some arbitrary
> number of partitions, it would require moving of data and requires shuffle.
>
> The meaning of "spark.sql.shuffle.partitions" is especially important for
> structured streaming because it defines data partitions of state. For
> structured streaming, there're couple of operations which leverage state
> which is stored to the file system. The state is partitioned by key
> columns, and "spark.sql.shuffle.partitions" data partitions are generated.
> Due to the nature of hash function, once you run the streaming query,
> "spark.sql.shuffle.partitions" keeps unchanged (Spark doesn't allow
> reconfigure for the config).
>
> So the value of configuration represents data partitions of state, as well
> as max parallelism of stateful operators. If we want to have less
> parallelism (not same as number of partitions), we should apply coalesce to
> the operator and the number of partitions are still kept unchanged whereas
> it incurs less parallelism and also less tasks.
>
> We just can't apply coalesce to individual operator in narrow dependency.
>
> -Jungtaek Lim (HeartSaVioR)
> On Thu, Aug 9, 2018 at 3:07 PM, Koert Kuipers wrote:
>
>> well an interesting side effect of this is that i can now control the
>> number of partitions for every shuffle in a dataframe job, as opposed to
>> having a single setting for number of partitions across all shuffles.
>>
>> basically i can set spark.sql.shuffle.partitions to some huge number, and
>> then for every groupByKey (or any other shuffle operation) follow it up
>> with a coalesce to set the number of partitions. its like i have
>> numPartitions back from those good old RDD shuffle methods :)
>>
>>
>> On Thu, Aug 9, 2018 at 1:38 AM, Koert Kuipers  wrote:
>>
>>> a new map task after a shuffle is also a narrow dependency, isnt it?
>>> its narrow because data doesn't need to move, e.g. every partition depends
>>> on single partition, preferably on same machine.
>>>
>>> modifying a previous shuffle to avoid a shuffle strikes me as odd, and
>>> can potentially make a mess of performance, especially when no shuffle is
>>> needed. just a new map task.
>>>
>>>
>>> On Thu, Aug 9, 2018 at 1:15 AM, Jungtaek Lim  wrote:
>>>
>>>> > shouldnt coalesce introduce a new map-phase with less tasks instead
>>>> of changing the previous shuffle?
>>>>
>>>> The javadoc of Dataset.coalesce [1] describes such behavior clearly. It
>>>> results in narrow dependency, hence no shuffle.
>>>>
>>>> So it is pretty clear that you need to use "repartition". Not sure
>>>> there's any available trick to achieve it without calling repartition.
>>>>
>>>> Thanks,
>>>> Jungtaek Lim (HeartSaVioR)
>>>>
>>>> 1. https://github.com/apache/spark/blob/a40806d2bd84e9a0308165f0d6c97e
>>>>

Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-09 Thread Koert Kuipers
well an interesting side effect of this is that i can now control the
number of partitions for every shuffle in a dataframe job, as opposed to
having a single setting for number of partitions across all shuffles.

basically i can set spark.sql.shuffle.partitions to some huge number, and
then for every groupByKey (or any other shuffle operation) follow it up
with a coalesce to set the number of partitions. its like i have
numPartitions back from those good old RDD shuffle methods :)
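
roughly the pattern i have in mind (just a sketch, the dataframe df and the
column names are made up):

import org.apache.spark.sql.functions.sum

// set the global default very high
spark.conf.set("spark.sql.shuffle.partitions", "10000")

// each shuffle then gets its own effective partition count from the trailing
// coalesce, since the coalesce folds into the reduce stage of that shuffle
val small = df.groupBy("colA").count().coalesce(100)          // 100 reduce tasks
val big   = df.groupBy("colB").agg(sum("x")).coalesce(2048)   // 2048 reduce tasks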


On Thu, Aug 9, 2018 at 1:38 AM, Koert Kuipers  wrote:

> an new map task after a shuffle is also a narrow dependency, isnt it? its
> narrow because data doesn't need to move, e.g. every partition depends on
> single partition, preferably on same machine.
>
> modifying a previous shuffle to avoid a shuffle strikes me as odd, and can
> potentially make a mess of performance, especially when no shuffle is
> needed. just a new map task.
>
>
> On Thu, Aug 9, 2018 at 1:15 AM, Jungtaek Lim  wrote:
>
>> > shouldnt coalesce introduce a new map-phase with less tasks instead of
>> changing the previous shuffle?
>>
>> The javadoc of Dataset.coalesce [1] describes such behavior clearly. It
>> results in narrow dependency, hence no shuffle.
>>
>> So it is pretty clear that you need to use "repartition". Not sure
>> there's any available trick to achieve it without calling repartition.
>>
>> Thanks,
>> Jungtaek Lim (HeartSaVioR)
>>
>> 1. https://github.com/apache/spark/blob/a40806d2bd84e9a03081
>> 65f0d6c97e9cf00aa4a3/sql/core/src/main/scala/org/apache/
>> spark/sql/Dataset.scala#L2918-L2937
>>
>>
>> 2018년 8월 9일 (목) 오전 5:55, Koert Kuipers 님이 작성:
>>
>>> sorry i meant to say:
>>> wit a checkpoint i get a map phase with lots of tasks to read the data,
>>> then a reduce phase with 2048 reducers, and then finally a map phase with
>>> 100 tasks.
>>>
>>> On Wed, Aug 8, 2018 at 4:54 PM, Koert Kuipers  wrote:
>>>
>>>> the only thing that seems to stop this so far is a checkpoint.
>>>>
>>>> wit a checkpoint i get a map phase with lots of tasks to read the data,
>>>> then a reduce phase with 2048 reducers, and then finally a map phase with 4
>>>> tasks.
>>>>
>>>> now i need to figure out how to do this without having to checkpoint. i
>>>> wish i could insert something like a dummy operation that logical steps
>>>> cannot jump over.
>>>>
>>>> On Wed, Aug 8, 2018 at 4:22 PM, Koert Kuipers 
>>>> wrote:
>>>>
>>>>> ok thanks.
>>>>>
>>>>> mh. that seems odd. shouldnt coalesce introduce a new map-phase
>>>>> with less tasks instead of changing the previous shuffle?
>>>>>
>>>>> using repartition seems too expensive just to keep the number of files
>>>>> down. so i guess i am back to looking for another solution.
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Aug 8, 2018 at 4:13 PM, Vadim Semenov 
>>>>> wrote:
>>>>>
>>>>>> `coalesce` sets the number of partitions for the last stage, so you
>>>>>> have to use `repartition` instead which is going to introduce an extra
>>>>>> shuffle stage
>>>>>>
>>>>>> On Wed, Aug 8, 2018 at 3:47 PM Koert Kuipers 
>>>>>> wrote:
>>>>>> >
>>>>>> > one small correction: lots of files leads to pressure on the spark
>>>>>> driver program when reading this data in spark.
>>>>>> >
>>>>>> > On Wed, Aug 8, 2018 at 3:39 PM, Koert Kuipers 
>>>>>> wrote:
>>>>>> >>
>>>>>> >> hi,
>>>>>> >>
>>>>>> >> i am reading data from files into a dataframe, then doing a
>>>>>> groupBy for a given column with a count, and finally i coalesce to a
>>>>>> smaller number of partitions before writing out to disk. so roughly:
>>>>>> >>
>>>>>> >> spark.read.format(...).load(...).groupBy(column).count().coa
>>>>>> lesce(100).write.format(...).save(...)
>>>>>> >>
>>>>>> >> i have this setting: spark.sql.shuffle.partitions=2048
>>>>>> >>
>>>>>> >> i expect to see 2048 partitions in shuffle. what i am seeing
>>>>>> instead is a shuffle with only 100 partitions. it's like the coalesce has
>>>>>> taken over the partitioning of the groupBy.
>>>>>> >>
>>>>>> >> any idea why?
>>>>>> >>
>>>>>> >> i am doing coalesce because it is not helpful to write out 2048
>>>>>> files, lots of files leads to pressure down the line on executors reading
>>>>>> this data (i am writing to just one partition of a larger dataset), and
>>>>>> since i have less than 100 executors i expect it to be efficient. so 
>>>>>> sounds
>>>>>> like a good idea, no?
>>>>>> >>
>>>>>> >> but i do need 2048 partitions in my shuffle due to the operation i
>>>>>> am doing in the groupBy (in my real problem i am not just doing a 
>>>>>> count...).
>>>>>> >>
>>>>>> >> thanks!
>>>>>> >> koert
>>>>>> >>
>>>>>> >
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Sent from my iPhone
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>


Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-08 Thread Koert Kuipers
a new map task after a shuffle is also a narrow dependency, isn't it? it's
narrow because data doesn't need to move, e.g. every partition depends on a
single partition, preferably on the same machine.

modifying a previous shuffle to avoid a shuffle strikes me as odd, and can
potentially make a mess of performance, especially when no shuffle is
needed. just a new map task.


On Thu, Aug 9, 2018 at 1:15 AM, Jungtaek Lim  wrote:

> > shouldnt coalesce introduce a new map-phase with less tasks instead of
> changing the previous shuffle?
>
> The javadoc of Dataset.coalesce [1] describes such behavior clearly. It
> results in narrow dependency, hence no shuffle.
>
> So it is pretty clear that you need to use "repartition". Not sure there's
> any available trick to achieve it without calling repartition.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> 1. https://github.com/apache/spark/blob/a40806d2bd84e9a0308165f0d6c97e
> 9cf00aa4a3/sql/core/src/main/scala/org/apache/spark/sql/
> Dataset.scala#L2918-L2937
>
>
> 2018년 8월 9일 (목) 오전 5:55, Koert Kuipers 님이 작성:
>
>> sorry i meant to say:
>> wit a checkpoint i get a map phase with lots of tasks to read the data,
>> then a reduce phase with 2048 reducers, and then finally a map phase with
>> 100 tasks.
>>
>> On Wed, Aug 8, 2018 at 4:54 PM, Koert Kuipers  wrote:
>>
>>> the only thing that seems to stop this so far is a checkpoint.
>>>
>>> wit a checkpoint i get a map phase with lots of tasks to read the data,
>>> then a reduce phase with 2048 reducers, and then finally a map phase with 4
>>> tasks.
>>>
>>> now i need to figure out how to do this without having to checkpoint. i
>>> wish i could insert something like a dummy operation that logical steps
>>> cannot jump over.
>>>
>>> On Wed, Aug 8, 2018 at 4:22 PM, Koert Kuipers  wrote:
>>>
>>>> ok thanks.
>>>>
>>>> mh. that seems odd. shouldnt coalesce introduce a new map-phase
>>>> with less tasks instead of changing the previous shuffle?
>>>>
>>>> using repartition seems too expensive just to keep the number of files
>>>> down. so i guess i am back to looking for another solution.
>>>>
>>>>
>>>>
>>>> On Wed, Aug 8, 2018 at 4:13 PM, Vadim Semenov 
>>>> wrote:
>>>>
>>>>> `coalesce` sets the number of partitions for the last stage, so you
>>>>> have to use `repartition` instead which is going to introduce an extra
>>>>> shuffle stage
>>>>>
>>>>> On Wed, Aug 8, 2018 at 3:47 PM Koert Kuipers 
>>>>> wrote:
>>>>> >
>>>>> > one small correction: lots of files leads to pressure on the spark
>>>>> driver program when reading this data in spark.
>>>>> >
>>>>> > On Wed, Aug 8, 2018 at 3:39 PM, Koert Kuipers 
>>>>> wrote:
>>>>> >>
>>>>> >> hi,
>>>>> >>
>>>>> >> i am reading data from files into a dataframe, then doing a groupBy
>>>>> for a given column with a count, and finally i coalesce to a smaller 
>>>>> number
>>>>> of partitions before writing out to disk. so roughly:
>>>>> >>
>>>>> >> spark.read.format(...).load(...).groupBy(column).count().
>>>>> coalesce(100).write.format(...).save(...)
>>>>> >>
>>>>> >> i have this setting: spark.sql.shuffle.partitions=2048
>>>>> >>
>>>>> >> i expect to see 2048 partitions in shuffle. what i am seeing
>>>>> instead is a shuffle with only 100 partitions. it's like the coalesce has
>>>>> taken over the partitioning of the groupBy.
>>>>> >>
>>>>> >> any idea why?
>>>>> >>
>>>>> >> i am doing coalesce because it is not helpful to write out 2048
>>>>> files, lots of files leads to pressure down the line on executors reading
>>>>> this data (i am writing to just one partition of a larger dataset), and
>>>>> since i have less than 100 executors i expect it to be efficient. so 
>>>>> sounds
>>>>> like a good idea, no?
>>>>> >>
>>>>> >> but i do need 2048 partitions in my shuffle due to the operation i
>>>>> am doing in the groupBy (in my real problem i am not just doing a 
>>>>> count...).
>>>>> >>
>>>>> >> thanks!
>>>>> >> koert
>>>>> >>
>>>>> >
>>>>>
>>>>>
>>>>> --
>>>>> Sent from my iPhone
>>>>>
>>>>
>>>>
>>>
>>


Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-08 Thread Koert Kuipers
sorry i meant to say:
with a checkpoint i get a map phase with lots of tasks to read the data,
then a reduce phase with 2048 reducers, and then finally a map phase with
100 tasks.

On Wed, Aug 8, 2018 at 4:54 PM, Koert Kuipers  wrote:

> the only thing that seems to stop this so far is a checkpoint.
>
> wit a checkpoint i get a map phase with lots of tasks to read the data,
> then a reduce phase with 2048 reducers, and then finally a map phase with 4
> tasks.
>
> now i need to figure out how to do this without having to checkpoint. i
> wish i could insert something like a dummy operation that logical steps
> cannot jump over.
>
> On Wed, Aug 8, 2018 at 4:22 PM, Koert Kuipers  wrote:
>
>> ok thanks.
>>
>> mh. that seems odd. shouldnt coalesce introduce a new map-phase with
>> less tasks instead of changing the previous shuffle?
>>
>> using repartition seems too expensive just to keep the number of files
>> down. so i guess i am back to looking for another solution.
>>
>>
>>
>> On Wed, Aug 8, 2018 at 4:13 PM, Vadim Semenov 
>> wrote:
>>
>>> `coalesce` sets the number of partitions for the last stage, so you
>>> have to use `repartition` instead which is going to introduce an extra
>>> shuffle stage
>>>
>>> On Wed, Aug 8, 2018 at 3:47 PM Koert Kuipers  wrote:
>>> >
>>> > one small correction: lots of files leads to pressure on the spark
>>> driver program when reading this data in spark.
>>> >
>>> > On Wed, Aug 8, 2018 at 3:39 PM, Koert Kuipers 
>>> wrote:
>>> >>
>>> >> hi,
>>> >>
>>> >> i am reading data from files into a dataframe, then doing a groupBy
>>> for a given column with a count, and finally i coalesce to a smaller number
>>> of partitions before writing out to disk. so roughly:
>>> >>
>>> >> spark.read.format(...).load(...).groupBy(column).count().coa
>>> lesce(100).write.format(...).save(...)
>>> >>
>>> >> i have this setting: spark.sql.shuffle.partitions=2048
>>> >>
>>> >> i expect to see 2048 partitions in shuffle. what i am seeing instead
>>> is a shuffle with only 100 partitions. it's like the coalesce has taken
>>> over the partitioning of the groupBy.
>>> >>
>>> >> any idea why?
>>> >>
>>> >> i am doing coalesce because it is not helpful to write out 2048
>>> files, lots of files leads to pressure down the line on executors reading
>>> this data (i am writing to just one partition of a larger dataset), and
>>> since i have less than 100 executors i expect it to be efficient. so sounds
>>> like a good idea, no?
>>> >>
>>> >> but i do need 2048 partitions in my shuffle due to the operation i am
>>> doing in the groupBy (in my real problem i am not just doing a count...).
>>> >>
>>> >> thanks!
>>> >> koert
>>> >>
>>> >
>>>
>>>
>>> --
>>> Sent from my iPhone
>>>
>>
>>
>


Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-08 Thread Koert Kuipers
the only thing that seems to stop this so far is a checkpoint.

with a checkpoint i get a map phase with lots of tasks to read the data,
then a reduce phase with 2048 reducers, and then finally a map phase with 4
tasks.

now i need to figure out how to do this without having to checkpoint. i
wish i could insert something like a dummy operation that logical steps
cannot jump over.
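
for reference, roughly what the checkpoint version looks like (a sketch, with
made-up paths):

// a checkpoint dir must be set before Dataset.checkpoint can be used
spark.sparkContext.setCheckpointDir("hdfs:///tmp/spark-checkpoints")

val counts = spark.read.parquet("hdfs:///data/in")
  .groupBy("column")
  .count()
  .checkpoint()   // materializes the 2048-partition shuffle output here

// the coalesce now becomes its own map phase with 100 tasks instead of
// rewriting the shuffle above
counts.coalesce(100).write.parquet("hdfs:///data/out")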

On Wed, Aug 8, 2018 at 4:22 PM, Koert Kuipers  wrote:

> ok thanks.
>
> mh. that seems odd. shouldnt coalesce introduce a new map-phase with
> less tasks instead of changing the previous shuffle?
>
> using repartition seems too expensive just to keep the number of files
> down. so i guess i am back to looking for another solution.
>
>
>
> On Wed, Aug 8, 2018 at 4:13 PM, Vadim Semenov  wrote:
>
>> `coalesce` sets the number of partitions for the last stage, so you
>> have to use `repartition` instead which is going to introduce an extra
>> shuffle stage
>>
>> On Wed, Aug 8, 2018 at 3:47 PM Koert Kuipers  wrote:
>> >
>> > one small correction: lots of files leads to pressure on the spark
>> driver program when reading this data in spark.
>> >
>> > On Wed, Aug 8, 2018 at 3:39 PM, Koert Kuipers 
>> wrote:
>> >>
>> >> hi,
>> >>
>> >> i am reading data from files into a dataframe, then doing a groupBy
>> for a given column with a count, and finally i coalesce to a smaller number
>> of partitions before writing out to disk. so roughly:
>> >>
>> >> spark.read.format(...).load(...).groupBy(column).count().coa
>> lesce(100).write.format(...).save(...)
>> >>
>> >> i have this setting: spark.sql.shuffle.partitions=2048
>> >>
>> >> i expect to see 2048 partitions in shuffle. what i am seeing instead
>> is a shuffle with only 100 partitions. it's like the coalesce has taken
>> over the partitioning of the groupBy.
>> >>
>> >> any idea why?
>> >>
>> >> i am doing coalesce because it is not helpful to write out 2048 files,
>> lots of files leads to pressure down the line on executors reading this
>> data (i am writing to just one partition of a larger dataset), and since i
>> have less than 100 executors i expect it to be efficient. so sounds like a
>> good idea, no?
>> >>
>> >> but i do need 2048 partitions in my shuffle due to the operation i am
>> doing in the groupBy (in my real problem i am not just doing a count...).
>> >>
>> >> thanks!
>> >> koert
>> >>
>> >
>>
>>
>> --
>> Sent from my iPhone
>>
>
>


Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-08 Thread Koert Kuipers
ok thanks.

mh. that seems odd. shouldn't coalesce introduce a new map-phase with
fewer tasks instead of changing the previous shuffle?

using repartition seems too expensive just to keep the number of files
down. so i guess i am back to looking for another solution.



On Wed, Aug 8, 2018 at 4:13 PM, Vadim Semenov  wrote:

> `coalesce` sets the number of partitions for the last stage, so you
> have to use `repartition` instead which is going to introduce an extra
> shuffle stage
>
> On Wed, Aug 8, 2018 at 3:47 PM Koert Kuipers  wrote:
> >
> > one small correction: lots of files leads to pressure on the spark
> driver program when reading this data in spark.
> >
> > On Wed, Aug 8, 2018 at 3:39 PM, Koert Kuipers  wrote:
> >>
> >> hi,
> >>
> >> i am reading data from files into a dataframe, then doing a groupBy for
> a given column with a count, and finally i coalesce to a smaller number of
> partitions before writing out to disk. so roughly:
> >>
> >> spark.read.format(...).load(...).groupBy(column).count().
> coalesce(100).write.format(...).save(...)
> >>
> >> i have this setting: spark.sql.shuffle.partitions=2048
> >>
> >> i expect to see 2048 partitions in shuffle. what i am seeing instead is
> a shuffle with only 100 partitions. it's like the coalesce has taken over
> the partitioning of the groupBy.
> >>
> >> any idea why?
> >>
> >> i am doing coalesce because it is not helpful to write out 2048 files,
> lots of files leads to pressure down the line on executors reading this
> data (i am writing to just one partition of a larger dataset), and since i
> have less than 100 executors i expect it to be efficient. so sounds like a
> good idea, no?
> >>
> >> but i do need 2048 partitions in my shuffle due to the operation i am
> doing in the groupBy (in my real problem i am not just doing a count...).
> >>
> >> thanks!
> >> koert
> >>
> >
>
>
> --
> Sent from my iPhone
>


Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-08 Thread Koert Kuipers
one small correction: lots of files leads to pressure on the spark driver
program when reading this data in spark.

On Wed, Aug 8, 2018 at 3:39 PM, Koert Kuipers  wrote:

> hi,
>
> i am reading data from files into a dataframe, then doing a groupBy for a
> given column with a count, and finally i coalesce to a smaller number of
> partitions before writing out to disk. so roughly:
>
> spark.read.format(...).load(...).groupBy(column).count().
> coalesce(100).write.format(...).save(...)
>
> i have this setting: spark.sql.shuffle.partitions=2048
>
> i expect to see 2048 partitions in shuffle. what i am seeing instead is a
> shuffle with only 100 partitions. it's like the coalesce has taken over the
> partitioning of the groupBy.
>
> any idea why?
>
> i am doing coalesce because it is not helpful to write out 2048 files,
> lots of files leads to pressure down the line on executors reading this
> data (i am writing to just one partition of a larger dataset), and since i
> have less than 100 executors i expect it to be efficient. so sounds like a
> good idea, no?
>
> but i do need 2048 partitions in my shuffle due to the operation i am
> doing in the groupBy (in my real problem i am not just doing a count...).
>
> thanks!
> koert
>
>


groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-08 Thread Koert Kuipers
hi,

i am reading data from files into a dataframe, then doing a groupBy for a
given column with a count, and finally i coalesce to a smaller number of
partitions before writing out to disk. so roughly:

spark.read.format(...).load(...).groupBy(column).count().coalesce(100).write.format(...).save(...)

i have this setting: spark.sql.shuffle.partitions=2048

i expect to see 2048 partitions in shuffle. what i am seeing instead is a
shuffle with only 100 partitions. it's like the coalesce has taken over the
partitioning of the groupBy.

any idea why?

i am doing coalesce because it is not helpful to write out 2048 files, lots
of files leads to pressure down the line on executors reading this data (i
am writing to just one partition of a larger dataset), and since i have
less than 100 executors i expect it to be efficient. so sounds like a good
idea, no?

but i do need 2048 partitions in my shuffle due to the operation i am doing
in the groupBy (in my real problem i am not just doing a count...).

thanks!
koert


spark structured streaming with file based sources and sinks

2018-08-06 Thread Koert Kuipers
has anyone used spark structured streaming from/to files (json, csv,
parquet, avro) in a non-test setting?

i realize kafka is probably the way to go, but let's say i have a situation
where kafka is not available for reasons out of my control, and i want to
do micro-batching. could i use files to do so in a production setting?
basically:

files on hdfs => spark structured streaming => files on hdfs => spark
structured streaming => files on hdfs => etc.

i assumed this is not a good idea but interested to hear otherwise.
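
to be concrete, each hop would look roughly like this (just a sketch, the
schema and paths are made up):

import spark.implicits._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types._

// file sources require an explicit schema
val schema = new StructType().add("id", LongType).add("value", StringType)

val in = spark.readStream
  .schema(schema)
  .json("hdfs:///data/stage1")

val query = in
  .filter($"value".isNotNull)   // whatever per-row work this hop does
  .writeStream
  .format("json")
  .option("path", "hdfs:///data/stage2")
  .option("checkpointLocation", "hdfs:///checkpoints/stage2")
  .trigger(Trigger.ProcessingTime("1 minute"))   // micro-batch interval
  .start()

query.awaitTermination()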


Re: Saving dataframes with partitionBy: append partitions, overwrite within each

2018-08-01 Thread Koert Kuipers
this works for dataframes with spark 2.3 by changing a global setting, and
will be configurable per write in 2.4
see:
https://issues.apache.org/jira/browse/SPARK-20236
https://issues.apache.org/jira/browse/SPARK-24860
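
for reference, in 2.3 the global setting from SPARK-20236 looks roughly like
this (a sketch, paths made up):

// spark 2.3+: only the partitions present in the data being written get
// replaced, existing day= partitions for other months are left alone
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

newMonthDf            // hypothetical dataframe holding just the new month
  .write
  .mode("overwrite")
  .partitionBy("day")
  .parquet("dataset.parquet")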

On Wed, Aug 1, 2018 at 3:11 PM, Nirav Patel  wrote:

> Hi Peay,
>
> Have you find better solution yet? I am having same issue.
>
> Following says it works with spark 2.1 onward but only when you use
> sqlContext and not Dataframe
> https://medium.com/@anuvrat/writing-into-dynamic-partitions-using-spark-
> 2e2b818a007a
>
> Thanks,
> Nirav
>
> On Mon, Oct 2, 2017 at 4:37 AM, Pavel Knoblokh  wrote:
>
>> If your processing task inherently processes input data by month you
>> may want to "manually" partition the output data by month as well as
>> by day, that is to save it with a file name including the given month,
>> i.e. "dataset.parquet/month=01". Then you will be able to use the
>> overwrite mode with each month partition. Hope this could be of some
>> help.
>>
>> --
>> Pavel Knoblokh
>>
>> On Fri, Sep 29, 2017 at 5:31 PM, peay  wrote:
>> > Hello,
>> >
>> > I am trying to use
>> > data_frame.write.partitionBy("day").save("dataset.parquet") to write a
>> > dataset while splitting by day.
>> >
>> > I would like to run a Spark job  to process, e.g., a month:
>> > dataset.parquet/day=2017-01-01/...
>> > ...
>> >
>> > and then run another Spark job to add another month using the same
>> folder
>> > structure, getting me
>> > dataset.parquet/day=2017-01-01/
>> > ...
>> > dataset.parquet/day=2017-02-01/
>> > ...
>> >
>> > However:
>> > - with save mode "overwrite", when I process the second month, all of
>> > dataset.parquet/ gets removed and I lose whatever was already computed
>> for
>> > the previous month.
>> > - with save mode "append", then I can't get idempotence: if I run the
>> job to
>> > process a given month twice, I'll get duplicate data in all the
>> subfolders
>> > for that month.
>> >
>> > Is there a way to do "append in terms of the subfolders from
>> partitionBy,
>> > but overwrite within each such partitions? Any help would be
>> appreciated.
>> >
>> > Thanks!
>>
>>
>>
>> --
>> Pavel Knoblokh
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
>
>


Re: Dataframe vs Dataset dilemma: either Row parsing or no filter push-down

2018-06-18 Thread Koert Kuipers
we use DataFrame and RDD. Dataset not only has issues with predicate
pushdown, it also adds shuffles at times when it shouldn't. and there is
some overhead from the encoders themselves, because under the hood it is
still just Row objects.
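
for the pushdown part it is easy to see the difference in the physical plans,
e.g. something like this (a sketch, the schema and parquet path are made up):

import spark.implicits._
case class Person(name: String, age: Long)

val ds = spark.read.parquet("/data/people").as[Person]

// typed lambda: a black box to catalyst, the filter runs after the scan and
// the encoder has to build a Person object for every row
ds.filter(_.age < 20).explain

// column expression: shows up as a pushed filter on the parquet scan
ds.filter($"age" < 20).explain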


On Mon, Jun 18, 2018 at 5:00 PM, Valery Khamenya  wrote:

> Hi Spark gurus,
>
> I was surprised to read here:
> https://stackoverflow.com/questions/50129411/why-is-
> predicate-pushdown-not-used-in-typed-dataset-api-vs-untyped-dataframe-ap
>
> that filters are not pushed down in typed Datasets and one should rather
> stick to Dataframes.
>
> But writing code for groupByKey + mapGroups is a headache with Dataframes
> if compared to typed Dataset. The former mostly doesn't force you to write
> any Encoders (unless you try to write generic transformations on
> parametrized Dataset[T]) . Neither typed Dataset forces you to do an ugly
> Row parsing with getInteger, getString, etc -- like it is needed with
> Dataframes.
>
> So, what should the poor Spark user rely on by default, if the goal is to
> deliver a library of  data transformations -- Dataset or Dataframe?
>
> best regards
> --
> Valery
>


Re: trying to understand structured streaming aggregation with watermark and append outputmode

2018-05-30 Thread Koert Kuipers
thanks, that's helpful.


On Wed, May 30, 2018 at 5:05 PM, Lalwani, Jayesh <
jayesh.lalw...@capitalone.com> wrote:

> Few things
>
>
>
>1. Append mode is going to output data that falls out of the watermark
>2. Structured streaming isn’t time based. It reacts only when it sees
>input data. If no data appears in the input it will not move the
>aggregation window
>3. Clock time is irrelevant to structured streaming. As far as
>structured streaming is concerned, “current time” is max time of timestamp
>column used by the window
>
>
>
> SO, what is happening in our case is that you posted 1 and 2 within 2
> seconds of each other. Since, none of them fell outside of watermark, it
> didn’t output anything. Now, until the point you posted 3, time was frozen
> for Structured streaming. The max time of the timestamp column was the
> timestamp of message 2. So, current time was the timestamp of message 2.
> When you posted 3, the time advanced to the timestamp of 3, which caused 1
> to fall out, so it output 1.
>
>
>
> Note that, it will not output 1 exactly 1 second after 1 arrives. The
> clock time means nothing.
>
>
>
> *From: *Koert Kuipers 
> *Date: *Monday, May 28, 2018 at 6:17 PM
> *To: *user 
> *Subject: *trying to understand structured streaming aggregation with
> watermark and append outputmode
>
>
>
> hello all,
>
> just playing with structured streaming aggregations for the first time.
> this is my little program i run inside sbt:
>
>
>
> import org.apache.spark.sql.functions._
>
> val lines = spark.readStream
>   .format("socket")
>   .option("host", "localhost")
>   .option("port", )
>   .load()
>
> val query = lines
>   .withColumn("time", current_timestamp)
>   .withWatermark("time", "1 second")
>   .groupBy(window($"time", "1 second")).agg(collect_list("value") as
> "value")
>   .withColumn("windowstring", $"window" as "string")
>   .writeStream
>   .format("console")
>   .outputMode(OutputMode.Append)
>   .start()
>
> query.awaitTermination()
>
>
>
> before i start it i create a little server with nc:
>
> $ nc -lk 
>
>
>
> after it starts i simply type in a single character every 20 seconds or so
> inside nc and hit enter. my characters are 1, 2, 3, etc.
>
>
>
> the thing i dont understand is it comes back with the correct responses,
> but with delays in terms of entries (not time). after the first 2
> characters it comes back with empty aggregations, and then for every next
> character it comes back with the response for 2 characters ago. so when i
> hit 3 it comes back with the response for 1.
>
>
>
> not very realtime :(
>
>
>
> any idea why?
>
>
>
> i would like it to respond to my input 1 with the relevant response
> for that input (after the window and watermark has expired, of course, so
> within 2 seconds).
>
>
>
> i tried adding a trigger of 1 second but that didnt help either.
>
>
>
> below is the output with my inputs inserted using '<= ', so '<= 1'
> means i hit 1 and then enter.
>
>
>
>
>
> <= 1
> ---
> Batch: 0
> ---
> +--+-++
> |window|value|windowstring|
> +--+-++
> +--+-++
>
> <= 2
> ---
> Batch: 1
> ---
> +--+-++
> |window|value|windowstring|
> +--+-++
> +--+-++
>
> <= 3
> Batch: 2
> ---
> ++-++
> |  window|value|windowstring|
> ++-++
> |[2018-05-28 18:00...|  [1]|[2018-05-28 18:00...|
> ++-++
>
> <= 4
> ---
> Batch: 3
> ---
> ++-++
> |  window|value|windowstring|
> ++-++
> |[2018-05-28 18:00...|  [2]|[2018-05-28 18:00...|
> ++-++
>
> <= 5
> ---
> Batch: 4
> ---
> +--

Re: trying to understand structured streaming aggregation with watermark and append outputmode

2018-05-29 Thread Koert Kuipers
let me ask this another way: if i run this program and then feed it a
single value (on nc), it returns a single result, which is an empty batch.
it will not return anything else after that, no matter how long i wait.

this only happens with watermarking and append output mode.

what do i do to correct this behavior?


On Mon, May 28, 2018 at 6:16 PM, Koert Kuipers  wrote:

> hello all,
> just playing with structured streaming aggregations for the first time.
> this is my little program i run inside sbt:
>
> import org.apache.spark.sql.functions._
>
> val lines = spark.readStream
>   .format("socket")
>   .option("host", "localhost")
>   .option("port", )
>   .load()
>
> val query = lines
>   .withColumn("time", current_timestamp)
>   .withWatermark("time", "1 second")
>   .groupBy(window($"time", "1 second")).agg(collect_list("value") as
> "value")
>   .withColumn("windowstring", $"window" as "string")
>   .writeStream
>   .format("console")
>   .outputMode(OutputMode.Append)
>   .start()
>
> query.awaitTermination()
>
> before i start it i create a little server with nc:
> $ nc -lk 
>
> after it starts i simply type in a single character every 20 seconds or so
> inside nc and hit enter. my characters are 1, 2, 3, etc.
>
> the thing i dont understand is it comes back with the correct responses,
> but with delays in terms of entries (not time). after the first 2
> characters it comes back with empty aggregations, and then for every next
> character it comes back with the response for 2 characters ago. so when i
> hit 3 it comes back with the response for 1.
>
> not very realtime :(
>
> any idea why?
>
> i would like it to respond to my input 1 with the relevant response
> for that input (after the window and watermark has expired, of course, so
> within 2 seconds).
>
> i tried adding a trigger of 1 second but that didnt help either.
>
> below is the output with my inputs inserted using '<= ', so '<= 1'
> means i hit 1 and then enter.
>
>
> <= 1
> ---
> Batch: 0
> ---
> +--+-++
> |window|value|windowstring|
> +--+-++
> +--+-++
>
> <= 2
> ---
> Batch: 1
> ---
> +--+-++
> |window|value|windowstring|
> +--+-++
> +--+-++
>
> <= 3
> Batch: 2
> ---
> ++-++
> |  window|value|windowstring|
> ++-++
> |[2018-05-28 18:00...|  [1]|[2018-05-28 18:00...|
> ++-++
>
> <= 4
> ---
> Batch: 3
> ---
> ++-++
> |  window|value|windowstring|
> ++-++
> |[2018-05-28 18:00...|  [2]|[2018-05-28 18:00...|
> ++-++
>
> <= 5
> ---
> Batch: 4
> ---
> ++-++
> |  window|value|windowstring|
> ++-++
> |[2018-05-28 18:01...|  [3]|[2018-05-28 18:01...|
> ++-++
>
>
>


trying to understand structured streaming aggregation with watermark and append outputmode

2018-05-28 Thread Koert Kuipers
hello all,
just playing with structured streaming aggregations for the first time.
this is my little program i run inside sbt:

import org.apache.spark.sql.functions._

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", )
  .load()

val query = lines
  .withColumn("time", current_timestamp)
  .withWatermark("time", "1 second")
  .groupBy(window($"time", "1 second")).agg(collect_list("value") as
"value")
  .withColumn("windowstring", $"window" as "string")
  .writeStream
  .format("console")
  .outputMode(OutputMode.Append)
  .start()

query.awaitTermination()

before i start it i create a little server with nc:
$ nc -lk 

after it starts i simply type in a single character every 20 seconds or so
inside nc and hit enter. my characters are 1, 2, 3, etc.

the thing i don't understand is that it comes back with the correct responses,
but with delays in terms of entries (not time). after the first 2
characters it comes back with empty aggregations, and then for every next
character it comes back with the response for 2 characters ago. so when i
hit 3 it comes back with the response for 1.

not very realtime :(

any idea why?

i would like it to respond to my input 1 with the relevant response
for that input (after the window and watermark has expired, of course, so
within 2 seconds).

i tried adding a trigger of 1 second but that didn't help either.

below is the output with my inputs inserted using '<= ', so '<= 1'
means i hit 1 and then enter.


<= 1
---
Batch: 0
---
+--+-++
|window|value|windowstring|
+--+-++
+--+-++

<= 2
---
Batch: 1
---
+--+-++
|window|value|windowstring|
+--+-++
+--+-++

<= 3
Batch: 2
---
++-++
|  window|value|windowstring|
++-++
|[2018-05-28 18:00...|  [1]|[2018-05-28 18:00...|
++-++

<= 4
---
Batch: 3
---
++-++
|  window|value|windowstring|
++-++
|[2018-05-28 18:00...|  [2]|[2018-05-28 18:00...|
++-++

<= 5
---
Batch: 4
---
++-++
|  window|value|windowstring|
++-++
|[2018-05-28 18:01...|  [3]|[2018-05-28 18:01...|
++-++


Re: Scala's Seq:* equivalent in java

2018-05-15 Thread Koert Kuipers
Isn't _* varargs? So you should be able to use a Java array?

On Tue, May 15, 2018, 06:29 onmstester onmstester 
wrote:

> I could not find how to pass a list to isin() filter in java, something
> like this could be done with scala:
>
> val ids = Array(1,2)
> df.filter(df("id").isin(ids:_*)).show
>
> But in java everything that converts java list to scala Seq fails with
> unsupported literal type exception:
> JavaConversions.asScalaBuffer(list).toSeq()
>
> JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq().seq()
>
> Sent using Zoho Mail 
>
>
>
On May 15, 2018 06:29, "onmstester onmstester"  wrote:

I could not find how to pass a list to isin() filter in java, something
like this could be done with scala:

val ids = Array(1,2)
df.filter(df("id").isin(ids:_*)).show

But in java everything that converts java list to scala Seq fails with
unsupported literal type exception:
JavaConversions.asScalaBuffer(list).toSeq()
JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq().seq()

Sent using Zoho Mail 


Re: Structured Streaming, Reading and Updating a variable

2018-05-15 Thread Koert Kuipers
You use a windowed aggregation for this
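
roughly something like this (a sketch, the streaming dataframe readings and
the column names are made up):

import spark.implicits._
import org.apache.spark.sql.functions._

// sliding 24-hour window that advances every hour, keyed on the event time
// column of the stream; spark keeps the intermediate state for you
val avgPerWindow = readings
  .withWatermark("eventTime", "24 hours")
  .groupBy(window($"eventTime", "24 hours", "1 hour"))
  .agg(avg($"value") as "runningAvg")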

On Tue, May 15, 2018, 09:23 Martin Engen  wrote:

> Hello,
>
>
>
> I'm working with Structured Streaming, and I need a method of keeping a
> running average based on the last 24 hours of data.
>
> To help with this, I can use Exponential Smoothing, which means I really
> only need to store 1 value from a previous calculation into the new, and
> update this variable as calculations carry on.
>
>
>
> Implementing this is a much bigger challenge than I ever imagined.
>
>
>
>
>
> I've tried using Accumulators and querying/storing data to Cassandra after
> every calculation. Both methods worked somewhat locally, but I don't seem
> to be able to use these in the Spark worker nodes, as I get the
> "java.lang.NoClassDefFoundError: Could not initialize class" error both
> for the accumulator and the Cassandra connection library
>
>
>
> How can you read/update a variable while doing calculations using
> Structured Streaming?
>
>
> Thank you
>
>
>
>


Spark structured streaming aggregation within microbatch

2018-05-15 Thread Koert Kuipers
I have a streaming dataframe where I insert a uuid in every row, then join
with a static dataframe (after which uuid column is no longer unique), then
group by uuid and do a simple aggregation.

So I know all rows with the same uuid will be in the same micro batch,
guaranteed, correct? How do I express it as such in structured streaming? I
don't need an aggregation across batches.

Thanks!


Re: Guava dependency issue

2018-05-08 Thread Koert Kuipers
we shade guava in our fat jar/assembly jar/application jar
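
with sbt-assembly the relocation looks roughly like this (just a sketch, the
plugin setup is not shown and the target package prefix is arbitrary):

// build.sbt, using the sbt-assembly plugin
assemblyShadeRules in assembly := Seq(
  // rewrite guava classes (and all references to them) inside our fat jar
  ShadeRule.rename("com.google.common.**" -> "ourshaded.google.common.@1").inAll
)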

On Tue, May 8, 2018 at 12:31 PM, Marcelo Vanzin  wrote:

> Using a custom Guava version with Spark is not that simple. Spark
> shades Guava, but a lot of libraries Spark uses do not - the main one
> being all of the Hadoop ones, and they need a quite old Guava.
>
> So you have two options: shade/relocate Guava in your application, or
> use spark.{driver|executor}.userClassPath first.
>
> There really isn't anything easier until we get shaded Hadoop client
> libraries...
>
> On Tue, May 8, 2018 at 8:44 AM, Stephen Boesch  wrote:
> >
> > I downgraded to spark 2.0.1 and it fixed that particular runtime
> exception:
> > but then a similar one appears when saving to parquet:
> >
> > An  SOF question on this was created a month ago and today further
> details
> > plus an open bounty were added to it:
> >
> > https://stackoverflow.com/questions/49713485/spark-
> error-with-google-guava-library-java-lang-nosuchmethoderror-com-google-c
> >
> > The new but similar exception is shown below:
> >
> > The hack to downgrade to 2.0.1 does help - i.e. execution proceeds
> further :
> > but then when writing out to parquet the above error does happen.
> >
> > 8/05/07 11:26:11 ERROR Executor: Exception in task 0.0 in stage 2741.0
> (TID
> > 2618)
> > java.lang.NoSuchMethodError:
> > com.google.common.cache.CacheBuilder.build(Lcom/google/common/cache/
> CacheLoader;)Lcom/google/common/cache/LoadingCache;
> > at
> > org.apache.hadoop.io.compress.CodecPool.createCache(CodecPool.java:62)
> > at org.apache.hadoop.io.compress.CodecPool.(CodecPool.
> java:74)
> > at
> > org.apache.parquet.hadoop.CodecFactory$BytesCompressor.<
> init>(CodecFactory.java:92)
> > at
> > org.apache.parquet.hadoop.CodecFactory.getCompressor(
> CodecFactory.java:169)
> > at
> > org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(
> ParquetOutputFormat.java:303)
> > at
> > org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(
> ParquetOutputFormat.java:262)
> > at
> > org.apache.spark.sql.execution.datasources.parquet.
> ParquetOutputWriter.(ParquetFileFormat.scala:562)
> > at
> > org.apache.spark.sql.execution.datasources.parquet.
> ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:139)
> > at
> > org.apache.spark.sql.execution.datasources.BaseWriterContainer.
> newOutputWriter(WriterContainer.scala:131)
> > at
> > org.apache.spark.sql.execution.datasources.DefaultWriterContainer.
> writeRows(WriterContainer.scala:247)
> > at
> > org.apache.spark.sql.execution.datasources.
> InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$
> apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
> > at
> > org.apache.spark.sql.execution.datasources.
> InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$
> apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
> > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.
> scala:70)
> > at org.apache.spark.scheduler.Task.run(Task.scala:86)
> > at org.apache.spark.executor.Executor$TaskRunner.run(
> Executor.scala:274)
> > at
> > java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1149)
> > at
> > java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:6
> >
> >
> >
> > 2018-05-07 10:30 GMT-07:00 Stephen Boesch :
> >>
> >> I am intermittently running into guava dependency issues across mutiple
> >> spark projects.  I have tried maven shade / relocate but it does not
> resolve
> >> the issues.
> >>
> >> The current project is extremely simple: *no* additional dependencies
> >> beyond scala, spark, and scalatest - yet the issues remain (and yes mvn
> >> clean was re-applied).
> >>
> >> Is there a reliable approach to handling the versioning for guava within
> >> spark dependency projects?
> >>
> >>
> >> [INFO]
> >> 
> 
> >> [INFO] Building ccapps_final 1.0-SNAPSHOT
> >> [INFO]
> >> 
> 
> >> [INFO]
> >> [INFO] --- exec-maven-plugin:1.6.0:java (default-cli) @ ccapps_final ---
> >> 18/05/07 10:24:00 WARN NativeCodeLoader: Unable to load native-hadoop
> >> library for your platform... using builtin-java classes where applicable
> >> [WARNING]
> >> java.lang.NoSuchMethodError:
> >> com.google.common.cache.CacheBuilder.refreshAfterWrite(JLjava/util/
> concurrent/TimeUnit;)Lcom/google/common/cache/CacheBuilder;
> >> at org.apache.hadoop.security.Groups.(Groups.java:96)
> >> at org.apache.hadoop.security.Groups.(Groups.java:73)
> >> at
> >> org.apache.hadoop.security.Groups.getUserToGroupsMappingService(
> Groups.java:293)
> >> at
> >> org.apache.hadoop.security.UserGroupInformation.initialize(
> UserGroupInformation.java:283)
> >> at
> >> 

Re: all spark settings end up being system properties

2018-03-30 Thread Koert Kuipers
thanks i will check our SparkSubmit class

On Fri, Mar 30, 2018 at 2:46 PM, Marcelo Vanzin <van...@cloudera.com> wrote:

> Why: it's part historical, part "how else would you do it".
>
> SparkConf needs to read properties read from the command line, but
> SparkConf is something that user code instantiates, so we can't easily
> make it read data from arbitrary locations. You could use thread
> locals and other tricks, but user code can always break those.
>
> Where: this is done by the SparkSubmit class (look for the Scala
> version, "sys.props").
>
>
> On Fri, Mar 30, 2018 at 11:41 AM, Koert Kuipers <ko...@tresata.com> wrote:
> > does anyone know why all spark settings end up being system properties,
> and
> > where this is done?
> >
> > for example when i pass "--conf spark.foo=bar" into spark-submit then
> > System.getProperty("spark.foo") will be equal to "bar"
> >
> > i grepped the spark codebase for System.setProperty or
> System.setProperties
> > and i see it being used in some places but never for all spark settings.
> >
> > we are running into some weird side effects because of this since we use
> > typesafe config which has system properties as overrides so we see them
> pop
> > up there again unexpectedly.
>
>
>
> --
> Marcelo
>


all spark settings end up being system properties

2018-03-30 Thread Koert Kuipers
does anyone know why all spark settings end up being system properties, and
where this is done?

for example when i pass "--conf spark.foo=bar" into spark-submit then
System.getProperty("spark.foo") will be equal to "bar"

i grepped the spark codebase for System.setProperty or System.setProperties
and i see it being used in some places but never for all spark settings.

we are running into some weird side effects because of this, since we use
typesafe config, which treats system properties as overrides, so we see the
spark settings pop up there again unexpectedly.


change spark default for a setting without overriding user

2018-03-16 Thread Koert Kuipers
i would like to change some defaults in spark without overriding the user
if she/he wishes to change them.

for example currently spark.blacklist.enabled is by default false, which
makes sense for backwards compatibility.

i would like it to be true by default, but if the user provided --conf
spark.blacklist.enabled=false it should be set to false again.

i am aware that i can do:

val spark = SparkSession
  .builder()
  .appName("SomeExample")
  .config("spark.spark.blacklist.enabled", "true")
  .getOrCreate()

however if i understand the order in which settings are applied correctly,
this will override any setting the user provided with --conf, which is
undesired.

note that i do not have permission to change spark-defaults.conf because
the spark install is also shared with some other applications for which i
do not want to change the defaults.

any suggestions? thanks
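
one idea i am considering (a sketch, not fully tested): since --conf values
end up as system properties and a plain SparkConf loads those, setIfMissing
should leave anything the user already set alone:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// new SparkConf() picks up spark.* system properties, which is where
// spark-submit puts the user's --conf values
val conf = new SparkConf()
  .setIfMissing("spark.blacklist.enabled", "true")  // my new default, only if unset

val spark = SparkSession.builder()
  .appName("SomeExample")
  .config(conf)
  .getOrCreate()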


Re: Does Spark 2.2.0 support Dataset<List<Map<String,Object>>> ?

2017-10-09 Thread Koert Kuipers
if you are willing to use kryo encoder you can do your original
Dataset<List<Map<String,Object>>> i think

for example in scala i create here an intermediate Dataset[Any]:

scala> Seq(1,2,3).toDS.map(x => if (x % 2 == 0) x else
x.toString)(org.apache.spark.sql.Encoders.kryo[Any]).map{ (x: Any) => x
match { case i: Int => i.toString; case s: String => s }}.show
+-+
|value|
+-+
|1|
|2|
|3|
+-+




On Mon, Oct 9, 2017 at 2:38 PM, kant kodali <kanth...@gmail.com> wrote:

> Hi Koert,
>
> Thanks! If I have this  Dataset<Seq<Map<String, X>>> what would be the
> Encoding? is it Encoding.kryo(Seq.class)?
>
> Also shouldn't List be supported? Should I create a ticket for this?
>
>
> On Mon, Oct 9, 2017 at 6:10 AM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> it supports Dataset<Seq<Map<String, X>>> where X must be a supported type
>> also. Object is not a supported type.
>>
>> On Mon, Oct 9, 2017 at 7:36 AM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I am wondering if spark supports Dataset<List<Map<String,Object>>> ?
>>>
>>> when I do the following it says no map function available?
>>>
>>> Dataset<List<Map<String,Object>>> resultDs = ds.map(lambda,
>>> Encoders.bean(List.class));
>>>
>>> Thanks!
>>>
>>>
>>>
>>
>


Re: Does Spark 2.2.0 support Dataset<List<Map<String,Object>>> ?

2017-10-09 Thread Koert Kuipers
it supports Dataset<Seq<Map<String, X>>> where X must be a supported type
also. Object is not a supported type.

On Mon, Oct 9, 2017 at 7:36 AM, kant kodali  wrote:

> Hi All,
>
> I am wondering if spark supports Dataset<List<Map<String,Object>>> ?
>
> when I do the following it says no map function available?
>
> Dataset<List<Map<String,Object>>> resultDs = ds.map(lambda,
> Encoders.bean(List.class));
>
> Thanks!
>
>
>


Re: Apache Spark - MLLib challenges

2017-09-23 Thread Koert Kuipers
our main challenge has been the lack of support for missing values generally

On Sat, Sep 23, 2017 at 3:41 AM, Irfan Kabli 
wrote:

> Dear All,
>
> We are looking to position MLLib in our organisation for machine learning
> tasks and are keen to understand if there are any challenges that you might
> have seen with MLLib in production. We will be going with the pure
> open-source approach here, rather than using one of the hadoop
> distributions out there in the market.
>
> Furthermore, with a multi-tenant hadoop cluster, and data in memory, would
> spark support encrypting the data in memory with DataFrames?
>
> --
> Best Regards,
> Irfan Kabli
>
>


Re: Is there an api in Dataset/Dataframe that does repartitionAndSortWithinPartitions?

2017-06-24 Thread Koert Kuipers
Dataset/DataFrame has repartition (which can be used to partition by key)
and sortWithinPartitions.

see for example usage here:
https://github.com/tresata/spark-sorted/blob/master/src/main/scala/com/tresata/spark/sorted/sql/GroupSortedDataset.scala#L18
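
for your example it would look roughly like this (a sketch, myDf and the key
column are made up; note this is spark's hash partitioning, custom
partitioners are not supported on Dataset/DataFrame):

import org.apache.spark.sql.functions.col

val outputPartitionCount = 300

// hash-partition by key into 300 partitions, then sort within each partition
val finalDf = myDf
  .repartition(outputPartitionCount, col("key"))
  .sortWithinPartitions(col("key"))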

On Fri, Jun 23, 2017 at 5:43 PM, Keith Chapman 
wrote:

> Hi,
>
> I have code that does the following using RDDs,
>
> val outputPartitionCount = 300
> val part = new MyOwnPartitioner(outputPartitionCount)
> val finalRdd = myRdd.repartitionAndSortWithinPartitions(part)
>
> where myRdd is correctly formed as key, value pairs. I am looking convert
> this to use Dataset/Dataframe instead of RDDs, so my question is:
>
> Is there custom partitioning of Dataset/Dataframe implemented in Spark?
> Can I accomplish the partial sort using mapPartitions on the resulting
> partitioned Dataset/Dataframe?
>
> Any thoughts?
>
> Regards,
> Keith.
>
> http://keith-chapman.com
>


Re: isin query

2017-04-17 Thread Koert Kuipers
i dont see this behavior in the current spark master:

scala> val df = Seq("m_123", "m_111", "m_145", "m_098",
"m_666").toDF("msrid")
df: org.apache.spark.sql.DataFrame = [msrid: string]

scala> df.filter($"msrid".isin("m_123")).count
res0: Long = 1

scala> df.filter($"msrid".isin("m_123","m_111","m_145")).count
res1: Long = 3



On Mon, Apr 17, 2017 at 10:50 AM, nayan sharma 
wrote:

> Thanks for responding.
> df.filter($"msrid"==="m_123" || $"msrid"==="m_111")
>
> there are lots of workarounds to my question but can you let me know what's
> wrong with the "isin" query.
>
> Regards,
> Nayan
>
> Begin forwarded message:
>
> *From: *ayan guha 
> *Subject: **Re: isin query*
> *Date: *17 April 2017 at 8:13:24 PM IST
> *To: *nayan sharma , user@spark.apache.org
>
> How about using OR operator in filter?
>
> On Tue, 18 Apr 2017 at 12:35 am, nayan sharma 
> wrote:
>
>> Dataframe (df) having column msrid(String) having values
>> m_123,m_111,m_145,m_098,m_666
>>
>> I wanted to filter out rows which are having values m_123,m_111,m_145
>>
>> df.filter($"msrid".isin("m_123","m_111","m_145")).count
>> count =0
>> while
>> df.filter($"msrid".isin("m_123")).count
>> count=121212
>> I have tried using queries like
>> df.filter($"msrid" isin (List("m_123","m_111","m_145"):_*))
>> count =0
>> but
>>
>> df.filter($"msrid" isin (List("m_123"):_*))
>> count=121212
>>
>> Any suggestion will do a great help to me.
>>
>> Thanks,
>> Nayan
>>
> --
> Best Regards,
> Ayan Guha
>
>
>


NPE in UDF yet no nulls in data because analyzer runs test with nulls

2017-04-14 Thread Koert Kuipers
we were running into an NPE in one of our UDFs for spark sql.

now this particular function indeed could not handle nulls, but this was by
design since null input was never allowed (and we would want it to blow up
if there was a null as input).

we realized the issue was not in our data when we added filters for nulls
and the NPE still happened. then we also saw the NPE when just doing
dataframe.explain instead of running our job.

turns out the issue is in EliminateOuterJoin.canFilterOutNull where a row
with all nulls is fed into the expression as a test. it's the line:
val v = boundE.eval(emptyRow)

so should we conclude from this that all udfs should always be prepared to
handle nulls?
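
if we do decide a udf should be defensive, one option is to make the null
case explicit, e.g. (just a sketch, the function is made up):

import org.apache.spark.sql.functions.udf

// returning an Option makes the null case explicit; None comes back as SQL NULL
val safeUpper = udf { (s: String) => Option(s).map(_.toUpperCase) }

// df.select(safeUpper($"name")) then never throws an NPE on null names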


Re: Why dataframe can be more efficient than dataset?

2017-04-09 Thread Koert Kuipers
in this case there is no difference in performance. both will do the
operation directly on the internal representation of the data (so the
InternalRow).

also it is worth pointing out that switching back and forth between
Dataset[X] and DataFrame is free.
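
to illustrate that last point (a sketch, the parquet source is made up):

import spark.implicits._
case class Person(name: String, age: Long)

val df = spark.read.parquet("/data/people")   // DataFrame, i.e. Dataset[Row]
val ds = df.as[Person]                        // no job, no copy: just attaches an encoder
val back = ds.toDF                            // same, back to Dataset[Row]
// as long as only relational operations are used in between,
// the physical plan is the same either way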

On Sun, Apr 9, 2017 at 1:28 PM, Shiyuan <gshy2...@gmail.com> wrote:

> Thank you for the detailed explanation!  You point out two reasons why
> Dataset is not as efficeint as dataframe:
> 1). Spark cannot look into lambda and therefore cannot optimize.
> 2). The  type conversion  occurs under the hood, eg. from X to internal
> row.
>
> Just to check my understanding,  some method of Dataset can also take sql
> expression string  instead of lambda function, in this case, Is it  the
> type conversion still happens under the hood and therefore Dataset is still
> not as efficient as DataFrame.  Here is the code,
>
> //define a dataset and a dataframe, same content, but one is stored as
> Dataset<Person>, the other is Dataset<Row>
> scala> case class Person(name: String, age: Long)
> scala> val ds = Seq(Person("A",32), Person("B", 18)).toDS
> ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
> scala> val df = Seq(Person("A",32), Person("B", 18)).toDF
> df: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
>
> //Which filtering is more efficient? both use sql expression string.
> scala> df.filter("age < 20")
> res7: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [name:
> string, age: bigint]
>
> scala> ds.filter("age < 20")
> res8: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
>
>
>
>
>
>
>
>
> On Sat, Apr 8, 2017 at 7:22 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> how would you use only relational transformations on dataset?
>>
>> On Sat, Apr 8, 2017 at 2:15 PM, Shiyuan <gshy2...@gmail.com> wrote:
>>
>>> Hi Spark-users,
>>> I came across a few sources which mentioned DataFrame can be more
>>> efficient than Dataset.  I can understand this is true because Dataset
>>> allows functional transformation which Catalyst cannot look into and hence
>>> cannot optimize well. But can DataFrame be more efficient than Dataset even
>>> if we only use the relational transformation on dataset? If so, can anyone
>>> give some explanation why  it is so? Any benchmark comparing dataset vs.
>>> dataframe?   Thank you!
>>>
>>> Shiyuan
>>>
>>
>>
>


Re: Why dataframe can be more efficient than dataset?

2017-04-08 Thread Koert Kuipers
how would you use only relational transformations on dataset?

On Sat, Apr 8, 2017 at 2:15 PM, Shiyuan  wrote:

> Hi Spark-users,
> I came across a few sources which mentioned DataFrame can be more
> efficient than Dataset.  I can understand this is true because Dataset
> allows functional transformation which Catalyst cannot look into and hence
> cannot optimize well. But can DataFrame be more efficient than Dataset even
> if we only use the relational transformation on dataset? If so, can anyone
> give some explanation why  it is so? Any benchmark comparing dataset vs.
> dataframe?   Thank you!
>
> Shiyuan
>


Re: Why dataframe can be more efficient than dataset?

2017-04-08 Thread Koert Kuipers
let me try that again. i left some crap at the bottom of my previous email
as i was editing it. sorry about that. here it goes:

it is because you use Dataset[X] but the actual computations are still done
in Dataset[Row] (so DataFrame). well... the actual computations are done in
RDD[InternalRow] with spark's internal types to represent String, Map, Seq,
structs, etc.

so for example if you do:
scala> val x: Dataset[(String, String)] = ...
scala> val f: (String, String) => Boolean = _._2 != null
scala> x.filter(f)

in this case you are using a lambda function for the filter. this is a
black-box operation to spark (spark cannot see what is inside the
function). so spark will now convert the internal representation it is
actually using (something like an InternalRow of size 2 with inside of it
two objects of type UTF8String) into a Tuple2[String, String], and then
call your function f on it. so for this very simple null comparison you are
doing a relatively expensive conversion.

now compare this to if you have a DataFrame that holds 2 columns of type
String.
scala> val x: DataFrame = ...
x: org.apache.spark.sql.DataFrame = [x: string, y: string]
scala> x.filter($"y" isNotNull)

spark will parse your expression, and since it has an understanding of what
you are trying to do, it can apply the logic directly on the InternalRow,
which avoids the conversion. this will be faster. of course you pay the
price for this in that you are forced to use a much more constrained
framework to express what you want to do, which can lead to some hair
pulling at times.

On Sat, Apr 8, 2017 at 2:15 PM, Shiyuan  wrote:

> Hi Spark-users,
> I came across a few sources which mentioned DataFrame can be more
> efficient than Dataset.  I can understand this is true because Dataset
> allows functional transformation which Catalyst cannot look into and hence
> cannot optimize well. But can DataFrame be more efficient than Dataset even
> if we only use the relational transformation on dataset? If so, can anyone
> give some explanation why  it is so? Any benchmark comparing dataset vs.
> dataframe?   Thank you!
>
> Shiyuan
>


Re: Why dataframe can be more efficient than dataset?

2017-04-08 Thread Koert Kuipers
it is because you use Dataset[X] but the actual computations are still done
in Dataset[Row] (so DataFrame). well... the actual computations are done in
RDD[InternalRow] with spark's internal types to represent String, Map, Seq,
structs, etc.

so for example if you do:
scala> val x: Dataset[(String, String)] = ...
scala> val f: (String, String) => Boolean = _._2 != null
scala> x.filter(f)

in this case you are using a lambda function for the filter. this is a
black-box operation to spark (spark cannot see what is inside the
function). so spark will now convert the internal representation it is
actually using (something like an InternalRow of size 2 with inside of it
two objects of type UTF8String) into a Tuple2[String, String], and then
call your function f on it. so for this very simply null comparison you are
doing a relatively expensive conversion.

now compare this to if you have a DataFrame that holds 2 columns of type
String.
scala> val x: DataFrame = ...
x: org.apache.spark.sql.DataFrame = [x: string, y: string]
scala> x.filter($"y" isNotNull)

spark will parse your expression, and since it has an understanding of what
you are trying to do, it can apply the logic directly on the InternalRow,
which avoids the conversion. this will be faster. of course you pay the
price for this in that you are forced to use a much more constrained
framework to express what you want to do, which can lead to some hair
pulling at times.



so when you do a lambda operation on type X, this is black

want to use X spark needs to convert these InternalRows to X and then
convert the result back to InternalRows.



 (so DataFrame) using spark's internal types for string, seq, map, etc.
so any time you actually need an X there is conversion from Row to X, and
from internal representations to your representations of the data) and back
going on. this is whats the encoders are used for.

2) some optimizations aren't working yet for Dataset[X]
3) since type X and the lambdas that you define that perform on it are
somewhat of a black box to spark there is less room for optimization.



On Sat, Apr 8, 2017 at 2:15 PM, Shiyuan  wrote:

> Hi Spark-users,
> I came across a few sources which mentioned DataFrame can be more
> efficient than Dataset.  I can understand this is true because Dataset
> allows functional transformation which Catalyst cannot look into and hence
> cannot optimize well. But can DataFrame be more efficient than Dataset even
> if we only use the relational transformation on dataset? If so, can anyone
> give some explanation why  it is so? Any benchmark comparing dataset vs.
> dataframe?   Thank you!
>
> Shiyuan
>


Re: how do i force unit test to do whole stage codegen

2017-04-05 Thread Koert Kuipers
it's pretty much impossible to be fully up to date with spark given how fast
it moves!

the book is a very helpful reference

On Wed, Apr 5, 2017 at 11:15 AM, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi,
>
> I'm very sorry for not being up to date with the current style (and
> "promoting" the old style) and am going to review that part soon. I'm very
> close to touch it again since I'm with Optimizer these days.
>
> Jacek
>
> On 5 Apr 2017 6:08 a.m., "Kazuaki Ishizaki" <ishiz...@jp.ibm.com> wrote:
>
>> Hi,
>> The page in the URL explains the old style of physical plan output.
>> The current style adds "*" as a prefix of each operation that the
>> whole-stage codegen can be apply to.
>>
>> So, in your test case, whole-stage codegen has been already enabled!!
>>
>> FYI. I think that it is a good topic for d...@spark.apache.org.
>>
>> Kazuaki Ishizaki
>>
>>
>>
>> From:Koert Kuipers <ko...@tresata.com>
>> To:"user@spark.apache.org" <user@spark.apache.org>
>> Date:2017/04/05 05:12
>> Subject:how do i force unit test to do whole stage codegen
>> --
>>
>>
>>
>> i wrote my own expression with eval and doGenCode, but doGenCode never
>> gets called in tests.
>>
>> also as a test i ran this in a unit test:
>> spark.range(10).select('id as 'asId).where('id === 4).explain
>> according to
>>
>> https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-sql-whole-stage-codegen.html
>> this is supposed to show:
>> == Physical Plan ==
>> WholeStageCodegen
>> :  +- Project [id#0L AS asId#3L]
>> : +- Filter (id#0L = 4)
>> :+- Range 0, 1, 8, 10, [id#0L]
>>
>> but it doesn't. instead it shows:
>>
>> == Physical Plan ==
>> *Project [id#12L AS asId#15L]
>> +- *Filter (id#12L = 4)
>>   +- *Range (0, 10, step=1, splits=Some(4))
>>
>> so i am again missing the WholeStageCodegen. any idea why?
>>
>> i create spark session for unit tests simply as:
>> val session = SparkSession.builder
>>  .master("local[*]")
>>  .appName("test")
>>  .config("spark.sql.shuffle.partitions", 4)
>>  .getOrCreate()
>>
>>
>>


Re: how do i force unit test to do whole stage codegen

2017-04-04 Thread Koert Kuipers
got it. thats good to know. thanks!
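
(for reference, a minimal way to inspect what actually got generated, using
the test session from the snippet below; a sketch assuming spark 2.x where
the sql debug helpers are available:)

import session.implicits._
import org.apache.spark.sql.execution.debug._

val df = session.range(10).select('id as 'asId).where('id === 4)
df.explain          // operators prefixed with * sit inside a whole-stage codegen subtree
df.debugCodegen()   // dumps the generated java source for each such subtree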

On Wed, Apr 5, 2017 at 12:07 AM, Kazuaki Ishizaki <ishiz...@jp.ibm.com>
wrote:

> Hi,
> The page in the URL explains the old style of physical plan output.
> The current style adds "*" as a prefix of each operation that the
> whole-stage codegen can be apply to.
>
> So, in your test case, whole-stage codegen has been already enabled!!
>
> FYI. I think that it is a good topic for d...@spark.apache.org.
>
> Kazuaki Ishizaki
>
>
>
> From:Koert Kuipers <ko...@tresata.com>
> To:"user@spark.apache.org" <user@spark.apache.org>
> Date:2017/04/05 05:12
> Subject:how do i force unit test to do whole stage codegen
> --
>
>
>
> i wrote my own expression with eval and doGenCode, but doGenCode never
> gets called in tests.
>
> also as a test i ran this in a unit test:
> spark.range(10).select('id as 'asId).where('id === 4).explain
> according to
>
> https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-sql-whole-stage-codegen.html
> this is supposed to show:
> == Physical Plan ==
> WholeStageCodegen
> :  +- Project [id#0L AS asId#3L]
> : +- Filter (id#0L = 4)
> :+- Range 0, 1, 8, 10, [id#0L]
>
> but it doesn't. instead it shows:
>
> == Physical Plan ==
> *Project [id#12L AS asId#15L]
> +- *Filter (id#12L = 4)
>   +- *Range (0, 10, step=1, splits=Some(4))
>
> so i am again missing the WholeStageCodegen. any idea why?
>
> i create spark session for unit tests simply as:
> val session = SparkSession.builder
>  .master("local[*]")
>  .appName("test")
>  .config("spark.sql.shuffle.partitions", 4)
>  .getOrCreate()
>
>
>


how do i force unit test to do whole stage codegen

2017-04-04 Thread Koert Kuipers
i wrote my own expression with eval and doGenCode, but doGenCode never gets
called in tests.

also as a test i ran this in a unit test:
spark.range(10).select('id as 'asId).where('id === 4).explain
according to
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-sql-whole-stage-codegen.html
this is supposed to show:

== Physical Plan ==WholeStageCodegen
:  +- Project [id#0L AS asId#3L]
: +- Filter (id#0L = 4)
:+- Range 0, 1, 8, 10, [id#0L]

but it doesn't. instead it shows:

== Physical Plan ==
*Project [id#12L AS asId#15L]
+- *Filter (id#12L = 4)
   +- *Range (0, 10, step=1, splits=Some(4))

so i am again missing the WholeStageCodegen. any idea why?

i create spark session for unit tests simply as:
val session = SparkSession.builder
  .master("local[*]")
  .appName("test")
  .config("spark.sql.shuffle.partitions", 4)
  .getOrCreate()


map transform on array in spark sql

2017-04-03 Thread Koert Kuipers
i have a DataFrame where one column has type:

ArrayType(StructType(Seq(
  StructField("a", typeA, nullableA),
  StructField("b", typeB, nullableB)
)))

i would like to map over this array to pick the first element in the
struct. so the result should be a ArrayType(typeA, nullableA). i realize i
can do this with a scala udf if i know typeA. but what if i dont know typeA?

basically i would like to do an expression like:
map(col("x"), _(0)))

any suggestions?
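
(two sketches that don't need to know typeA, with df the dataframe in
question; i haven't checked what nullability either one reports, and the
second one needs spark 2.4+ for the transform higher-order function:)

import org.apache.spark.sql.functions.col

df.select(col("x").getField("a"))        // field extraction over an array of structs gives array<typeA>
// or, on spark 2.4+:
df.selectExpr("transform(x, e -> e.a)")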


Re: Why VectorUDT private?

2017-03-30 Thread Koert Kuipers
sorry meant to say:
we know when we upgrade that we might run into minor inconveniences that
are completely our own doing/fault.

also, with yarn it has become really easy to run against an exact spark
version of our choosing, since there is no longer such a thing as a
centrally managed spark distro on the cluster.

On Thu, Mar 30, 2017 at 1:34 PM, Koert Kuipers <ko...@tresata.com> wrote:

> i agree with that.
>
> we work within that assumption. we compile and run against a single exact
> spark version. we know when we upgrade that we might run into minor
> inconveniences that our completely our own doing/fault. the trade off has
> been totally worth it to me.
>
>
> On Thu, Mar 30, 2017 at 1:20 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> I think really the right way to think about things that are marked
>> private is, "this may disappear or change in a future minor release".  If
>> you are okay with that, working about the visibility restrictions is
>> reasonable.
>>
>> On Thu, Mar 30, 2017 at 5:52 AM, Koert Kuipers <ko...@tresata.com> wrote:
>>
>>> I stopped asking long time ago why things are private in spark... I
>>> mean... The conversion between ml and mllib vectors is private... the
>>> conversion between spark vector and breeze used to be (or still is?)
>>> private. it just goes on. Lots of useful stuff is private[SQL].
>>>
>>> Luckily there are simple ways to get around these visibility restrictions
>>>
>>> On Mar 29, 2017 22:57, "Ryan" <ryan.hd@gmail.com> wrote:
>>>
>>>> I'm writing a transformer and the input column is vector type(which is
>>>> the output column from other transformer). But as the VectorUDT is private,
>>>> how could I check/transform schema for the vector column?
>>>>
>>>
>>
>


Re: Why VectorUDT private?

2017-03-30 Thread Koert Kuipers
i agree with that.

we work within that assumption. we compile and run against a single exact
spark version. we know when we upgrade that we might run into minor
inconveniences that our completely our own doing/fault. the trade off has
been totally worth it to me.


On Thu, Mar 30, 2017 at 1:20 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> I think really the right way to think about things that are marked private
> is, "this may disappear or change in a future minor release".  If you are
> okay with that, working about the visibility restrictions is reasonable.
>
> On Thu, Mar 30, 2017 at 5:52 AM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> I stopped asking long time ago why things are private in spark... I
>> mean... The conversion between ml and mllib vectors is private... the
>> conversion between spark vector and breeze used to be (or still is?)
>> private. it just goes on. Lots of useful stuff is private[SQL].
>>
>> Luckily there are simple ways to get around these visibility restrictions
>>
>> On Mar 29, 2017 22:57, "Ryan" <ryan.hd@gmail.com> wrote:
>>
>>> I'm writing a transformer and the input column is vector type(which is
>>> the output column from other transformer). But as the VectorUDT is private,
>>> how could I check/transform schema for the vector column?
>>>
>>
>


Re: Why VectorUDT private?

2017-03-30 Thread Koert Kuipers
I stopped asking long time ago why things are private in spark... I mean...
The conversion between ml and mllib vectors is private... the conversion
between spark vector and breeze used to be (or still is?) private. it just
goes on. Lots of useful stuff is private[SQL].

Luckily there are simple ways to get around these visibility restrictions
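
for the concrete vector case, spark 2.x does expose the vector type through a
public handle, which sidesteps the private VectorUDT entirely; a sketch
(untested) of a schema check using it:

import org.apache.spark.ml.linalg.SQLDataTypes
import org.apache.spark.sql.types.StructType

def validateVectorCol(schema: StructType, inputCol: String): Unit =
  require(schema(inputCol).dataType == SQLDataTypes.VectorType,
    s"column $inputCol must be of vector type")

for things that are only private[spark] or private[sql] the usual trick is to
put a small helper object inside a matching package name in your own project.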

On Mar 29, 2017 22:57, "Ryan"  wrote:

> I'm writing a transformer and the input column is vector type(which is the
> output column from other transformer). But as the VectorUDT is private, how
> could I check/transform schema for the vector column?
>


Re: Easily creating custom encoders

2017-03-21 Thread Koert Kuipers
see:
https://issues.apache.org/jira/browse/SPARK-18122
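
in the meantime, one partial workaround is to kryo-encode only the unfriendly
fields and keep the rest as real columns via Encoders.tuple, which for the
example below gives (binary, binary, string). a sketch from memory, not
tested:

import java.util.UUID
import org.apache.spark.sql.{Encoder, Encoders}

case class MyClass(id: UUID, items: Map[String, Double], name: String)

val enc: Encoder[(UUID, Map[String, Double], String)] =
  Encoders.tuple(Encoders.kryo[UUID], Encoders.kryo[Map[String, Double]], Encoders.STRING)

val data = Seq(MyClass(UUID.randomUUID, Map("a" -> 1.0), "x"))
val ds = spark.createDataset(data.map(m => (m.id, m.items, m.name)))(enc)
ds.printSchema()   // binary, binary, string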

On Tue, Mar 21, 2017 at 1:13 PM, Ashic Mahtab  wrote:

> I'm trying to easily create custom encoders for case classes having
> "unfriendly" fields. I could just kryo the whole thing, but would like to
> at least have a few fields in the schema instead of one binary blob. For
> example,
>
>
> case class MyClass(id: UUID, items: Map[String, Double], name: String)
>
>
> Is there a way to create an Encoder[MyClass] by kryo-ing the things that
> don't work, and not the ones that do, while retaining distinct columns? In
> the previous example, I'd at least want (binary, binary, String).
>
>
> Is this possible?
>
>
> -Ashic.
>


Re: Fast write datastore...

2017-03-15 Thread Koert Kuipers
we are using elasticsearch for this.

the issue of elasticsearch falling over if the number of partitions/cores
in spark writing to it is too high does suck indeed. and the answer every
time i asked about it on elasticsearch mailing list has been to reduce
spark tasks or increase elasticsearch nodes, which is not very useful.

we ended up putting the spark jobs that write to elasticsearch on a yarn
queue that limits cores. not ideal but it does the job.
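
another knob, if a separate queue is not an option, is to cap the write
parallelism and bulk sizes on the job itself. a sketch with made-up host and
index names, using the es-hadoop datasource options as i remember them:

df.coalesce(8)                               // at most 8 concurrent tasks hitting elasticsearch
  .write
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes", "eshost:9200")         // hypothetical host
  .option("es.batch.size.entries", "500")    // smaller bulk requests
  .save("myindex/mytype")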

On Wed, Mar 15, 2017 at 2:04 AM, muthu  wrote:

> Hello there,
>
> I have one or more parquet files to read and perform some aggregate queries
> using Spark Dataframe. I would like to find a reasonable fast datastore
> that
> allows me to write the results for subsequent (simpler queries).
> I did attempt to use ElasticSearch to write the query results using
> ElasticSearch Hadoop connector. But I am running into connector write
> issues
> if the number of Spark executors are too many for ElasticSearch to handle.
> But in the schema sense, this seems a great fit as ElasticSearch has smartz
> in place to discover the schema. Also in the query sense, I can perform
> simple filters and sort using ElasticSearch and for more complex aggregate,
> Spark Dataframe can come back to the rescue :).
> Please advice on other possible data-stores I could use?
>
> Thanks,
> Muthu
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Fast-write-datastore-tp28497.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: can spark take advantage of ordered data?

2017-03-10 Thread Koert Kuipers
this shouldn't be too hard. adding something to spark-sorted or to the
dataframe/dataset logical plan that says "trust me, i am already
partitioned and sorted" seems doable. however you most likely need a custom
hash partitioner, and you have to be careful to read the data in without
file splitting.
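
to be concrete, "custom hash partitioner" here means one whose placement
matches hadoop's HashPartitioner, since spark's own HashPartitioner computes
its non-negative mod slightly differently. a sketch:

import org.apache.spark.Partitioner

class HadoopHashPartitioner(override val numPartitions: Int) extends Partitioner {
  // same placement rule as hadoop's HashPartitioner: (hash & Int.MaxValue) % n
  def getPartition(key: Any): Int = (key.hashCode & Integer.MAX_VALUE) % numPartitions
}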

On Mar 10, 2017 9:10 AM, "sourabh chaki"  wrote:

> My use case is also quite similar. I have 2 feeds. One 3TB and another
> 100GB. Both the feeds are generated by hadoop reduce operation and
> partitioned by hadoop hashpartitioner. 3TB feed has 10K partitions whereas
> 100GB file has 200 partitions.
>
> Now when I do a join between these two feeds using spark, spark shuffles
> both the RDDS and it takes long time to complete. Can we do something so
> that spark can recognise the existing partitions of 3TB feed and shuffles
> only 200GB feed?
> It can be mapside scan for bigger RDD and shuffle read from smaller RDD?
>
> I have looked at spark-sorted project, but that project does not utilise
> the pre-existing partitions in the feed.
> Any pointer will be helpful.
>
> Thanks
> Sourabh
>
> On Thu, Mar 12, 2015 at 6:35 AM, Imran Rashid 
> wrote:
>
>> Hi Jonathan,
>>
>> you might be interested in https://issues.apache.org/j
>> ira/browse/SPARK-3655 (not yet available) and https://github.com/tresata
>> /spark-sorted (not part of spark, but it is available right now).
>> Hopefully thats what you are looking for.  To the best of my knowledge that
>> covers what is available now / what is being worked on.
>>
>> Imran
>>
>> On Wed, Mar 11, 2015 at 4:38 PM, Jonathan Coveney 
>> wrote:
>>
>>> Hello all,
>>>
>>> I am wondering if spark already has support for optimizations on sorted
>>> data and/or if such support could be added (I am comfortable dropping to a
>>> lower level if necessary to implement this, but I'm not sure if it is
>>> possible at all).
>>>
>>> Context: we have a number of data sets which are essentially already
>>> sorted on a key. With our current systems, we can take advantage of this to
>>> do a lot of analysis in a very efficient fashion...merges and joins, for
>>> example, can be done very efficiently, as can folds on a secondary key and
>>> so on.
>>>
>>> I was wondering if spark would be a fit for implementing these sorts of
>>> optimizations? Obviously it is sort of a niche case, but would this be
>>> achievable? Any pointers on where I should look?
>>>
>>
>>
>


Re: finding Spark Master

2017-03-07 Thread Koert Kuipers
assuming this is running on yarn there is really no spark master. every job
creates its own "master" within a yarn application.
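
so in setMaster() you would just put "yarn" (or pass --master yarn to
spark-submit) and let yarn place the application master; a sketch for spark
2.x, assuming HADOOP_CONF_DIR/YARN_CONF_DIR point at your cluster config:

val spark = SparkSession.builder()
  .master("yarn")          // no host:port; on spark 1.6 this was "yarn-client" or "yarn-cluster"
  .appName("myapp")
  .getOrCreate()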

On Tue, Mar 7, 2017 at 6:27 PM, Adaryl Wakefield <
adaryl.wakefi...@hotmail.com> wrote:

> I’m running a three node cluster along with Spark along with Hadoop as
> part of a HDP stack. How do I find my Spark Master? I’m just seeing the
> clients. I’m trying to figure out what goes in setMaster() aside from
> local[*].
>
>
>
> Adaryl "Bob" Wakefield, MBA
> Principal
> Mass Street Analytics, LLC
> 913.938.6685
>
> www.massstreet.net
>
> www.linkedin.com/in/bobwakefieldmba
> Twitter: @BobLovesData
>
>
>


Re: Spark join over sorted columns of dataset.

2017-03-03 Thread Koert Kuipers
For RDD the shuffle is already skipped but the sort is not. In spark-sorted
we track partitioning and sorting within partitions for key-value RDDs and
can avoid the sort. See:
https://github.com/tresata/spark-sorted

For Dataset/DataFrame such optimizations are done automatically, however
it's currently not always working for Dataset, see:
https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-19468
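
the dataframe-side way to persist that pre-shuffled and pre-sorted state is
bucketing. a sketch with made-up table names, assuming hive support is
enabled and both sides use the same bucket count:

left.write.bucketBy(200, "key").sortBy("key").saveAsTable("left_b")
right.write.bucketBy(200, "key").sortBy("key").saveAsTable("right_b")

spark.table("left_b").join(spark.table("right_b"), "key").explain
// no Exchange under the SortMergeJoin if the bucketing lines up; whether the
// Sort is also dropped depends on the spark version and the file layout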

On Mar 3, 2017 11:06 AM, "Rohit Verma"  wrote:

Sending it to dev’s.
Can you please help me providing some ideas for below.

Regards
Rohit
> On Feb 23, 2017, at 3:47 PM, Rohit Verma 
wrote:
>
> Hi
>
> While joining two columns of different dataset, how to optimize join if
both the columns are pre sorted within the dataset.
> So that when spark do sort merge join the sorting phase can skipped.
>
> Regards
> Rohit


Re: Spark runs out of memory with small file

2017-02-26 Thread Koert Kuipers
using wholeFiles to process formats that can not be split per line is not
"old"

and there are plenty of problems for which RDD is still better suited than
Dataset or DataFrame currently (this might change in near future when
Dataset gets some crucial optimizations fixed).

On Sun, Feb 26, 2017 at 3:14 PM, Gourav Sengupta 
wrote:

> Hi Henry,
>
> Those guys in Databricks training are nuts and still use Spark 1.x for
> their exams. Learning SPARK is a VERY VERY VERY old way of solving problems
> using SPARK.
>
> The core engine of SPARK, which even I understand, has gone through
> several fundamental changes.
>
> Just try reading the file using dataframes and try using SPARK 2.1.
>
> In other words it may be of tremendous benefit if you were learning to
> solve problems which exists rather than problems which does not exist any
> more.
>
> Please let me know in case I can be of any further help.
>
> Regards,
> Gourav
>
> On Sun, Feb 26, 2017 at 7:09 PM, Henry Tremblay 
> wrote:
>
>> The file is so small that a stand alone python script, independent of
>> spark, can process the file in under a second.
>>
>> Also, the following fails:
>>
>> 1. Read the whole file in with wholeFiles
>>
>> 2. use flatMap to get 50,000 rows that looks like: Row(id="path",
>> line="line")
>>
>> 3. Save the results as CVS to HDFS
>>
>> 4. Read the files (there are 20) from HDFS into a df using
>> sqlContext.read.csv()
>>
>> 5. Convert the df to an rdd.
>>
>> 6 Create key value pairs with the key being the file path and the value
>> being the line.
>>
>> 7 Iterate through values
>>
>> What happens is Spark either runs out of memory, or, in my last try with
>> a slight variation, just hangs for 12 hours.
>>
>> Henry
>>
>> On 02/26/2017 03:31 AM, 颜发才(Yan Facai) wrote:
>>
>> Hi, Tremblay.
>> Your file is .gz format, which is not splittable for hadoop. Perhaps the
>> file is loaded by only one executor.
>> How many executors do you start?
>> Perhaps repartition method could solve it, I guess.
>>
>>
>> On Sun, Feb 26, 2017 at 3:33 AM, Henry Tremblay 
>> wrote:
>>
>>> I am reading in a single small file from hadoop with wholeText. If I
>>> process each line and create a row with two cells, the first cell equal to
>>> the name of the file, the second cell equal to the line. That code runs
>>> fine.
>>>
>>> But if I just add two line of code and change the first cell based on
>>> parsing a line, spark runs out of memory. Any idea why such a simple
>>> process that would succeed quickly in a non spark application fails?
>>>
>>> Thanks!
>>>
>>> Henry
>>>
>>> CODE:
>>>
>>> [hadoop@ip-172-31-35-67 ~]$ hadoop fs -du /mnt/temp
>>> 3816096 /mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.i
>>> nternal.warc.gz
>>>
>>>
>>> In [1]: rdd1 = sc.wholeTextFiles("/mnt/temp")
>>> In [2]: rdd1.count()
>>> Out[2]: 1
>>>
>>>
>>> In [4]: def process_file(s):
>>>...: text = s[1]
>>>...: the_id = s[0]
>>>...: d = {}
>>>...: l =  text.split("\n")
>>>...: final = []
>>>...: for line in l:
>>>...: d[the_id] = line
>>>...: final.append(Row(**d))
>>>...: return final
>>>...:
>>>
>>> In [5]: rdd2 = rdd1.map(process_file)
>>>
>>> In [6]: rdd2.count()
>>> Out[6]: 1
>>>
>>> In [7]: rdd3 = rdd2.flatMap(lambda x: x)
>>>
>>> In [8]: rdd3.count()
>>> Out[8]: 508310
>>>
>>> In [9]: rdd3.take(1)
>>> Out[9]: [Row(hdfs://ip-172-31-35-67.us-west-2.compute.internal:8020/
>>> mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.in
>>> ternal.warc.gz='WARC/1.0\r')]
>>>
>>> In [10]: def process_file(s):
>>> ...: text = s[1]
>>> ...: d = {}
>>> ...: l =  text.split("\n")
>>> ...: final = []
>>> ...: the_id = "init"
>>> ...: for line in l:
>>> ...: if line[0:15] == 'WARC-Record-ID:':
>>> ...: the_id = line[15:]
>>> ...: d[the_id] = line
>>> ...: final.append(Row(**d))
>>> ...: return final
>>>
>>> In [12]: rdd2 = rdd1.map(process_file)
>>>
>>> In [13]: rdd2.count()
>>> 17/02/25 19:03:03 ERROR YarnScheduler: Lost executor 5 on
>>> ip-172-31-41-89.us-west-2.compute.internal: Container killed by YARN
>>> for exceeding memory limits. 10.3 GB of 10.3 GB physical memory used.
>>> Consider boosting spark.yarn.executor.memoryOverhead.
>>> 17/02/25 19:03:03 WARN YarnSchedulerBackend$YarnSchedulerEndpoint:
>>> Container killed by YARN for exceeding memory limits. 10.3 GB of 10.3 GB
>>> physical memory used. Consider boosting spark.yarn.executor.memoryOver
>>> head.
>>> 17/02/25 19:03:03 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID
>>> 5, ip-172-31-41-89.us-west-2.compute.internal, executor 5):
>>> ExecutorLostFailure (executor 5 exited caused by one of the running tasks)
>>> Reason: Container killed by YARN for exceeding memory limits. 10.3 GB of
>>> 10.3 GB physical memory used. Consider boosting
>>> 

Re: Turning rows into columns

2017-02-05 Thread Koert Kuipers
since there is no key to group by and assemble records i would suggest to
write this in RDD land and then convert to data frame. you can use
sc.wholeTextFiles to process text files and create a state machine

On Feb 4, 2017 16:25, "Paul Tremblay"  wrote:

I am using pyspark 2.1 and am wondering how to convert a flat file, with
one record per row, into a columnar format.

Here is an example of the data:

u'WARC/1.0',
 u'WARC-Type: warcinfo',
 u'WARC-Date: 2016-12-08T13:00:23Z',
 u'WARC-Record-ID: ',
 u'Content-Length: 344',
 u'Content-Type: application/warc-fields',
 u'WARC-Filename: CC-MAIN-20161202170900-0-ip-10-31-129-80.ec2.internal.
warc.gz',
 u'',
 u'robots: classic',
 u'hostname: ip-10-31-129-80.ec2.internal',
 u'software: Nutch 1.6 (CC)/CC WarcExport 1.0',
 u'isPartOf: CC-MAIN-2016-50',
 u'operator: CommonCrawl Admin',
 u'description: Wide crawl of the web for November 2016',
 u'publisher: CommonCrawl',
 u'format: WARC File Format 1.0',
 u'conformsTo: http://bibnum.bnf.fr/WARC/WARC_ISO_28500_version1_
latestdraft.pdf',
 u'',
 u'',
 u'WARC/1.0',
 u'WARC-Type: request',
 u'WARC-Date: 2016-12-02T17:54:09Z',
 u'WARC-Record-ID: ',
 u'Content-Length: 220',
 u'Content-Type: application/http; msgtype=request',
 u'WARC-Warcinfo-ID: ',
 u'WARC-IP-Address: 217.197.115.133',
 u'WARC-Target-URI: http://1018201.vkrugudruzei.ru/blog/',
 u'',
 u'GET /blog/ HTTP/1.0',
 u'Host: 1018201.vkrugudruzei.ru',
 u'Accept-Encoding: x-gzip, gzip, deflate',
 u'User-Agent: CCBot/2.0 (http://commoncrawl.org/faq/)',
 u'Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
 u'',
 u'',
 u'',
 u'WARC/1.0',
 u'WARC-Type: response',
 u'WARC-Date: 2016-12-02T17:54:09Z',
 u'WARC-Record-ID: ',
 u'Content-Length: 577',
 u'Content-Type: application/http; msgtype=response',
 u'WARC-Warcinfo-ID: ',
 u'WARC-Concurrent-To: ',
 u'WARC-IP-Address: 217.197.115.133',
 u'WARC-Target-URI: http://1018201.vkrugudruzei.ru/blog/',
 u'WARC-Payload-Digest: sha1:Y4TZFLB6UTXHU4HUVONBXC5NZQW2LYMM',
 u'WARC-Block-Digest: sha1:3J7HHBMWTSC7W53DDB7BHTUVPM26QS4B',
 u'']

I want to convert it to something like:
{warc-type='request',warc-date='2016-12-02'. ward-record-id='

Re: Spark 2 + Java + UDF + unknown return type...

2017-02-05 Thread Koert Kuipers
A UDF that does not return a single type is not supported. and spark has no
concept of union types.



On Feb 2, 2017 16:05, "Jean Georges Perrin"  wrote:

Hi fellow Sparkans,

I am building a UDF (in Java) that can return various data types, basically
the signature of the function itself is:

public Object call(String a, Object b, String c, Object d, String e) throws
Exception

When I register my function, I need to provide a type, e.g.:

spark.udf().register("f2", new Udf5(), DataTypes.LongType);

In my test it is a long now, but can become a string or a float. Of course,
I do not know the expected return type before I call the function, which I
call like:

df = df.selectExpr("*", "f2('x1', x, 'c2', y, 'op') as op");

Is there a way to have an Object being returned from a UDF and to store an
Object in a Dataset/dataframe? I don't need to know the datatype at that
point and can leave it hanging for now? Or should I play it safe and always
return a DataTypes.StringType (and then try to transform it if needed)?

I hope I am clear enough :).

Thanks for any tip/idea/comment...

jg


Re: How to checkpoint and RDD after a stage and before reaching an action?

2017-02-04 Thread Koert Kuipers
this is a general problem with checkpoint, one of the least understood
operations i think.

checkpoint is lazy (meaning it doesnt start until there is an action) and
asynchronous (meaning when it does start it is its own computation). so
basically with a checkpoint the rdd always gets computed twice.

i think the only useful pattern for checkpoint is to always persist/cache
right before the checkpoint. so:
rdd.persist(...).checkpoint()
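
a minimal self-contained example of that pattern (sketch, paths made up):

import org.apache.spark.storage.StorageLevel

sc.setCheckpointDir("/tmp/checkpoints")
val rdd1 = sc.parallelize(1 to 1000).map(i => (i % 10, i))
val rdd2 = sc.parallelize(1 to 10).map(i => (i, i.toString))

rdd1.persist(StorageLevel.DISK_ONLY)
rdd1.checkpoint()                              // still lazy and asynchronous at this point
rdd1.join(rdd2).saveAsObjectFile("/tmp/out")   // first action computes rdd1 once, persists it,
                                               // and the checkpoint job then reads the persisted copy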

On Sat, Feb 4, 2017 at 4:11 AM, leo9r  wrote:

> Hi,
>
> I have a 1-action job (saveAsObjectFile at the end), that includes several
> stages. One of those stages is an expensive join "rdd1.join(rdd2)". I would
> like to checkpoint rdd1 right before the join to improve the stability of
> the job. However, what I'm seeing is that the job gets executed all the way
> to the end (saveAsObjectFile) without doing any checkpointing, and then
> re-runing the computation to checkpoint rdd1 (when I see the files saved to
> the checkpoint directory). I have no issue with recomputing, given that I'm
> not caching rdd1, but the fact that the checkpointing of rdd1 happens after
> the join brings no benefit because the whole DAG is executed in one piece
> and the job fails. If that is actually what is happening, what would be the
> best approach to solve this?
> What I'm currently doing is to manually save rdd1 to HDFS right after the
> filter in line (4) and then load it back right before the join in line
> (11).
> That prevents the job from failing by splitting it into 2 jobs (ie. 2
> actions). My expectations was that rdd1.checkpoint in line (8) was going to
> have the same effect but without the hassle of manually saving and loading
> intermediate files.
>
> ///
>
> (1)   val rdd1 = loadData1
> (2) .map
> (3) .groupByKey
> (4) .filter
> (5)
> (6)   val rdd2 = loadData2
> (7)
> (8)   rdd1.checkpoint()
> (9)
> (10)  rdd1
> (11).join(rdd2)
> (12).saveAsObjectFile(...)
>
> /
>
> Thanks in advance,
> Leo
>
>
>
> --
> View this message in context: http://apache-spark-
> developers-list.1001551.n3.nabble.com/How-to-checkpoint-
> and-RDD-after-a-stage-and-before-reaching-an-action-tp20852.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


dataset algos slow because of too many shuffles

2017-02-02 Thread Koert Kuipers
we noticed that some algos we ported from rdd to dataset are significantly
slower, and the main reason seems to be more shuffles that we successfully
avoid for rdds by careful partitioning. this seems to be dataset specific
as it works ok for dataframe.

see also here:
http://blog.hydronitrogen.com/2016/05/13/shuffle-free-joins-in-spark-sql/

it kind of boils down to this... if i partition and sort dataframes that
get used for joins repeatedly i can avoid shuffles:

System.setProperty("spark.sql.autoBroadcastJoinThreshold", "-1")

val df1 = Seq((0, 0), (1, 1)).toDF("key", "value")

.repartition(col("key")).sortWithinPartitions(col("key")).persist(StorageLevel.DISK_ONLY)
val df2 = Seq((0, 0), (1, 1)).toDF("key2", "value2")

.repartition(col("key2")).sortWithinPartitions(col("key2")).persist(StorageLevel.DISK_ONLY)

val joined = df1.join(df2, col("key") === col("key2"))
joined.explain

== Physical Plan ==
*SortMergeJoin [key#5], [key2#27], Inner
:- InMemoryTableScan [key#5, value#6]
: +- InMemoryRelation [key#5, value#6], true, 1, StorageLevel(disk,
1 replicas)
:   +- *Sort [key#5 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(key#5, 4)
: +- LocalTableScan [key#5, value#6]
+- InMemoryTableScan [key2#27, value2#28]
  +- InMemoryRelation [key2#27, value2#28], true, 1,
StorageLevel(disk, 1 replicas)
+- *Sort [key2#27 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(key2#27, 4)
  +- LocalTableScan [key2#27, value2#28]

notice how the persisted dataframes are not shuffled or sorted anymore
before being used in the join. however if i try to do the same with dataset
i have no luck:

val ds1 = Seq((0, 0), (1, 1)).toDS

.repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)
val ds2 = Seq((0, 0), (1, 1)).toDS

.repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)

val joined1 = ds1.joinWith(ds2, ds1("_1") === ds2("_1"))
joined1.explain

== Physical Plan ==
*SortMergeJoin [_1#105._1], [_2#106._1], Inner
:- *Sort [_1#105._1 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(_1#105._1, 4)
: +- *Project [named_struct(_1, _1#83, _2, _2#84) AS _1#105]
:+- InMemoryTableScan [_1#83, _2#84]
:  +- InMemoryRelation [_1#83, _2#84], true, 1,
StorageLevel(disk, 1 replicas)
:+- *Sort [_1#83 ASC NULLS FIRST], false, 0
:   +- Exchange hashpartitioning(_1#83, 4)
:  +- LocalTableScan [_1#83, _2#84]
+- *Sort [_2#106._1 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(_2#106._1, 4)
  +- *Project [named_struct(_1, _1#100, _2, _2#101) AS _2#106]
 +- InMemoryTableScan [_1#100, _2#101]
   +- InMemoryRelation [_1#100, _2#101], true, 1,
StorageLevel(disk, 1 replicas)
 +- *Sort [_1#83 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(_1#83, 4)
   +- LocalTableScan [_1#83, _2#84]

notice how my persisted Datasets are shuffled and sorted again. part of the
issue seems to be in joinWith, which does some preprocessing that seems to
confuse the planner. if i change the joinWith to join (which returns a
dataframe) it looks a little better in that only one side gets shuffled
again, but still not optimal:

val ds1 = Seq((0, 0), (1, 1)).toDS

.repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)
val ds2 = Seq((0, 0), (1, 1)).toDS

.repartition(col("_1")).sortWithinPartitions(col("_1")).persist(StorageLevel.DISK_ONLY)

val joined1 = ds1.join(ds2, ds1("_1") === ds2("_1"))
joined1.explain

== Physical Plan ==
*SortMergeJoin [_1#83], [_1#100], Inner
:- InMemoryTableScan [_1#83, _2#84]
: +- InMemoryRelation [_1#83, _2#84], true, 1, StorageLevel(disk, 1
replicas)
:   +- *Sort [_1#83 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(_1#83, 4)
: +- LocalTableScan [_1#83, _2#84]
+- *Sort [_1#100 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(_1#100, 4)
  +- InMemoryTableScan [_1#100, _2#101]
+- InMemoryRelation [_1#100, _2#101], true, 1,
StorageLevel(disk, 1 replicas)
  +- *Sort [_1#83 ASC NULLS FIRST], false, 0
 +- Exchange hashpartitioning(_1#83, 4)
+- LocalTableScan [_1#83, _2#84]


Re: frustration with field names in Dataset

2017-02-02 Thread Koert Kuipers
great its an easy fix. i will create jira and pullreq

On Thu, Feb 2, 2017 at 2:13 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> That might be reasonable.  At least I can't think of any problems with
> doing that.
>
> On Thu, Feb 2, 2017 at 7:39 AM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> since a dataset is a typed object you ideally don't have to think about
>> field names.
>>
>> however there are operations on Dataset that require you to provide a
>> Column, like for example joinWith (and joinWith returns a strongly typed
>> Dataset, not DataFrame). once you have to provide a Column you are back to
>> thinking in field names, and worrying about duplicate field names, which is
>> something that can easily happen in a Dataset without you realizing it.
>>
>> so under the hood Dataset has unique identifiers for every column, as in
>> dataset.queryExecution.logical.output, but these are expressions
>> (attributes) that i cannot turn back into columns since the constructors
>> for this are private in spark.
>>
>> so how about having Dataset.apply(i: Int): Column to allow me to pick
>> columns by position without having to worry about (duplicate) field names?
>> then i could do something like:
>>
>> dataset.joinWith(otherDataset, dataset(0) === otherDataset(0), joinType)
>>
>
>


Re: frustration with field names in Dataset

2017-02-02 Thread Koert Kuipers
another example is if i have a Dataset[(K, V)] and i want to repartition it
by the key K. repartition requires a Column which means i am suddenly back
to worrying about duplicate field names. i would like to be able to say:

dataset.repartition(dataset(0))
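
(the closest thing today is going through the column names, which of course
only holds up while the names happen to be unique; a sketch:)

dataset.repartition(dataset(dataset.columns(0)))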

On Thu, Feb 2, 2017 at 10:39 AM, Koert Kuipers <ko...@tresata.com> wrote:

> since a dataset is a typed object you ideally don't have to think about
> field names.
>
> however there are operations on Dataset that require you to provide a
> Column, like for example joinWith (and joinWith returns a strongly typed
> Dataset, not DataFrame). once you have to provide a Column you are back to
> thinking in field names, and worrying about duplicate field names, which is
> something that can easily happen in a Dataset without you realizing it.
>
> so under the hood Dataset has unique identifiers for every column, as in
> dataset.queryExecution.logical.output, but these are expressions
> (attributes) that i cannot turn back into columns since the constructors
> for this are private in spark.
>
> so how about having Dataset.apply(i: Int): Column to allow me to pick
> columns by position without having to worry about (duplicate) field names?
> then i could do something like:
>
> dataset.joinWith(otherDataset, dataset(0) === otherDataset(0), joinType)
>


frustration with field names in Dataset

2017-02-02 Thread Koert Kuipers
since a dataset is a typed object you ideally don't have to think about
field names.

however there are operations on Dataset that require you to provide a
Column, like for example joinWith (and joinWith returns a strongly typed
Dataset, not DataFrame). once you have to provide a Column you are back to
thinking in field names, and worrying about duplicate field names, which is
something that can easily happen in a Dataset without you realizing it.

so under the hood Dataset has unique identifiers for every column, as in
dataset.queryExecution.logical.output, but these are expressions
(attributes) that i cannot turn back into columns since the constructors
for this are private in spark.

so how about having Dataset.apply(i: Int): Column to allow me to pick
columns by position without having to worry about (duplicate) field names?
then i could do something like:

dataset.joinWith(otherDataset, dataset(0) === otherDataset(0), joinType)


Re: Dataset Question: No Encoder found for Set[(scala.Long, scala.Long)]

2017-02-01 Thread Koert Kuipers
you can still use it as Dataset[Set[X]]. all transformations should work
correctly.

however dataset.schema will show binary type, and dataset.show will show
bytes (unfortunately).

for example:

scala> implicit def setEncoder[X]: Encoder[Set[X]] = Encoders.kryo[Set[X]]
setEncoder: [X]=> org.apache.spark.sql.Encoder[Set[X]]

scala> val x = Seq(Set(1,2,3)).toDS
x: org.apache.spark.sql.Dataset[scala.collection.immutable.Set[Int]] =
[value: binary]

scala> x.map(_ + 4).collect
res17: Array[scala.collection.immutable.Set[Int]] = Array(Set(1, 2, 3, 4))

scala> x.show
++
|   value|
++
|[2A 01 03 02 02 0...|
++


scala> x.schema
res19: org.apache.spark.sql.types.StructType =
StructType(StructField(value,BinaryType,true))


On Wed, Feb 1, 2017 at 12:03 PM, Jerry Lam <chiling...@gmail.com> wrote:

> Hi Koert,
>
> Thanks for the tips. I tried to do that but the column's type is now
> Binary. Do I get the Set[X] back in the Dataset?
>
> Best Regards,
>
> Jerry
>
>
> On Tue, Jan 31, 2017 at 8:04 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> set is currently not supported. you can use kryo encoder. there is no
>> other work around that i know of.
>>
>> import org.apache.spark.sql.{ Encoder, Encoders }
>> implicit def setEncoder[X]: Encoder[Set[X]] = Encoders.kryo[Set[X]]
>>
>> On Tue, Jan 31, 2017 at 7:33 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>
>>> Hi guys,
>>>
>>> I got an exception like the following, when I tried to implement a user
>>> defined aggregation function.
>>>
>>>  Exception in thread "main" java.lang.UnsupportedOperationException: No
>>> Encoder found for Set[(scala.Long, scala.Long)]
>>>
>>> The Set[(Long, Long)] is a field in the case class which is the output
>>> type for the aggregation.
>>>
>>> Is there a workaround for this?
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>
>>
>


union of compatible types

2017-02-01 Thread Koert Kuipers
spark's union/merging of compatible types seems kind of weak. it works on
basic types in the top level record, but it fails for nested records, maps,
arrays, etc.

are there any known workarounds or plans to improve this?

for example i get errors like this:
org.apache.spark.sql.AnalysisException: Union can only be performed on
tables with the compatible column types.
StructType(StructField(_1,StringType,true),
StructField(_2,IntegerType,false)) <>
StructType(StructField(_1,StringType,true), StructField(_2,LongType,false))
at the first column of the second table

some examples that do work:

scala> Seq(1, 2, 3).toDF union Seq(1L, 2L, 3L).toDF
res2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [value:
bigint]

scala> Seq((1,"x"), (2,"x"), (3,"x")).toDF union Seq((1L,"x"), (2L,"x"),
(3L,"x")).toDF
res3: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_1: bigint,
_2: string]

what i would also expect to work but currently doesn't:

scala> Seq((Seq(1),"x"), (Seq(2),"x"), (Seq(3),"x")).toDF union
Seq((Seq(1L),"x"), (Seq(2L),"x"), (Seq(3L),"x")).toDF

scala> Seq((1,("x",1)), (2,("x",2)), (3,("x",3))).toDF union
Seq((1L,("x",1L)), (2L,("x",2L)), (3L,("x", 3L))).toDF
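
a workaround that seems to work is to explicitly cast one side to a common
schema before the union. a sketch (only checked mentally):

import org.apache.spark.sql.functions.col

val a = Seq((Seq(1), "x"), (Seq(2), "x")).toDF("_1", "_2")
val b = Seq((Seq(1L), "x"), (Seq(2L), "x")).toDF("_1", "_2")
a.select(col("_1").cast("array<bigint>").as("_1"), col("_2")).union(b)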


Re: Dataset Question: No Encoder found for Set[(scala.Long, scala.Long)]

2017-01-31 Thread Koert Kuipers
set is currently not supported. you can use kryo encoder. there is no other
work around that i know of.

import org.apache.spark.sql.{ Encoder, Encoders }
implicit def setEncoder[X]: Encoder[Set[X]] = Encoders.kryo[Set[X]]

On Tue, Jan 31, 2017 at 7:33 PM, Jerry Lam  wrote:

> Hi guys,
>
> I got an exception like the following, when I tried to implement a user
> defined aggregation function.
>
>  Exception in thread "main" java.lang.UnsupportedOperationException: No
> Encoder found for Set[(scala.Long, scala.Long)]
>
> The Set[(Long, Long)] is a field in the case class which is the output
> type for the aggregation.
>
> Is there a workaround for this?
>
> Best Regards,
>
> Jerry
>


Re: eager? in dataframe's checkpoint

2017-01-31 Thread Koert Kuipers
i thought RDD.checkpoint is async? checkpointData is indeed updated
synchronously, but checkpointData.isCheckpointed is false until the actual
checkpoint operation has completed. and until the actual checkpoint
operation is done any operation will be on the original rdd.

for example notice how below it prints "not yet materialized" 6 times,
instead of just 3 times if the count had operated on the checkpoint data.

scala> val x = sc.parallelize(1 to 3).map{ (i) => println("not yet
materialized"); i }
x: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at map at
:24

scala> x.checkpoint(); println("is checkpointed? " + x.isCheckpointed);
println("count " + x.count)
is checkpointed? false
not yet materialized
not yet materialized
not yet materialized
not yet materialized
not yet materialized
not yet materialized
count 3






On Tue, Jan 31, 2017 at 4:18 PM, Burak Yavuz <brk...@gmail.com> wrote:

> Hi Koert,
>
> When eager is true, we return you a new DataFrame that depends on the
> files written out to the checkpoint directory.
> All previous operations on the checkpointed DataFrame are gone forever.
> You basically start fresh. AFAIK, when eager is true, the method will not
> return until the DataFrame is completely checkpointed. If you look at the
> RDD.checkpoint implementation, the checkpoint location is updated
> synchronously therefore during the count, `isCheckpointed` will be true.
>
> Best,
> Burak
>
> On Tue, Jan 31, 2017 at 12:52 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> i understand that checkpoint cuts the lineage, but i am not fully sure i
>> understand the role of eager.
>>
>> eager simply seems to materialize the rdd early with a count, right after
>> the rdd has been checkpointed. but why is that useful? rdd.checkpoint is
>> asynchronous, so when the rdd.count happens most likely rdd.isCheckpointed
>> will be false, and the count will be on the rdd before it was checkpointed.
>> what is the benefit of that?
>>
>>
>> On Thu, Jan 26, 2017 at 11:19 PM, Burak Yavuz <brk...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> One of the goals of checkpointing is to cut the RDD lineage. Otherwise
>>> you run into StackOverflowExceptions. If you eagerly checkpoint, you
>>> basically cut the lineage there, and the next operations all depend on the
>>> checkpointed DataFrame. If you don't checkpoint, you continue to build the
>>> lineage, therefore while that lineage is being resolved, you may hit the
>>> StackOverflowException.
>>>
>>> HTH,
>>> Burak
>>>
>>> On Thu, Jan 26, 2017 at 10:36 AM, Jean Georges Perrin <j...@jgp.net>
>>> wrote:
>>>
>>>> Hey Sparkers,
>>>>
>>>> Trying to understand the Dataframe's checkpoint (*not* in the context
>>>> of streaming) https://spark.apache.org/docs/latest/api/java/org
>>>> /apache/spark/sql/Dataset.html#checkpoint(boolean)
>>>>
>>>> What is the goal of the *eager* flag?
>>>>
>>>> Thanks!
>>>>
>>>> jg
>>>>
>>>
>>>
>>
>


Re: eager? in dataframe's checkpoint

2017-01-31 Thread Koert Kuipers
i understand that checkpoint cuts the lineage, but i am not fully sure i
understand the role of eager.

eager simply seems to materialize the rdd early with a count, right after
the rdd has been checkpointed. but why is that useful? rdd.checkpoint is
asynchronous, so when the rdd.count happens most likely rdd.isCheckpointed
will be false, and the count will be on the rdd before it was checkpointed.
what is the benefit of that?


On Thu, Jan 26, 2017 at 11:19 PM, Burak Yavuz  wrote:

> Hi,
>
> One of the goals of checkpointing is to cut the RDD lineage. Otherwise you
> run into StackOverflowExceptions. If you eagerly checkpoint, you basically
> cut the lineage there, and the next operations all depend on the
> checkpointed DataFrame. If you don't checkpoint, you continue to build the
> lineage, therefore while that lineage is being resolved, you may hit the
> StackOverflowException.
>
> HTH,
> Burak
>
> On Thu, Jan 26, 2017 at 10:36 AM, Jean Georges Perrin  wrote:
>
>> Hey Sparkers,
>>
>> Trying to understand the Dataframe's checkpoint (*not* in the context of
>> streaming) https://spark.apache.org/docs/latest/api/java/
>> org/apache/spark/sql/Dataset.html#checkpoint(boolean)
>>
>> What is the goal of the *eager* flag?
>>
>> Thanks!
>>
>> jg
>>
>
>


Re: Spark 2.1.0 and Shapeless

2017-01-31 Thread Koert Kuipers
shading at the fat jar level can work, however it means that in your unit
tests where spark is a provided dependency you still can get errors because
spark is using an incompatible (newer) shapeless version. the unit tests
run with a single resolved shapeless after all.

for example spark ships with old guava but works fine with newer guava. so
in our unit tests spark uses newer guava without issues, and for fat jar we
shade guava in sbt-assembly. it all works ok in this situation for us.
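
for reference, the kind of sbt-assembly rule this boils down to (a sketch;
exact key names depend on the sbt-assembly version):

assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("shapeless.**" -> "shaded.shapeless.@1").inAll
)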

On Tue, Jan 31, 2017 at 3:06 PM, Phil Wills  wrote:

> Are you not able to shade it when you're building your fat jar with
> something like https://github.com/sbt/sbt-assembly#shading? I would have
> thought doing the shading at the app level would be a bit less painful than
> doing it at the library level.
>
> Phil
>
> On Tue, 31 Jan 2017, 04:24 Timothy Chan,  wrote:
>
>> I'm using a library, https://github.com/guardian/scanamo, that uses
>> shapeless 2.3.2. What are my options if I want to use this with Spark
>> 2.1.0?
>>
>> Based on this: http://apache-spark-developers-list.1001551.n3.
>> nabble.com/shapeless-in-spark-2-1-0-tt20392.html
>>
>> I'm guessing I would have to release my own version of scanamo with a
>> shaded shapeless?
>>
>


Re: Jars directory in Spark 2.0

2017-01-31 Thread Koert Kuipers
you basically have to keep your versions of dependencies in line with
sparks or shade your own dependencies.

you cannot just replace the jars in sparks jars folder. if you want to
update them you have to build spark yourself with updated dependencies and
confirm it compiles, passes tests etc.

On Tue, Jan 31, 2017 at 3:40 AM, Sidney Feiner 
wrote:

> Hey,
>
> While migrating to Spark 2.X from 1.6, I've had many issues with jars that
> come preloaded with Spark in the "jars/" directory and I had to shade most
> of my packages.
>
> Can I replace the jars in this folder to more up to date versions? Are
> those jar used for anything internal in Spark which means I can't blindly
> replace them?
>
>
>
> Thanks J
>
>
>
>
>
> *Sidney Feiner*   */*  SW Developer
>
> M: +972.528197720  */*  Skype: sidney.feiner.startapp
>
>


Re: userClassPathFirst=true prevents SparkContext to be initialized

2017-01-30 Thread Koert Kuipers
i dont know why this is happening but i have given up on
userClassPathFirst=true. i have seen many weird errors with it and consider it
broken.

On Jan 30, 2017 05:24, "Roberto Coluccio" 
wrote:

Hello folks,

I'm trying to work around an issue with some dependencies by trying to
specify at spark-submit time that I want my (user) classpath to be resolved
and taken into account first (against the jars received through the System
Classpath, which is /data/cloudera/parcels/CDH/jars/).

In order to accomplish this, I specify

--conf spark.driver.userClassPathFirst=true
--conf spark.executor.userClassPathFirst=true

and I pass my jars with

--jars 

in my spark-submit command, deploying in yarn cluster mode in a CDH 5.8
environment (Spark 1.6).

In the list passed with --jars I have severals deps, NOT including
hadoop/spark related ones. My app jar is not a fat (uber) one, thus it
includes only business classes. None of these ones has for any reasons a
"SparkConf.set("master", "local")", or anything like that.

Without specifying the userClassPathFirst configuration, my App is launched
and completed with no issues at all.

I tried to print logs down to the TRACE level with no luck. I get no
explicit errors and I verified adding the "-verbose:class" JVM arg that
Spark-related classes seem to be loaded with no issues. From a rapid
overview of loaded classes, it seems to me that a small fraction of classes
is loaded using userClassPathFirst=true w/r/t the default case. Eventually,
my driver's stderr gets stuck in logging out:

2017-01-30 10:10:22,308 INFO  ApplicationMaster:58 - Waiting for spark
context initialization ...
2017-01-30 10:10:32,310 INFO  ApplicationMaster:58 - Waiting for spark
context initialization ...
2017-01-30 10:10:42,311 INFO  ApplicationMaster:58 - Waiting for spark
context initialization ...

Dramatically, the application is then killed by YARN after a timeout.

In my understanding, quoting the doc (http://spark.apache.org/docs/
1.6.2/configuration.html):

(inline screenshot of the spark.driver.userClassPathFirst /
spark.executor.userClassPathFirst descriptions from the configuration page)

So I would expect the libs given through --jars options to be used first,
but I also expect no issues in loading the system classpath afterwards.
This is confirmed by the logs printed with the "-verbose:class" JVM option,
where I can see logs like:

[Loaded org.apache.spark.SparkContext from
file:/data/cloudera/parcels/CDH-5.8.0-1.cdh5.8.0.p0.42/jars/spark-assembly-1.6.0-cdh5.8.0-hadoop2.6.0-cdh5.8.0.jar]


What am I missing here guys?

Thanks for your help.

Best regards,

Roberto


Re: Aggregator mutate b1 in place in merge

2017-01-29 Thread Koert Kuipers
thanks thats helpful
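
(for anyone landing here later, the pattern under discussion looks roughly
like this sketch, with the buffer mutated and returned in both reduce and
merge:)

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
import scala.collection.mutable

object DistinctCount extends Aggregator[Long, mutable.HashSet[Long], Int] {
  def zero: mutable.HashSet[Long] = mutable.HashSet.empty[Long]
  def reduce(b: mutable.HashSet[Long], a: Long): mutable.HashSet[Long] = { b += a; b }                       // documented as ok
  def merge(b1: mutable.HashSet[Long], b2: mutable.HashSet[Long]): mutable.HashSet[Long] = { b1 ++= b2; b1 } // the case in question
  def finish(b: mutable.HashSet[Long]): Int = b.size
  def bufferEncoder: Encoder[mutable.HashSet[Long]] = Encoders.kryo[mutable.HashSet[Long]]
  def outputEncoder: Encoder[Int] = Encoders.scalaInt
}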

On Sun, Jan 29, 2017 at 12:54 PM, Anton Okolnychyi <
anton.okolnyc...@gmail.com> wrote:

> Hi,
>
> I recently extended the Spark SQL programming guide to cover user-defined
> aggregations, where I modified existing variables and returned them back in
> reduce and merge. This approach worked and it was approved by people who
> know the context.
>
> Hope that helps.
>
> 2017-01-29 17:17 GMT+01:00 Koert Kuipers <ko...@tresata.com>:
>
>> anyone?
>> if not i will follow the trail and try to deduce it myself
>>
>> On Mon, Jan 23, 2017 at 2:31 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>
>>> looking at the docs for org.apache.spark.sql.expressions.Aggregator it
>>> says for reduce method: "For performance, the function may modify `b` and
>>> return it instead of constructing new object for b.".
>>>
>>> it makes no such comment for the merge method.
>>>
>>> this is surprising to me because i know that for
>>> PairRDDFunctions.aggregateByKey mutation is allowed in both seqOp and
>>> combOp (which are the equivalents of reduce and merge in Aggregator).
>>>
>>> is it safe to mutate b1 and return it in Aggregator.merge?
>>>
>>>
>>
>


Re: Aggregator mutate b1 in place in merge

2017-01-29 Thread Koert Kuipers
anyone?
if not i will follow the trail and try to deduce it myself

On Mon, Jan 23, 2017 at 2:31 PM, Koert Kuipers <ko...@tresata.com> wrote:

> looking at the docs for org.apache.spark.sql.expressions.Aggregator it
> says for reduce method: "For performance, the function may modify `b` and
> return it instead of constructing new object for b.".
>
> it makes no such comment for the merge method.
>
> this is surprising to me because i know that for 
> PairRDDFunctions.aggregateByKey
> mutation is allowed in both seqOp and combOp (which are the equivalents of
> reduce and merge in Aggregator).
>
> is it safe to mutate b1 and return it in Aggregator.merge?
>
>


Re: kafka structured streaming source refuses to read

2017-01-28 Thread Koert Kuipers
there was also already an existing spark ticket for this:
SPARK-18779 <https://issues.apache.org/jira/browse/SPARK-18779>

On Sat, Jan 28, 2017 at 1:13 PM, Koert Kuipers <ko...@tresata.com> wrote:

> it seems the bug is:
> https://issues.apache.org/jira/browse/KAFKA-4547
>
> i would advise everyone not to use kafka-clients 0.10.0.2, 0.10.1.0 or
> 0.10.1.1
>
> On Fri, Jan 27, 2017 at 3:56 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> in case anyone else runs into this:
>>
>> the issue is that i was using kafka-clients 0.10.1.1
>>
>> it works when i use kafka-clients 0.10.0.1 with spark structured streaming
>>
>> my kafka server is 0.10.1.1
>>
>> On Fri, Jan 27, 2017 at 1:24 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>
>>> i checked my topic. it has 5 partitions but all the data is written to a
>>> single partition: wikipedia-2
>>> i turned on debug logging and i see this:
>>>
>>> 2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Partitions assigned to
>>> consumer: [wikipedia-0, wikipedia-4, wikipedia-3, wikipedia-2,
>>> wikipedia-1]. Seeking to the end.
>>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>>> partition wikipedia-0
>>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>>> partition wikipedia-4
>>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>>> partition wikipedia-3
>>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>>> partition wikipedia-2
>>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>>> partition wikipedia-1
>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>>> partition wikipedia-0 to latest offset.
>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>>> offset=0} for partition wikipedia-0
>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>>> partition wikipedia-0 to earliest offset.
>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>>> offset=0} for partition wikipedia-0
>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>>> partition wikipedia-4 to latest offset.
>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>>> offset=0} for partition wikipedia-4
>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>>> partition wikipedia-4 to earliest offset.
>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>>> offset=0} for partition wikipedia-4
>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>>> partition wikipedia-3 to latest offset.
>>> 2017-01-27 13:02:50 DEBUG internals.AbstractCoordinator: Received
>>> successful heartbeat response for group spark-kafka-source-fac4f749-fd
>>> 56-4a32-82c7-e687aadf520b-1923704552-driver-0
>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>>> offset=0} for partition wikipedia-3
>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>>> partition wikipedia-3 to earliest offset.
>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>>> offset=0} for partition wikipedia-3
>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>>> partition wikipedia-2 to latest offset.
>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>>> offset=152908} for partition wikipedia-2
>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>>> partition wikipedia-2 to earliest offset.
>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>>> offset=0} for partition wikipedia-2
>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>>> partition wikipedia-1 to latest offset.
>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>>> offset=0} for partition wikipedia-1
>>> 2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Got latest offsets for
>>> partition : Map(wikipedia-1 -> 0, wikipedia-4 -> 0, wikipedia-2 -> 0,
>>> wikipedia-3 -> 0, wikipedia-0 -> 0)
>>>
>>> what is confusing to me is this:
>>> Resetting offset for partition wikipedia-2 to latest offset.
>>> Fetched {timestamp=-1, offset=152908} for partition wikipedia-2
>>> Got latest offsets for partition : Map(wikipedia-1 -> 0, wikipedia-4 ->
>>> 0, wikipedia-2 -> 0, wik

Re: kafka structured streaming source refuses to read

2017-01-28 Thread Koert Kuipers
it seems the bug is:
https://issues.apache.org/jira/browse/KAFKA-4547

i would advise everyone not to use kafka-clients 0.10.0.2, 0.10.1.0 or
0.10.1.1
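
the practical fix is to pin the client dependency back to a working version,
e.g. in sbt (sketch):

dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "0.10.0.1"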

On Fri, Jan 27, 2017 at 3:56 PM, Koert Kuipers <ko...@tresata.com> wrote:

> in case anyone else runs into this:
>
> the issue is that i was using kafka-clients 0.10.1.1
>
> it works when i use kafka-clients 0.10.0.1 with spark structured streaming
>
> my kafka server is 0.10.1.1
>
> On Fri, Jan 27, 2017 at 1:24 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> i checked my topic. it has 5 partitions but all the data is written to a
>> single partition: wikipedia-2
>> i turned on debug logging and i see this:
>>
>> 2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Partitions assigned to
>> consumer: [wikipedia-0, wikipedia-4, wikipedia-3, wikipedia-2,
>> wikipedia-1]. Seeking to the end.
>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>> partition wikipedia-0
>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>> partition wikipedia-4
>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>> partition wikipedia-3
>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>> partition wikipedia-2
>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>> partition wikipedia-1
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-0 to latest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-0
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-0 to earliest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-0
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-4 to latest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-4
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-4 to earliest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-4
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-3 to latest offset.
>> 2017-01-27 13:02:50 DEBUG internals.AbstractCoordinator: Received
>> successful heartbeat response for group spark-kafka-source-fac4f749-fd
>> 56-4a32-82c7-e687aadf520b-1923704552-driver-0
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-3
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-3 to earliest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-3
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-2 to latest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=152908} for partition wikipedia-2
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-2 to earliest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-2
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-1 to latest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-1
>> 2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Got latest offsets for
>> partition : Map(wikipedia-1 -> 0, wikipedia-4 -> 0, wikipedia-2 -> 0,
>> wikipedia-3 -> 0, wikipedia-0 -> 0)
>>
>> what is confusing to me is this:
>> Resetting offset for partition wikipedia-2 to latest offset.
>> Fetched {timestamp=-1, offset=152908} for partition wikipedia-2
>> Got latest offsets for partition : Map(wikipedia-1 -> 0, wikipedia-4 ->
>> 0, wikipedia-2 -> 0, wikipedia-3 -> 0, wikipedia-0 -> 0)
>>
>> why does it find latest offset 152908 for wikipedia-2 but then sets
>> latest offset to 0 for that partition? or am i misunderstanding?
>>
>> On Fri, Jan 27, 2017 at 1:19 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>
>>> code:
>>>   val query = spark.readStream
>>> .format("kafka")
>>> .option("kafka.bootstrap.servers", "somenode:9092")
>>> .

Re: kafka structured streaming source refuses to read

2017-01-27 Thread Koert Kuipers
in case anyone else runs into this:

the issue is that i was using kafka-clients 0.10.1.1

it works when i use kafka-clients 0.10.0.1 with spark structured streaming

my kafka server is 0.10.1.1
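
a quick way to double-check which kafka-clients jar the driver actually loaded
(a sketch using plain jvm reflection, nothing spark-specific):

// prints the location of the jar that KafkaConsumer was loaded from
println(classOf[org.apache.kafka.clients.consumer.KafkaConsumer[_, _]]
  .getProtectionDomain.getCodeSource.getLocation)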

On Fri, Jan 27, 2017 at 1:24 PM, Koert Kuipers <ko...@tresata.com> wrote:

> i checked my topic. it has 5 partitions but all the data is written to a
> single partition: wikipedia-2
> i turned on debug logging and i see this:
>
> 2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Partitions assigned to
> consumer: [wikipedia-0, wikipedia-4, wikipedia-3, wikipedia-2,
> wikipedia-1]. Seeking to the end.
> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
> partition wikipedia-0
> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
> partition wikipedia-4
> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
> partition wikipedia-3
> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
> partition wikipedia-2
> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
> partition wikipedia-1
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
> partition wikipedia-0 to latest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
> offset=0} for partition wikipedia-0
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
> partition wikipedia-0 to earliest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
> offset=0} for partition wikipedia-0
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
> partition wikipedia-4 to latest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
> offset=0} for partition wikipedia-4
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
> partition wikipedia-4 to earliest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
> offset=0} for partition wikipedia-4
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
> partition wikipedia-3 to latest offset.
> 2017-01-27 13:02:50 DEBUG internals.AbstractCoordinator: Received
> successful heartbeat response for group spark-kafka-source-fac4f749-
> fd56-4a32-82c7-e687aadf520b-1923704552-driver-0
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
> offset=0} for partition wikipedia-3
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
> partition wikipedia-3 to earliest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
> offset=0} for partition wikipedia-3
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
> partition wikipedia-2 to latest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
> offset=152908} for partition wikipedia-2
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
> partition wikipedia-2 to earliest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
> offset=0} for partition wikipedia-2
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
> partition wikipedia-1 to latest offset.
> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
> offset=0} for partition wikipedia-1
> 2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Got latest offsets for
> partition : Map(wikipedia-1 -> 0, wikipedia-4 -> 0, wikipedia-2 -> 0,
> wikipedia-3 -> 0, wikipedia-0 -> 0)
>
> what is confusing to me is this:
> Resetting offset for partition wikipedia-2 to latest offset.
> Fetched {timestamp=-1, offset=152908} for partition wikipedia-2
> Got latest offsets for partition : Map(wikipedia-1 -> 0, wikipedia-4 -> 0,
> wikipedia-2 -> 0, wikipedia-3 -> 0, wikipedia-0 -> 0)
>
> why does it find latest offset 152908 for wikipedia-2 but then sets latest
> offset to 0 for that partition? or am i misunderstanding?
>
> On Fri, Jan 27, 2017 at 1:19 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> code:
>>   val query = spark.readStream
>> .format("kafka")
>> .option("kafka.bootstrap.servers", "somenode:9092")
>> .option("subscribe", "wikipedia")
>> .load
>> .select(col("value") cast StringType)
>> .writeStream
>> .format("console")
>> .outputMode(OutputMode.Append)
>> .start()
>>
>>   while (true) {
>> Thread.sleep(1)
>>     println(query.lastProgress)
>>   }
>> }
>>
>> On Fri, Jan 27, 2017 at 5:34 AM, Alonso Isidoro Roman <alons...@gmail.com
>> > wrote:
>>
>>> lets see the

Re: kafka structured streaming source refuses to read

2017-01-27 Thread Koert Kuipers
i checked my topic. it has 5 partitions but all the data is written to a
single partition: wikipedia-2.
i turned on debug logging and i see this:

2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Partitions assigned to
consumer: [wikipedia-0, wikipedia-4, wikipedia-3, wikipedia-2,
wikipedia-1]. Seeking to the end.
2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
partition wikipedia-0
2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
partition wikipedia-4
2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
partition wikipedia-3
2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
partition wikipedia-2
2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
partition wikipedia-1
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition
wikipedia-0 to latest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
offset=0} for partition wikipedia-0
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition
wikipedia-0 to earliest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
offset=0} for partition wikipedia-0
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition
wikipedia-4 to latest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
offset=0} for partition wikipedia-4
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition
wikipedia-4 to earliest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
offset=0} for partition wikipedia-4
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition
wikipedia-3 to latest offset.
2017-01-27 13:02:50 DEBUG internals.AbstractCoordinator: Received
successful heartbeat response for group
spark-kafka-source-fac4f749-fd56-4a32-82c7-e687aadf520b-1923704552-driver-0
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
offset=0} for partition wikipedia-3
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition
wikipedia-3 to earliest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
offset=0} for partition wikipedia-3
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition
wikipedia-2 to latest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
offset=152908} for partition wikipedia-2
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition
wikipedia-2 to earliest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
offset=0} for partition wikipedia-2
2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition
wikipedia-1 to latest offset.
2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
offset=0} for partition wikipedia-1
2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Got latest offsets for
partition : Map(wikipedia-1 -> 0, wikipedia-4 -> 0, wikipedia-2 -> 0,
wikipedia-3 -> 0, wikipedia-0 -> 0)

what is confusing to me is this:
Resetting offset for partition wikipedia-2 to latest offset.
Fetched {timestamp=-1, offset=152908} for partition wikipedia-2
Got latest offsets for partition : Map(wikipedia-1 -> 0, wikipedia-4 -> 0,
wikipedia-2 -> 0, wikipedia-3 -> 0, wikipedia-0 -> 0)

why does it find the latest offset 152908 for wikipedia-2 but then set the
latest offset to 0 for that partition? or am i misunderstanding?

On Fri, Jan 27, 2017 at 1:19 PM, Koert Kuipers <ko...@tresata.com> wrote:

> code:
>   val query = spark.readStream
> .format("kafka")
> .option("kafka.bootstrap.servers", "somenode:9092")
> .option("subscribe", "wikipedia")
> .load
> .select(col("value") cast StringType)
> .writeStream
> .format("console")
> .outputMode(OutputMode.Append)
> .start()
>
>   while (true) {
> Thread.sleep(1)
> println(query.lastProgress)
>   }
> }
>
> On Fri, Jan 27, 2017 at 5:34 AM, Alonso Isidoro Roman <alons...@gmail.com>
> wrote:
>
>> lets see the code...
>>
>> Alonso Isidoro Roman
>> [image: https://]about.me/alonso.isidoro.roman
>>
>> <https://about.me/alonso.isidoro.roman?promo=email_sig_source=email_sig_medium=email_sig_campaign=external_links>
>>
>> 2017-01-27 5:56 GMT+01:00 Koert Kuipers <ko...@tresata.com>:
>>
>>> my little program prints out query.lastProgress every 10 seconds, and
>>> this is what it shows:
>>>
>>> {
>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>   "name" : "wiki",
>

Re: kafka structured streaming source refuses to read

2017-01-27 Thread Koert Kuipers
code:
  // imports needed for this snippet ("spark" is an existing SparkSession)
  import org.apache.spark.sql.functions.col
  import org.apache.spark.sql.streaming.OutputMode
  import org.apache.spark.sql.types.StringType

  val query = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "somenode:9092")
    .option("subscribe", "wikipedia")
    .load
    .select(col("value") cast StringType)
    .writeStream
    .format("console")
    .outputMode(OutputMode.Append)
    .start()

  // print the streaming query's progress every 10 seconds
  while (true) {
    Thread.sleep(10000)
    println(query.lastProgress)
  }
}

On Fri, Jan 27, 2017 at 5:34 AM, Alonso Isidoro Roman <alons...@gmail.com>
wrote:

> lets see the code...
>
> Alonso Isidoro Roman
> [image: https://]about.me/alonso.isidoro.roman
>
> <https://about.me/alonso.isidoro.roman?promo=email_sig_source=email_sig_medium=email_sig_campaign=external_links>
>
> 2017-01-27 5:56 GMT+01:00 Koert Kuipers <ko...@tresata.com>:
>
>> my little program prints out query.lastProgress every 10 seconds, and
>> this is what it shows:
>>
>> {
>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>   "name" : "wiki",
>>   "timestamp" : "2017-01-26T22:54:45.732Z",
>>   "numInputRows" : 0,
>>   "inputRowsPerSecond" : 0.0,
>>   "processedRowsPerSecond" : 0.0,
>>   "durationMs" : {
>> "getOffset" : 9,
>> "triggerExecution" : 10
>>   },
>>   "stateOperators" : [ ],
>>   "sources" : [ {
>> "description" : "KafkaSource[Subscribe[wikipedia]]",
>> "startOffset" : {
>>   "wikipedia" : {
>> "2" : 0,
>> "4" : 0,
>> "1" : 0,
>> "3" : 0,
>> "0" : 0
>>   }
>> },
>> "endOffset" : {
>>   "wikipedia" : {
>> "2" : 0,
>> "4" : 0,
>> "1" : 0,
>> "3" : 0,
>> "0" : 0
>>   }
>> },
>> "numInputRows" : 0,
>> "inputRowsPerSecond" : 0.0,
>> "processedRowsPerSecond" : 0.0
>>   } ],
>>   "sink" : {
>> "description" : "org.apache.spark.sql.executio
>> n.streaming.ConsoleSink@4818d2d9"
>>   }
>> }
>> {
>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>   "name" : "wiki",
>>   "timestamp" : "2017-01-26T22:54:55.745Z",
>>   "numInputRows" : 0,
>>   "inputRowsPerSecond" : 0.0,
>>   "processedRowsPerSecond" : 0.0,
>>   "durationMs" : {
>> "getOffset" : 5,
>> "triggerExecution" : 5
>>   },
>>   "stateOperators" : [ ],
>>   "sources" : [ {
>> "description" : "KafkaSource[Subscribe[wikipedia]]",
>> "startOffset" : {
>>   "wikipedia" : {
>> "2" : 0,
>> "4" : 0,
>> "1" : 0,
>> "3" : 0,
>> "0" : 0
>>   }
>> },
>> "endOffset" : {
>>   "wikipedia" : {
>> "2" : 0,
>> "4" : 0,
>> "1" : 0,
>> "3" : 0,
>> "0" : 0
>>   }
>> },
>> "numInputRows" : 0,
>> "inputRowsPerSecond" : 0.0,
>> "processedRowsPerSecond" : 0.0
>>   } ],
>>   "sink" : {
>> "description" : "org.apache.spark.sql.executio
>> n.streaming.ConsoleSink@4818d2d9"
>>   }
>> }
>> {
>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>   "name" : "wiki",
>>   "timestamp" : "2017-01-26T22:55:05.748Z",
>>   "numInputRows" : 0,
>>   "inputRowsPerSecond" : 0.0,
>>   "processedRowsPerSecond" : 0.0,
>>   "durationMs" : {
>> "getOffset" : 5,
>> "triggerExecution" : 5
>>   },
>>   "stat

Re: Making withColumn nullable

2017-01-27 Thread Koert Kuipers
it should be nullable by default, except for certain primitives, where it
defaults to non-nullable.

you can use Option for your return value to indicate nullability.
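
a minimal sketch of both points (spark 2.x; the dataframe df and the column
names are just for illustration):

import org.apache.spark.sql.functions.{lit, udf}

// a plain literal produces a non-nullable column
val df1 = df.withColumn("flag", lit(1))
println(df1.schema("flag").nullable)   // false

// a scala udf that returns Option[_] produces a nullable column
val toFlag = udf((s: String) => if (s == null || s.isEmpty) None else Some(1))
val df2 = df.withColumn("flag", toFlag(df("value")))
println(df2.schema("flag").nullable)   // true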

On Fri, Jan 27, 2017 at 10:32 AM, Ninad Shringarpure 
wrote:

> Hi Team,
>
> When I add a column to my data frame using withColumn and assign some
> value, it automatically creates the schema with this column marked as not
> nullable.
> My final Hive table schema, where I want to insert it, has this column as
> nullable, and hence it throws an error when I try to save.
>
> Is there a way of making the column I add with the withColumn method
> nullable?
>
> Thanks,
> Ninad
>


Re: kafka structured streaming source refuses to read

2017-01-26 Thread Koert Kuipers
   },
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@
4818d2d9"
  }
}
{
  "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
  "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
  "name" : "wiki",
  "timestamp" : "2017-01-26T22:55:25.760Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
"getOffset" : 4,
"triggerExecution" : 4
  },
  "stateOperators" : [ ],
  "sources" : [ {
"description" : "KafkaSource[Subscribe[wikipedia]]",
"startOffset" : {
  "wikipedia" : {
"2" : 0,
"4" : 0,
"1" : 0,
"3" : 0,
"0" : 0
  }
},
"endOffset" : {
  "wikipedia" : {
"2" : 0,
"4" : 0,
"1" : 0,
"3" : 0,
"0" : 0
  }
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@
4818d2d9"
  }
}
{
  "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
  "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
  "name" : "wiki",
  "timestamp" : "2017-01-26T22:55:35.766Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
"getOffset" : 4,
"triggerExecution" : 4
  },
  "stateOperators" : [ ],
  "sources" : [ {
"description" : "KafkaSource[Subscribe[wikipedia]]",
"startOffset" : {
  "wikipedia" : {
"2" : 0,
"4" : 0,
"1" : 0,
"3" : 0,
"0" : 0
  }
},
"endOffset" : {
  "wikipedia" : {
"2" : 0,
"4" : 0,
"1" : 0,
"3" : 0,
"0" : 0
  }
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@
4818d2d9"
  }
}


On Thu, Jan 26, 2017 at 10:33 PM, Koert Kuipers <ko...@tresata.com> wrote:

> hey,
> i am just getting started with kafka + spark structured streaming. so this
> is probably a pretty dumb mistake.
>
> i wrote a little program in spark to read messages from a kafka topic and
> display them in the console, using the kafka source and console sink. i run
> it it in spark local mode.
>
> i hooked it up to a test topic that i send messages to using the kafka
> console producer, and everything works great. i type a message in the
> console producer, and it pops up in my spark program. very neat!
>
> next i point it to another topic instead on which a kafka-connect program
> is writing lots of irc messages. i can see kafka connect to the topic
> successfully, the partitions are discovered etc., and then... nothing. it
> just keeps stuck at offsets 0 for all partitions. at the same time in
> another terminal i can see messages coming in just fine using the kafka
> console consumer.
>
> i dont get it. why doesnt kafka want to consume from this topic in spark
> structured streaming?
>
> thanks! koert
>
>
>


kafka structured streaming source refuses to read

2017-01-26 Thread Koert Kuipers
hey,
i am just getting started with kafka + spark structured streaming. so this
is probably a pretty dumb mistake.

i wrote a little program in spark to read messages from a kafka topic and
display them in the console, using the kafka source and console sink. i run
it in spark local mode.

i hooked it up to a test topic that i send messages to using the kafka
console producer, and everything works great. i type a message in the
console producer, and it pops up in my spark program. very neat!

next i point it instead to another topic, on which a kafka-connect program
is writing lots of irc messages. i can see kafka connect to the topic
successfully, the partitions are discovered etc., and then... nothing. it
just stays stuck at offset 0 for all partitions. at the same time, in
another terminal i can see messages coming in just fine using the kafka
console consumer.

i don't get it. why doesn't kafka want to consume from this topic in spark
structured streaming?

thanks! koert


Re: can we plz open up encoder on dataset

2017-01-26 Thread Koert Kuipers
oh the map in DataFrame is actually using a RowEncoder. i left it out
because it wasn't important:

so this doesn't compile:

def f[T]: Dataset[T] => Dataset[T] = dataset => {
  val df = dataset.toDF
  df.map(row => row)(RowEncoder(df.schema)).as[T]
}


now this does compile. but i don't like it, since the assumption of an
implicit encoder isn't always true, and i don't want to start passing
encoders around when they are already embedded in the datasets:

def f[T: Encoder]: Dataset[T] => Dataset[T] = dataset => {
  val df = dataset.toDF
  df.map(row => row)(RowEncoder(df.schema)).as[T]
}
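
just to make the burden concrete, a usage sketch of the context-bound version
(the case class, the names and the spark session are illustrative); every call
site now needs an Encoder[T] in scope, either from spark.implicits or supplied
explicitly:

import org.apache.spark.sql.{Dataset, Encoder, Encoders}

case class Record(key: String, value: Long)

import spark.implicits._                 // brings Encoder[Record] into scope
val ds: Dataset[Record]  = Seq(Record("a", 1L)).toDS
val out: Dataset[Record] = f[Record].apply(ds)

// for a type with no product encoder the caller has to provide one, e.g.:
// implicit val enc: Encoder[SomeOpaqueClass] = Encoders.kryo[SomeOpaqueClass]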

On Thu, Jan 26, 2017 at 10:50 AM, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi Koert,
>
> map will take the value that has an implicit Encoder to any value that
> may or may not have an encoder in scope. That's why I'm asking about
> the map function to see what it does.
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Thu, Jan 26, 2017 at 4:18 PM, Koert Kuipers <ko...@tresata.com> wrote:
> > the map operation works on DataFrame so it doesn't need an encoder. It
> could
> > have been any operation on DataFrame. the issue is at the end going back
> to
> > Dataset[T] using as[T]. this requires an encoder for T which i know i
> > already have since i started with a Dataset[T].
> >
> > i could add an implicit encoder but that assumes T has an implicit
> encoder
> > which isn't always true. for example i could be using a kryo encoder. but
> > anyhow i shouldn't have to be guessing this Encoder[T] since it's part
> of my
> > dataset already
> >
> > On Jan 26, 2017 05:18, "Jacek Laskowski" <ja...@japila.pl> wrote:
> >
> > Hi,
> >
> > Can you show the code from map to reproduce the issue? You can create
> > encoders using Encoders object (I'm using it all over the place for
> schema
> > generation).
> >
> > Jacek
> >
> > On 25 Jan 2017 10:19 p.m., "Koert Kuipers" <ko...@tresata.com> wrote:
> >>
> >> i often run into problems like this:
> >>
> >> i need to write a Dataset[T] => Dataset[T], and inside i need to switch
> to
> >> DataFrame for a particular operation.
> >>
> >> but if i do:
> >> dataset.toDF.map(...).as[T] i get error:
> >> Unable to find encoder for type stored in a Dataset.
> >>
> >> i know it has an encoder, because i started with Dataset[T]
> >>
> >> so i would like to do:
> >> dataset.toDF.map(...).as[T](dataset.encoder)
> >>
> >
>

