Re: [spark.local.dir] comma separated list does not work

2024-01-12 Thread Koert Kuipers
try it without spaces? export SPARK_LOCAL_DIRS="/tmp,/share/" On Fri, Jan 12, 2024 at 5:00 PM Andrew Petersen wrote: > Hello Spark community > > SPARK_LOCAL_DIRS or > spark.local.dir > is supposed to accept a list. > > I want to list one local (fast) drive, followed by a gpfs network drive,
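
For reference, a rough sketch of setting the same thing programmatically via spark.local.dir (the two paths are just placeholders); as in the suggestion above, the list is comma separated with no spaces:

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .master("local[*]")
    // comma separated list of scratch dirs, no spaces around the comma
    .config("spark.local.dir", "/tmp,/share")
    .getOrCreate()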

Re: Does Spark support role-based authentication and access to Amazon S3? (Kubernetes cluster deployment)

2023-12-13 Thread Koert Kuipers
yes it does using IAM roles for service accounts. see: https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts.html i wrote a little bit about this also here: https://technotes.tresata.com/spark-on-k8s/ On Wed, Dec 13, 2023 at 7:52 AM Atul Patil wrote: > Hello Team, > >
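
A rough sketch of the s3a side of that setup (assuming the hadoop-aws and AWS SDK v1 jars are on the classpath, the pod's service account has an IAM role attached, and the bucket name is a placeholder):

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().getOrCreate()

  // with IAM roles for service accounts, EKS mounts a web identity token into the pod;
  // point s3a at the credentials provider that reads it
  spark.sparkContext.hadoopConfiguration.set(
    "fs.s3a.aws.credentials.provider",
    "com.amazonaws.auth.WebIdentityTokenCredentialsProvider")

  val df = spark.read.parquet("s3a://some-bucket/some/path")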

Re: Elasticsearch support for Spark 3.x

2023-09-01 Thread Koert Kuipers
could the provided scope be the issue? On Sun, Aug 27, 2023 at 2:58 PM Dipayan Dev wrote: > Using the following dependency for Spark 3 in POM file (My Scala version > is 2.12.14) > > > > > > > *org.elasticsearch > elasticsearch-spark-30_2.12 > 7.12.0provided* > > > The code throws error

Re: [EXTERNAL] spark re-use shuffle files not happening

2022-07-16 Thread Koert Kuipers
ion), not cross jobs. > -- > *From:* Koert Kuipers > *Sent:* Saturday, July 16, 2022 6:43 PM > *To:* user > *Subject:* [EXTERNAL] spark re-use shuffle files not happening > > > *ATTENTION:* This email originated from outside of GM. > > >

spark re-use shuffle files not happening

2022-07-16 Thread Koert Kuipers
i have seen many jobs where spark re-uses shuffle files (and skips a stage of a job), which is an awesome feature given how expensive shuffles are, and i generally now assume this will happen. however i feel like i am going a little crazy today. i did the simplest test in spark 3.3.0, basically i

Re: When should we cache / persist ? After or Before Actions?

2022-04-27 Thread Koert Kuipers
we have quite a few persists statements in our codebase whenever we are reusing a dataframe. we noticed that it slows things down quite a bit (sometimes doubles the runtime), while providing little benefits, since spark already re-uses the shuffle files underlying the dataframe efficiently even if
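
A minimal sketch of the trade-off being described (paths and column names made up): an explicit persist versus simply relying on the shuffle files of the groupBy being re-used by the second action:

  import org.apache.spark.storage.StorageLevel

  // assumes an existing SparkSession `spark`
  import spark.implicits._
  val grouped = spark.read.parquet("/data/in").groupBy("key").count()

  // with persist: pays the storage/serialization cost up front
  grouped.persist(StorageLevel.MEMORY_AND_DISK)
  grouped.filter($"count" > 1).write.parquet("/data/out1")
  grouped.filter($"count" === 1).write.parquet("/data/out2")
  grouped.unpersist()

  // without persist: the second write can still skip the expensive shuffle
  // stage because spark re-uses the shuffle files written by the groupBy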

Re: Current state of dataset api

2021-10-05 Thread Koert Kuipers
the encoder api remains a pain point due to its lack of composability. serialization overhead is also still there i believe. i dont remember what has happened to the predicate pushdown issues, i think they are mostly resolved? we tend to use dataset api on our methods/interfaces where its fitting

understanding spark shuffle file re-use better

2021-01-13 Thread Koert Kuipers
is shuffle file re-use based on identity or equality of the dataframe? for example if i run the exact same code twice to load data and do transforms (joins, aggregations, etc.) but without re-using any actual dataframes, will i still see skipped stages thanks to shuffle file re-use? thanks! koert
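
A small experiment that makes the question concrete (path is a placeholder); whether the second variant also shows skipped stages in the UI is exactly what is being asked:

  // assumes an existing SparkSession `spark`
  def build() = spark.read.parquet("/data/in").groupBy("key").count()

  // same dataframe object used twice: the second count skips the shuffle stage
  val df = build()
  df.count()
  df.count()

  // two separate but identically built dataframes: is the shuffle still re-used?
  build().count()
  build().count()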

Re: Apache Spark 3.1 Preparation Status (Oct. 2020)

2020-10-07 Thread Koert Kuipers
it seems to me with SPARK-20202 we are no longer planning to support hadoop2 + hive 1.2. is that correct? so basically spark 3.1 will no longer run on say CDH 5.x or HDP2.x with hive? my use case is building spark 3.1 and launching on these existing clusters that are not managed by me. e.g. i do

Re: Spark Small file issue

2020-06-24 Thread Koert Kuipers
i second that. we have gotten bitten so many times by coalesce impacting upstream in an unintended way that i avoid coalesce on write altogether. i prefer to use repartition (and take the shuffle hit) before writing (especially if you are writing out partitioned), or if possible use adaptive
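
A sketch of that suggestion (column and path names made up): repartition by the output partition column right before a partitioned write, instead of coalescing:

  import org.apache.spark.sql.functions.col

  // assumes a dataframe `df` with a "date" column
  // coalesce(n) here would also shrink the parallelism of everything upstream of the write;
  // repartition costs an extra shuffle but leaves the upstream stages at full parallelism
  df.repartition(col("date"))
    .write
    .partitionBy("date")
    .parquet("/data/out")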

Re: Scala version compatibility

2020-04-07 Thread Koert Kuipers
i think it will work then assuming the callsite hasnt changed between scala versions On Mon, Apr 6, 2020 at 5:09 PM Andrew Melo wrote: > Hello, > > On Mon, Apr 6, 2020 at 3:31 PM Koert Kuipers wrote: > >> actually i might be wrong about this. did you declare scala t

Re: Scala version compatibility

2020-04-06 Thread Koert Kuipers
actually i might be wrong about this. did you declare scala to be a provided dependency? so scala is not in your fat/uber jar? if so then maybe it will work. On Mon, Apr 6, 2020 at 4:16 PM Andrew Melo wrote: > > > On Mon, Apr 6, 2020 at 3:08 PM Koert Kuipers wrote:

Re: Scala version compatibility

2020-04-06 Thread Koert Kuipers
yes it will On Mon, Apr 6, 2020 at 3:50 PM Andrew Melo wrote: > Hello all, > > I'm aware that Scala is not binary compatible between revisions. I have > some Java code whose only Scala dependency is the transitive dependency > through Spark. This code calls a Spark API which returns a Seq,

find failed test

2020-03-06 Thread Koert Kuipers
i just ran: mvn test -fae > log.txt at the end of log.txt i find it says there are failures: [INFO] Spark Project SQL .. FAILURE [47:55 min] that is not very helpful. what tests failed? i could go scroll up but the file has 21,517 lines. ok let's skip that. so i

conflict with multiple jobs writing to different partitions but same baseDir

2019-05-25 Thread Koert Kuipers
lets say i have 2 dataframe jobs that write to /somedir/a=1 and somedir/a=2. these can run at same time without issues. but now i get excited about dynamic partitioning. so i add "a" as a column to my 2 dataframes, set the option partitionOverwriteMode=dynamic, add partitionBy("a": _*) to the
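
The setup being described looks roughly like this (spark 2.3+ style, dataframe names made up); each write should then only touch the partitions present in its own dataframe:

  // assumes an existing SparkSession `spark` and dataframes df1 and df2 that both
  // contain the partition column "a" (with values 1 and 2 respectively)
  spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

  df1.write.mode("overwrite").partitionBy("a").parquet("/somedir")
  df2.write.mode("overwrite").partitionBy("a").parquet("/somedir")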

Re: run new spark version on old spark cluster ?

2019-05-20 Thread Koert Kuipers
ersion and that worked instantly > after unzipping. > > Right now I am stuck on connecting to yarn. > > > On Mon, May 20, 2019 at 02:50:44PM -0400, Koert Kuipers wrote: > > we had very little issues with hdfs or hive, but then we use hive only > for > > basic reading and writi

Re: run new spark version on old spark cluster ?

2019-05-20 Thread Koert Kuipers
ly doable for me. My guess is I will have some > troubles to make that spark version work with both hive & hdfs installed > on the cluster - or maybe that's finally plug-&-play i don't know. > > thanks > > On Mon, May 20, 2019 at 02:16:43PM -0400, Koert Kuipers wrote: > >

Re: run new spark version on old spark cluster ?

2019-05-20 Thread Koert Kuipers
nd spark version (2.4) on the cluster ? > > thanks > > On Mon, May 20, 2019 at 01:58:11PM -0400, Koert Kuipers wrote: > > yarn can happily run multiple spark versions side-by-side > > you will need the spark version you intend to launch with on the machine > you >

Re: run new spark version on old spark cluster ?

2019-05-20 Thread Koert Kuipers
yarn can happily run multiple spark versions side-by-side you will need the spark version you intend to launch with on the machine you launch from and point to the correct spark-submit On Mon, May 20, 2019 at 1:50 PM Nicolas Paris wrote: > Hi > > I am wondering whether that's feasible to: > -

Re: ml Pipeline read write

2019-05-10 Thread Koert Kuipers
Koert Kuipers wrote: > i am trying to understand how ml persists pipelines. it seems a > SparkSession or SparkContext is needed for this, to write to hdfs. > > MLWriter and MLReader both extend BaseReadWrite to have access to a > SparkSession. but this is where it gets confusing.

ml Pipeline read write

2019-05-10 Thread Koert Kuipers
i am trying to understand how ml persists pipelines. it seems a SparkSession or SparkContext is needed for this, to write to hdfs. MLWriter and MLReader both extend BaseReadWrite to have access to a SparkSession. but this is where it gets confusing... the only way to set the SparkSession seems to

spark 2.4.1 -> 3.0.0-SNAPSHOT mllib

2019-04-23 Thread Koert Kuipers
we recently started compiling against spark 3.0.0-SNAPSHOT (build inhouse from master branch) to uncover any breaking changes that might be an issue for us. we ran into some of our tests breaking where we use mllib. most of it is immaterial: we had some magic numbers hard-coded and the results

Re: ClassCastException for SerializedLamba

2019-03-30 Thread Koert Kuipers
i found jira that seems related: https://issues.apache.org/jira/browse/SPARK-25047 On Fri, Mar 29, 2019 at 4:01 PM Koert Kuipers wrote: > hi all, > we are switching from scala 2.11 to 2.12 with a spark 2.4.1 release > candidate and so far this has been going pretty smoothly. > >

ClassCastException for SerializedLamba

2019-03-29 Thread Koert Kuipers
hi all, we are switching from scala 2.11 to 2.12 with a spark 2.4.1 release candidate and so far this has been going pretty smoothly. however we do see some new serialization errors related to Function1, Function2, etc. they look like this: ClassCastException: cannot assign instance of

Re: Difference between dataset and dataframe

2019-02-19 Thread Koert Kuipers
> step4.collect() > > > > step4._jdf.queryExecution().debug().codegen() > > > > You will see the generated code. > > > > Regards, > > Dhaval > > > > *From:* [External] Akhilanand > *Sent:* Tuesday, February 19, 2019 10:29 AM > *To:* Ko

Re: Difference between dataset and dataframe

2019-02-18 Thread Koert Kuipers
in the api DataFrame is just Dataset[Row]. so this makes you think Dataset is the generic api. interestingly enough under the hood everything is really Dataset[Row], so DataFrame is really the "native" language for spark sql, not Dataset. i find DataFrame to be significantly more performant. in

Re: Caused by: java.io.NotSerializableException: com.softwaremill.sttp.FollowRedirectsBackend

2018-11-29 Thread Koert Kuipers
if you only use it in the executors sometimes using lazy works On Thu, Nov 29, 2018 at 9:45 AM James Starks wrote: > This is not problem directly caused by Spark, but it's related; thus > asking here. I use spark to read data from parquet and processing some http > call with sttp
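
A sketch of that lazy pattern (Backend is a stand-in for any non-serializable client, such as the sttp backend in the question): the client lives in a singleton and is constructed lazily on each executor JVM instead of being serialized with the closure:

  // stand-in for a non-serializable http client
  class Backend { def get(url: String): String = s"response for $url" }

  object Clients {
    // created lazily on each executor, never shipped from the driver
    lazy val backend: Backend = new Backend
  }

  // assumes an existing SparkSession `spark`
  import spark.implicits._
  val urls = Seq("http://example.com/a", "http://example.com/b").toDS()
  val bodies = urls.map(url => Clients.backend.get(url))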

structured streaming bookkeeping formats

2018-10-27 Thread Koert Kuipers
i was reading this blog post from last year about structured streaming run-once trigger: https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html its a nice idea to replace a batch job with structured streaming because it does the bookkeeping (whats new, failure
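
The run-once pattern from that post looks roughly like this (paths are placeholders); the checkpoint location is where structured streaming keeps its bookkeeping of what has already been processed:

  import org.apache.spark.sql.streaming.Trigger

  // assumes an existing SparkSession `spark`
  val inputSchema = spark.read.json("/data/in").schema   // file sources need an explicit schema

  val query = spark.readStream
    .schema(inputSchema)
    .json("/data/in")
    .writeStream
    .trigger(Trigger.Once())                              // process whatever is new, then stop
    .option("checkpointLocation", "/data/checkpoints/job1")
    .format("parquet")
    .option("path", "/data/out")
    .start()

  query.awaitTermination()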

Re: Why repartitionAndSortWithinPartitions slower than MapReducer

2018-08-20 Thread Koert Kuipers
I assume you are using RDDs? What are you doing after the repartitioning + sorting, if anything? On Aug 20, 2018 11:22, "周浥尘" wrote: In addition to my previous email, Environment: spark 2.1.2, hadoop 2.6.0-cdh5.11, Java 1.8, CentOS 6.6 周浥尘 wrote on Mon, Aug 20, 2018 at 8:52 PM: > Hi team, > > I found the

something happened to MemoryStream after spark 2.3

2018-08-16 Thread Koert Kuipers
hi, we just started testing internally with spark 2.4 snapshots, and it seems our streaming tests are broken. i believe it has to do with MemoryStream. before we were able to create a MemoryStream, add data to it, convert it to a streaming unbounded DataFrame and use it repeatedly. by using it

Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-09 Thread Koert Kuipers
s number of partitions), we should apply coalesce to > the operator and the number of partitions are still kept unchanged whereas > it incurs less parallelism and also less tasks. > > We just can't apply coalesce to individual operator in narrow dependency. > > -Jungtaek Lim

Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-09 Thread Koert Kuipers
, and then for every groupByKey (or any other shuffle operation) follow it up with a coalesce to set the number of partitions. its like i have numPartitions back from those good old RDD shuffle methods :) On Thu, Aug 9, 2018 at 1:38 AM, Koert Kuipers wrote: > an new map task after a shuffle is also a nar

Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-08 Thread Koert Kuipers
R) > > 1. https://github.com/apache/spark/blob/a40806d2bd84e9a0308165f0d6c97e9cf00aa4a3/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2918-L2937 > > > On Thu, Aug 9, 2018 at 5:55 AM, Koert Kuipers wrote: > >> sorry i meant to say: >> with a checkpoint i get

Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-08 Thread Koert Kuipers
sorry i meant to say: with a checkpoint i get a map phase with lots of tasks to read the data, then a reduce phase with 2048 reducers, and then finally a map phase with 100 tasks. On Wed, Aug 8, 2018 at 4:54 PM, Koert Kuipers wrote: > the only thing that seems to stop this so far is a checkpo

Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-08 Thread Koert Kuipers
could insert something like a dummy operation that logical steps cannot jump over. On Wed, Aug 8, 2018 at 4:22 PM, Koert Kuipers wrote: > ok thanks. > > mh. that seems odd. shouldnt coalesce introduce a new map-phase with > less tasks instead of changing the previous shuffl

Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-08 Thread Koert Kuipers
:13 PM, Vadim Semenov wrote: > `coalesce` sets the number of partitions for the last stage, so you > have to use `repartition` instead which is going to introduce an extra > shuffle stage > > On Wed, Aug 8, 2018 at 3:47 PM Koert Kuipers wrote: > > > > one small corr

Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-08 Thread Koert Kuipers
one small correction: lots of files leads to pressure on the spark driver program when reading this data in spark. On Wed, Aug 8, 2018 at 3:39 PM, Koert Kuipers wrote: > hi, > > i am reading data from files into a dataframe, then doing a groupBy for a > given column with a count, a

groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-08 Thread Koert Kuipers
hi, i am reading data from files into a dataframe, then doing a groupBy for a given column with a count, and finally i coalesce to a smaller number of partitions before writing out to disk. so roughly:
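
Roughly the shape of the job in question (names made up), together with the repartition variant suggested later in the thread:

  // assumes an existing SparkSession `spark`
  val counts = spark.read.parquet("/data/in").groupBy("key").count()

  // coalesce(100) also caps the groupBy's shuffle at 100 partitions, so the
  // aggregation itself runs with less parallelism than intended
  counts.coalesce(100).write.parquet("/data/out")

  // repartition(100) adds an extra shuffle but leaves the groupBy running with
  // the full spark.sql.shuffle.partitions
  counts.repartition(100).write.parquet("/data/out2")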

spark structured streaming with file based sources and sinks

2018-08-06 Thread Koert Kuipers
has anyone used spark structured streaming from/to files (json, csv, parquet, avro) in a non-test setting? i realize kafka is probably the way to go, but lets say i have a situation where kafka is not available for reasons out of my control, and i want to do micro-batching. could i use files to

Re: Saving dataframes with partitionBy: append partitions, overwrite within each

2018-08-01 Thread Koert Kuipers
this works for dataframes with spark 2.3 by changing a global setting, and will be configurable per write in 2.4 see: https://issues.apache.org/jira/browse/SPARK-20236 https://issues.apache.org/jira/browse/SPARK-24860 On Wed, Aug 1, 2018 at 3:11 PM, Nirav Patel wrote: > Hi Peay, > > Have you

Re: Dataframe vs Dataset dilemma: either Row parsing or no filter push-down

2018-06-18 Thread Koert Kuipers
we use DataFrame and RDD. Dataset not only has issues with predicate pushdown, it also adds shuffles at times where it shouldn't. and there is some overhead from the encoders themselves, because under the hood it is still just Row objects. On Mon, Jun 18, 2018 at 5:00 PM, Valery Khamenya

Re: trying to understand structured streaming aggregation with watermark and append outputmode

2018-05-30 Thread Koert Kuipers
mestamp column was the > timestamp of message 2. So, current time was the timestamp of message 2. > When you posted 3, the time advanced to the timestamp of 3, which caused 1 > to fall out, so it output 1. > > > > Note that, it will not output 1 exactly 1 second after 1 arrives

Re: trying to understand structured streaming aggregation with watermark and append outputmode

2018-05-29 Thread Koert Kuipers
to correct this behavior? On Mon, May 28, 2018 at 6:16 PM, Koert Kuipers wrote: > hello all, > just playing with structured streaming aggregations for the first time. > this is my little program i run inside sbt: > > import org.apache.spark.sql.functions._ > > val lin

trying to understand structured streaming aggregation with watermark and append outputmode

2018-05-28 Thread Koert Kuipers
hello all, just playing with structured streaming aggregations for the first time. this is my little program i run inside sbt: import org.apache.spark.sql.functions._ val lines = spark.readStream .format("socket") .option("host", "localhost") .option("port", )

Re: Scala's Seq:* equivalent in java

2018-05-15 Thread Koert Kuipers
Isn't _* varargs? So you should be able to use Java array? On Tue, May 15, 2018, 06:29 onmstester onmstester wrote: > I could not find how to pass a list to isin() filter in java, something > like this could be done with scala: > > val ids = Array(1,2) >

Re: Structured Streaming, Reading and Updating a variable

2018-05-15 Thread Koert Kuipers
You use a windowed aggregation for this On Tue, May 15, 2018, 09:23 Martin Engen wrote: > Hello, > > > > I'm working with Structured Streaming, and I need a method of keeping a > running average based on last 24hours of data. > > To help with this, I can use
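
A sketch of what such a windowed aggregation could look like (column names and the one-hour slide are assumptions):

  import org.apache.spark.sql.functions.{avg, col, window}

  // assumes a streaming dataframe `events` with an event-time column "ts"
  // and a numeric column "value"
  val running = events
    .withWatermark("ts", "24 hours")
    .groupBy(window(col("ts"), "24 hours", "1 hour"))   // 24h window, sliding every hour
    .agg(avg(col("value")).as("avg_24h"))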

Spark structured streaming aggregation within microbatch

2018-05-15 Thread Koert Kuipers
I have a streaming dataframe where I insert a uuid in every row, then join with a static dataframe (after which uuid column is no longer unique), then group by uuid and do a simple aggregation. So I know all rows with same uuid will be in same micro batch, guaranteed, correct? How do I express it

Re: Guava dependency issue

2018-05-08 Thread Koert Kuipers
we shade guava in our fat jar/assembly jar/application jar On Tue, May 8, 2018 at 12:31 PM, Marcelo Vanzin wrote: > Using a custom Guava version with Spark is not that simple. Spark > shades Guava, but a lot of libraries Spark uses do not - the main one > being all of the
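
With sbt-assembly that shading looks roughly like this (the rename target is arbitrary); a maven build would use the shade plugin's relocation feature for the same effect:

  // build.sbt
  assemblyShadeRules in assembly := Seq(
    // relocate guava inside the fat jar so it cannot clash with the guava spark ships
    ShadeRule.rename("com.google.common.**" -> "shaded.com.google.common.@1").inAll
  )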

Re: all spark settings end up being system properties

2018-03-30 Thread Koert Kuipers
is something that user code instantiates, so we can't easily > make it read data from arbitrary locations. You could use thread > locals and other tricks, but user code can always break those. > > Where: this is done by the SparkSubmit class (look for the Scala > version, "sys.props&

all spark settings end up being system properties

2018-03-30 Thread Koert Kuipers
does anyone know why all spark settings end up being system properties, and where this is done? for example when i pass "--conf spark.foo=bar" into spark-submit then System.getProperty("spark.foo") will be equal to "bar" i grepped the spark codebase for System.setProperty or System.setProperties

change spark default for a setting without overriding user

2018-03-16 Thread Koert Kuipers
i would like to change some defaults in spark without overriding the user if she/he wishes to change them. for example currently spark.blacklist.enabled is by default false, which makes sense for backwards compatibility. i would like it to be by default true, but if the user provided --conf

Re: Does Spark 2.2.0 support Dataset<List<Map<String,Object>>> ?

2017-10-09 Thread Koert Kuipers
<Seq<Map<String, X>>> what would be the > Encoding? Is it Encoding.kryo(Seq.class) ? > > Also shouldn't List be supported? Should I create a ticket for this? > > > On Mon, Oct 9, 2017 at 6:10 AM, Koert Kuipers <ko...@tresata.com> wrote: > >> it sup

Re: Does Spark 2.2.0 support Dataset<List<Map<String,Object>>> ?

2017-10-09 Thread Koert Kuipers
it supports Dataset<Seq<Map<String, X>>> where X must be a supported type also. Object is not a supported type. On Mon, Oct 9, 2017 at 7:36 AM, kant kodali wrote: > Hi All, > > I am wondering if spark supports Dataset<List<Map<String,Object>>> ? > > when I do the following

Re: Apache Spark - MLLib challenges

2017-09-23 Thread Koert Kuipers
our main challenge has been the lack of support for missing values generally On Sat, Sep 23, 2017 at 3:41 AM, Irfan Kabli wrote: > Dear All, > > We are looking to position MLLib in our organisation for machine learning > tasks and are keen to understand if their are

Re: Is there an api in Dataset/Dataframe that does repartitionAndSortWithinPartitions?

2017-06-24 Thread Koert Kuipers
Dataset/DataFrame has repartition (which can be used to partition by key) and sortWithinPartitions. see for example usage here: https://github.com/tresata/spark-sorted/blob/master/src/main/scala/com/tresata/spark/sorted/sql/GroupSortedDataset.scala#L18 On Fri, Jun 23, 2017 at 5:43 PM, Keith
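
So the rough DataFrame equivalent of repartitionAndSortWithinPartitions is (column names assumed):

  import org.apache.spark.sql.functions.col

  // assumes a dataframe `df` with columns "key" and "ts"
  val partitionedAndSorted = df
    .repartition(col("key"))                      // hash partition by key
    .sortWithinPartitions(col("key"), col("ts"))  // sort inside each partition, no global sort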

Re: isin query

2017-04-17 Thread Koert Kuipers
i dont see this behavior in the current spark master: scala> val df = Seq("m_123", "m_111", "m_145", "m_098", "m_666").toDF("msrid") df: org.apache.spark.sql.DataFrame = [msrid: string] scala> df.filter($"msrid".isin("m_123")).count res0: Long = 1 scala>

NPE in UDF yet no nulls in data because analyzer runs test with nulls

2017-04-14 Thread Koert Kuipers
we were running in to an NPE in one of our UDFs for spark sql. now this particular function indeed could not handle nulls, but this was by design since null input was never allowed (and we would want it to blow up if there was a null as input). we realized the issue was not in our data when we

Re: Why dataframe can be more efficient than dataset?

2017-04-09 Thread Koert Kuipers
a> df.filter("age < 20") > res7: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [name: > string, age: bigint] > > scala> ds.filter("age < 20") > res8: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint] > > > > > >

Re: Why dataframe can be more efficient than dataset?

2017-04-08 Thread Koert Kuipers
how would you use only relational transformations on dataset? On Sat, Apr 8, 2017 at 2:15 PM, Shiyuan wrote: > Hi Spark-users, > I came across a few sources which mentioned DataFrame can be more > efficient than Dataset. I can understand this is true because Dataset >

Re: Why dataframe can be more efficient than dataset?

2017-04-08 Thread Koert Kuipers
let me try that again. i left some crap at the bottom of my previous email as i was editing it. sorry about that. here it goes: it is because you use Dataset[X] but the actual computations are still done in Dataset[Row] (so DataFrame). well... the actual computations are done in RDD[InternalRow]

Re: Why dataframe can be more efficient than dataset?

2017-04-08 Thread Koert Kuipers
it is because you use Dataset[X] but the actual computations are still done in Dataset[Row] (so DataFrame). well... the actual computations are done in RDD[InternalRow] with spark's internal types to represent String, Map, Seq, structs, etc. so for example if you do: scala> val x:

Re: how do i force unit test to do whole stage codegen

2017-04-05 Thread Koert Kuipers
>> The page in the URL explains the old style of physical plan output. >> The current style adds "*" as a prefix of each operation that the >> whole-stage codegen can be applied to. >> >> So, in your test case, whole-stage codegen has been already enabled!! >>

Re: how do i force unit test to do whole stage codegen

2017-04-04 Thread Koert Kuipers
e-stage codegen can be applied to. > > So, in your test case, whole-stage codegen has been already enabled!! > > FYI. I think that it is a good topic for d...@spark.apache.org. > > Kazuaki Ishizaki > > > > From: Koert Kuipers <ko...@tresata.com> > To:

how do i force unit test to do whole stage codegen

2017-04-04 Thread Koert Kuipers
i wrote my own expression with eval and doGenCode, but doGenCode never gets called in tests. also as a test i ran this in a unit test: spark.range(10).select('id as 'asId).where('id === 4).explain according to

map transform on array in spark sql

2017-04-03 Thread Koert Kuipers
i have a DataFrame where one column has type: ArrayType(StructType(Seq( StructField("a", typeA, nullableA), StructField("b", typeB, nullableB) ))) i would like to map over this array to pick the first element in the struct. so the result should be a ArrayType(typeA, nullableA). i realize i
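
One workaround is a plain UDF over the array of structs (this sketch assumes typeA is a string and the column is called "arr"); on spark 2.4+ the transform higher-order function does the same without a UDF:

  import org.apache.spark.sql.Row
  import org.apache.spark.sql.functions.{col, expr, udf}

  // pick field "a" out of every struct in the array column
  val firstField = udf((xs: Seq[Row]) => xs.map(_.getAs[String]("a")))
  val result = df.withColumn("as", firstField(col("arr")))

  // spark 2.4+ alternative, no udf needed:
  // df.withColumn("as", expr("transform(arr, x -> x.a)"))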

Re: Why VectorUDT private?

2017-03-30 Thread Koert Kuipers
distro on the cluster. On Thu, Mar 30, 2017 at 1:34 PM, Koert Kuipers <ko...@tresata.com> wrote: > i agree with that. > > we work within that assumption. we compile and run against a single exact > spark version. we know when we upgrade that we might run into minor > inconvenience

Re: Why VectorUDT private?

2017-03-30 Thread Koert Kuipers
nable. > > On Thu, Mar 30, 2017 at 5:52 AM, Koert Kuipers <ko...@tresata.com> wrote: > >> I stopped asking long time ago why things are private in spark... I >> mean... The conversion between ml and mllib vectors is private... the >> conversion between spark vector and

Re: Why VectorUDT private?

2017-03-30 Thread Koert Kuipers
I stopped asking long time ago why things are private in spark... I mean... The conversion between ml and mllib vectors is private... the conversion between spark vector and breeze used to be (or still is?) private. it just goes on. Lots of useful stuff is private[SQL]. Luckily there are simple

Re: Easily creating custom encoders

2017-03-21 Thread Koert Kuipers
see: https://issues.apache.org/jira/browse/SPARK-18122 On Tue, Mar 21, 2017 at 1:13 PM, Ashic Mahtab wrote: > I'm trying to easily create custom encoders for case classes having > "unfriendly" fields. I could just kryo the whole thing, but would like to > at least have a few

Re: Fast write datastore...

2017-03-15 Thread Koert Kuipers
we are using elasticsearch for this. the issue of elasticsearch falling over if the number of partitions/cores in spark writing to it is too high does suck indeed. and the answer every time i asked about it on elasticsearch mailing list has been to reduce spark tasks or increase elasticsearch

Re: can spark take advantage of ordered data?

2017-03-10 Thread Koert Kuipers
this shouldn't be too hard. adding something to spark-sorted or to the dataframe/dataset logical plan that says "trust me, i am already partitioned and sorted" seems doable. however you most likely need a custom hash partitioner, and you have to be careful to read the data in without file

Re: finding Spark Master

2017-03-07 Thread Koert Kuipers
assuming this is running on yarn there is really no spark-master. every job creates its own "master" within a yarn application. On Tue, Mar 7, 2017 at 6:27 PM, Adaryl Wakefield < adaryl.wakefi...@hotmail.com> wrote: > I’m running a three node cluster along with Spark along with Hadoop as > part of

Re: Spark join over sorted columns of dataset.

2017-03-03 Thread Koert Kuipers
For RDD the shuffle is already skipped but the sort is not. In spark-sorted we track partitioning and sorting within partitions for key-value RDDs and can avoid the sort. See: https://github.com/tresata/spark-sorted For Dataset/DataFrame such optimizations are done automatically, however it's

Re: Spark runs out of memory with small file

2017-02-26 Thread Koert Kuipers
using wholeFiles to process formats that can not be split per line is not "old" and there are plenty of problems for which RDD is still better suited than Dataset or DataFrame currently (this might change in near future when Dataset gets some crucial optimizations fixed). On Sun, Feb 26, 2017 at

Re: Turning rows into columns

2017-02-05 Thread Koert Kuipers
since there is no key to group by and assemble records i would suggest to write this in RDD land and then convert to data frame. you can use sc.wholeTextFiles to process text files and create a state machine On Feb 4, 2017 16:25, "Paul Tremblay" wrote: I am using

Re: Spark 2 + Java + UDF + unknown return type...

2017-02-05 Thread Koert Kuipers
A UDF that does not return a single type is not supported. and spark has no concept of union types. On Feb 2, 2017 16:05, "Jean Georges Perrin" wrote: Hi fellow Sparkans, I am building a UDF (in Java) that can return various data types, basically the signature of the function

Re: How to checkpoint and RDD after a stage and before reaching an action?

2017-02-04 Thread Koert Kuipers
this is a general problem with checkpoint, one of the least understood operations i think. checkpoint is lazy (meaning it doesnt start until there is an action) and asynchronous (meaning when it does start it is its own computation). so basically with a checkpoint the rdd always gets computed

dataset algos slow because of too many shuffles

2017-02-02 Thread Koert Kuipers
we noticed that some algos we ported from rdd to dataset are significantly slower, and the main reason seems to be more shuffles that we successfully avoid for rdds by careful partitioning. this seems to be dataset specific as it works ok for dataframe. see also here:

Re: frustration with field names in Dataset

2017-02-02 Thread Koert Kuipers
great its an easy fix. i will create jira and pullreq On Thu, Feb 2, 2017 at 2:13 PM, Michael Armbrust <mich...@databricks.com> wrote: > That might be reasonable. At least I can't think of any problems with > doing that. > > On Thu, Feb 2, 2017 at 7:39 AM, Koert Kuipers

Re: frustration with field names in Dataset

2017-02-02 Thread Koert Kuipers
another example is if i have a Dataset[(K, V)] and i want to repartition it by the key K. repartition requires a Column which means i am suddenly back to worrying about duplicate field names. i would like to be able to say: dataset.repartition(dataset(0)) On Thu, Feb 2, 2017 at 10:39 AM, Koert

frustration with field names in Dataset

2017-02-02 Thread Koert Kuipers
since a dataset is a typed object you ideally don't have to think about field names. however there are operations on Dataset that require you to provide a Column, like for example joinWith (and joinWith returns a strongly typed Dataset, not DataFrame). once you have to provide a Column you are

Re: Dataset Question: No Encoder found for Set[(scala.Long, scala.Long)]

2017-02-01 Thread Koert Kuipers
> Hi Koert, > > Thanks for the tips. I tried to do that but the column's type is now > Binary. Do I get the Set[X] back in the Dataset? > > Best Regards, > > Jerry > > > On Tue, Jan 31, 2017 at 8:04 PM, Koert Kuipers <ko...@tresata.com> wrote

union of compatible types

2017-02-01 Thread Koert Kuipers
spark's union/merging of compatible types seems kind of weak. it works on basic types in the top level record, but it fails for nested records, maps, arrays, etc. are there any known workarounds or plans to improve this? for example i get errors like this: org.apache.spark.sql.AnalysisException:

Re: Dataset Question: No Encoder found for Set[(scala.Long, scala.Long)]

2017-01-31 Thread Koert Kuipers
set is currently not supported. you can use kryo encoder. there is no other work around that i know of. import org.apache.spark.sql.{ Encoder, Encoders } implicit def setEncoder[X]: Encoder[Set[X]] = Encoders.kryo[Set[X]] On Tue, Jan 31, 2017 at 7:33 PM, Jerry Lam wrote:
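
Usage sketch: with that implicit in scope the whole Set is kryo-serialized into a single binary column, so it is opaque to spark sql but round-trips fine:

  import org.apache.spark.sql.{Encoder, Encoders}

  // assumes an existing SparkSession `spark`
  import spark.implicits._
  implicit def setEncoder[X]: Encoder[Set[X]] = Encoders.kryo[Set[X]]

  val ds = Seq(Set((1L, 2L), (3L, 4L)), Set((5L, 6L))).toDS()
  ds.printSchema()   // a single binary column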

Re: eager? in dataframe's checkpoint

2017-01-31 Thread Koert Kuipers
er. > You basically start fresh. AFAIK, when eager is true, the method will not > return until the DataFrame is completely checkpointed. If you look at the > RDD.checkpoint implementation, the checkpoint location is updated > synchronously therefore during the count, `isCheckpointed` will

Re: eager? in dataframe's checkpoint

2017-01-31 Thread Koert Kuipers
i understand that checkpoint cuts the lineage, but i am not fully sure i understand the role of eager. eager simply seems to materialize the rdd early with a count, right after the rdd has been checkpointed. but why is that useful? rdd.checkpoint is asynchronous, so when the rdd.count happens

Re: Spark 2.1.0 and Shapeless

2017-01-31 Thread Koert Kuipers
shading at the fat jar level can work, however it means that in your unit tests where spark is a provided dependency you still can get errors because spark is using an incompatible (newer) shapeless version. the unit tests run with a single resolved shapeless after all. for example spark ships

Re: Jars directory in Spark 2.0

2017-01-31 Thread Koert Kuipers
you basically have to keep your versions of dependencies in line with sparks or shade your own dependencies. you cannot just replace the jars in sparks jars folder. if you want to update them you have to build spark yourself with updated dependencies and confirm it compiles, passes tests etc. On

Re: userClassPathFirst=true prevents SparkContext to be initialized

2017-01-30 Thread Koert Kuipers
i dont know why this is happening but i have given up on userClassPathFirst=true. i have seen many weird errors with it and consider it broken. On Jan 30, 2017 05:24, "Roberto Coluccio" wrote: Hello folks, I'm trying to work around an issue with some dependencies by

Re: Aggregator mutate b1 in place in merge

2017-01-29 Thread Koert Kuipers
> reduce and merge. This approach worked and it was approved by people who > know the context. > > Hope that helps. > > 2017-01-29 17:17 GMT+01:00 Koert Kuipers <ko...@tresata.com>: > >> anyone? >> if not i will follow the trail and try to deduce it myself

Re: Aggregator mutate b1 in place in merge

2017-01-29 Thread Koert Kuipers
anyone? if not i will follow the trail and try to deduce it myself On Mon, Jan 23, 2017 at 2:31 PM, Koert Kuipers <ko...@tresata.com> wrote: > looking at the docs for org.apache.spark.sql.expressions.Aggregator it > says for reduce method: "For performance, the funct
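
For context, the kind of Aggregator the question is about looks like this made-up sketch with a mutable buffer; the scaladoc spells out the in-place allowance only for reduce, and whether merge may mutate and return b1 the same way is what this thread tries to pin down:

  import org.apache.spark.sql.expressions.Aggregator
  import org.apache.spark.sql.{Encoder, Encoders}
  import scala.collection.mutable

  object DistinctCount extends Aggregator[Long, mutable.Set[Long], Int] {
    def zero: mutable.Set[Long] = mutable.Set.empty[Long]
    // reduce is documented as being allowed to modify b and return it
    def reduce(b: mutable.Set[Long], a: Long): mutable.Set[Long] = { b += a; b }
    // the open question: may merge mutate b1 in place like this?
    def merge(b1: mutable.Set[Long], b2: mutable.Set[Long]): mutable.Set[Long] = { b1 ++= b2; b1 }
    def finish(b: mutable.Set[Long]): Int = b.size
    def bufferEncoder: Encoder[mutable.Set[Long]] = Encoders.kryo[mutable.Set[Long]]
    def outputEncoder: Encoder[Int] = Encoders.scalaInt
  }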

Re: kafka structured streaming source refuses to read

2017-01-28 Thread Koert Kuipers
there was also already an existing spark ticket for this: SPARK-18779 <https://issues.apache.org/jira/browse/SPARK-18779> On Sat, Jan 28, 2017 at 1:13 PM, Koert Kuipers <ko...@tresata.com> wrote: > it seems the bug is: > https://issues.apache.org/jira/browse/KAFKA-4547

Re: kafka structured streaming source refuses to read

2017-01-28 Thread Koert Kuipers
it seems the bug is: https://issues.apache.org/jira/browse/KAFKA-4547 i would advise everyone not to use kafka-clients 0.10.0.2, 0.10.1.0 or 0.10.1.1 On Fri, Jan 27, 2017 at 3:56 PM, Koert Kuipers <ko...@tresata.com> wrote: > in case anyone else runs into this: > > the issue is t

Re: kafka structured streaming source refuses to read

2017-01-27 Thread Koert Kuipers
in case anyone else runs into this: the issue is that i was using kafka-clients 0.10.1.1 it works when i use kafka-clients 0.10.0.1 with spark structured streaming my kafka server is 0.10.1.1 On Fri, Jan 27, 2017 at 1:24 PM, Koert Kuipers <ko...@tresata.com> wrote: > i checked

Re: kafka structured streaming source refuses to read

2017-01-27 Thread Koert Kuipers
> 0, wikipedia-3 -> 0, wikipedia-0 -> 0) why does it find latest offset 152908 for wikipedia-2 but then sets latest offset to 0 for that partition? or am i misunderstanding? On Fri, Jan 27, 2017 at 1:19 PM, Koert Kuipers <ko...@tresata.com> wrote: > code: > val query =

Re: kafka structured streaming source refuses to read

2017-01-27 Thread Koert Kuipers
> lets see the code... > > Alonso Isidoro Roman > [image: https://]about.me/alonso.isidoro.roman > > <https://about.me/alonso.isidoro.roman?promo=email_sig_source=email_sig_medium=email_sig_campaign=external_links> > > 2017-01-27 5:56 GMT+01:00 Koert Kuipers <ko...@tres

Re: Making withColumn nullable

2017-01-27 Thread Koert Kuipers
it should be by default nullable except for certain primitives where it defaults to non-nullable you can use Option for your return value to indicate nullability. On Fri, Jan 27, 2017 at 10:32 AM, Ninad Shringarpure wrote: > HI Team, > > When I add a column to my data frame
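
A small sketch of the Option approach for a UDF-added column (names made up): returning Option[T] makes the new column nullable, while a bare primitive return type does not:

  import org.apache.spark.sql.functions.{col, udf}

  // None becomes null in the resulting column
  val parseAge = udf((s: String) => if (s == null || s.isEmpty) None else Some(s.trim.toInt))

  // assumes a dataframe `df` with a string column "raw_age"
  val withAge = df.withColumn("age", parseAge(col("raw_age")))
  withAge.printSchema()   // age: integer (nullable = true)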

Re: kafka structured streaming source refuses to read

2017-01-26 Thread Koert Kuipers
-7cbac0432099", "name" : "wiki", "timestamp" : "2017-01-26T22:55:25.760Z", "numInputRows" : 0, "inputRowsPerSecond" : 0.0, "processedRowsPerSecond" : 0.0, "durationMs" : { "getOffset" : 4, "t

kafka structured streaming source refuses to read

2017-01-26 Thread Koert Kuipers
hey, i am just getting started with kafka + spark structured streaming. so this is probably a pretty dumb mistake. i wrote a little program in spark to read messages from a kafka topic and display them in the console, using the kafka source and console sink. i run it it in spark local mode. i

Re: can we plz open up encoder on dataset

2017-01-26 Thread Koert Kuipers
e what it does. > > Regards, > Jacek Laskowski > > https://medium.com/@jaceklaskowski/ > Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark > Follow me at https://twitter.com/jaceklaskowski > > > On Thu, Jan 26, 2017 at 4:18 PM, Koert Kuipers <ko...@tr
