Re: [EXTERNAL] RDD.pipe() for binary data

2022-07-16 Thread Sebastian Piu
Other alternatives are to look at how PythonRDD does it in Spark. You could also try a more traditional setup where you expose your Python functions behind a local/remote service and call that from Scala - say over Thrift/gRPC/HTTP/a local socket, etc. Another option, but I've never done it

Re: why the pyspark RDD API is so slow?

2022-01-31 Thread Sebastian Piu
When you operate on a DataFrame from the Python side you are just invoking methods in the JVM via a proxy (Py4J), so it is almost the same as coding in Java itself. This holds as long as you don't define any UDFs or any other code that needs to invoke Python for processing. Check the High Performance Spark

Re: A Persisted Spark DataFrame is computed twice

2022-01-31 Thread Sebastian Piu
Can you share the stages as seen in the Spark UI for the count and coalesce jobs? My suggestion of moving things around was just for troubleshooting rather than a solution, if that wasn't clear before. On Mon, 31 Jan 2022, 08:07 Benjamin Du, wrote: > Removing coalesce didn't help either.

Re: why the pyspark RDD API is so slow?

2022-01-30 Thread Sebastian Piu
It's because all data needs to be pickled back and forth between Java and a spawned Python worker, so there is additional overhead compared to staying fully in Scala. Your Python code might make this worse too, for example if it is not yielding from operations. You can look at using UDFs and Arrow or trying

Re: A Persisted Spark DataFrame is computed twice

2022-01-30 Thread Sebastian Piu
It's probably the repartitioning and deserialising of the DataFrame that you are seeing take time. Try doing this: 1. Add another count after your current one and compare times. 2. Move coalesce before persist. You should see On Sun, 30 Jan 2022, 08:37 Benjamin Du, wrote: > I have some PySpark code like

Re: JDBC sessionInitStatement for writes?

2021-11-26 Thread Sebastian Piu
For datasources it's just something that is run on the connection before your statement is executed; it doesn't seem to depend on the specific JDBC driver. See here
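For reference, a sketch of how sessionInitStatement is wired into a JDBC read (the URL, table, credentials and init SQL are illustrative, not from the thread; whether the same option is honoured on the write path is what the question is about):

val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:oracle:thin:@//dbhost:1521/service")  // illustrative connection URL
  .option("dbtable", "myschema.mytable")
  .option("user", "scott")
  .option("password", "tiger")
  // run once per session, before Spark issues its own statements
  .option("sessionInitStatement", "ALTER SESSION SET NLS_DATE_FORMAT = 'YYYY-MM-DD'")
  .load()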

Re: [spark executor error] container running from bad node-> exit code 134

2021-11-19 Thread Sebastian Piu
That error could mean different things; most of the time it means the JVM crashed. If you are running on YARN, check the YARN logs or the stderr of your Spark job to see if there are any more details of the cause. On Fri, 19 Nov 2021 at 15:25, Joris Billen wrote: > Hi, > we are seeing this error:

Re: Create Dataframe from a single String in Java

2021-11-18 Thread Sebastian Piu
You can call that on SparkSession too. On Thu, 18 Nov 2021, 10:48 , wrote: > PS: The following works, but it seems rather awkward having to use the SQLContext here. > SQLContext sqlContext = new SQLContext(sparkContext); > Dataset data = sqlContext > .createDataset(textList,

Re: Create Dataframe from a single String in Java

2021-11-18 Thread Sebastian Piu
The most convenient way I'm aware of from Java is to use createDataset and pass Encoders.STRING(). That gives you a Dataset<String>; if you still want a Dataset<Row> (a DataFrame) then you can call .toDF on it. On Thu, 18 Nov 2021, 10:27 , wrote: > Hello, > > I am struggling with a task that should be super simple: I would like
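A minimal sketch of that (shown in Scala for brevity; the Java call the reply describes is spark.createDataset(list, Encoders.STRING()) followed by .toDF()). The strings and column name are illustrative:

import org.apache.spark.sql.{DataFrame, Dataset}
import spark.implicits._

val ds: Dataset[String] = spark.createDataset(Seq("first line", "second line"))
val df: DataFrame = ds.toDF("value")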

Re: How to generate unique incrementing identifier in a structured streaming dataframe

2021-07-13 Thread Sebastian Piu
If you want them to survive across jobs you can use snowflake IDs or similar ideas, depending on your use case. On Tue, 13 Jul 2021, 9:33 pm Mich Talebzadeh, wrote: > Meaning a monotonically incrementing ID, as in an Oracle sequence, for each record read from Kafka, adding that to your

Re: [Spark Structured Streaming] retry/replay failed messages

2021-07-09 Thread Sebastian Piu
So in payment systems you have something similar, I think. You have an authorisation, then the actual transaction, and maybe a refund some time in the future. You want to proceed with a transaction only if you've seen the auth, but in an eventually consistent system this might not always happen. You

Re: Insert into table with one the value is derived from DB function using spark

2021-06-19 Thread Sebastian Piu
Another option is to just use plain JDBC (if in Java) in a foreachPartition call on the DataFrame/Dataset; then you get full control of the insert statement, but you need to open the connection/transaction yourself. On Sat, 19 Jun 2021 at 19:33, Mich Talebzadeh wrote: > Hi, > > I did some research on
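A rough sketch of that approach (the connection URL, table, columns and the DB function name are all illustrative, not from the thread):

import java.sql.DriverManager
import org.apache.spark.sql.Row

df.foreachPartition { (rows: Iterator[Row]) =>
  // one connection per partition; you own the transaction
  val conn = DriverManager.getConnection("jdbc:postgresql://dbhost/mydb", "user", "pass")
  conn.setAutoCommit(false)
  val ps = conn.prepareStatement(
    "INSERT INTO target_table (id, created_at) VALUES (?, my_db_function())")
  try {
    rows.foreach { row =>
      ps.setLong(1, row.getAs[Long]("id"))
      ps.addBatch()
    }
    ps.executeBatch()
    conn.commit()
  } finally {
    ps.close()
    conn.close()
  }
}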

Re: Spark query performance of cached data affected by RDD lineage

2021-05-24 Thread Sebastian Piu
> Do Spark SQL queries depend directly on the RDD lineage even when the final results have been cached? Yes; if one of the nodes holding cached data later fails, Spark would need to rebuild that state somehow. You could try checkpointing occasionally and see if that helps. On Sat, 22 May 2021,
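A small sketch of occasional checkpointing to truncate the lineage of a cached DataFrame (the checkpoint directory is illustrative):

spark.sparkContext.setCheckpointDir("hdfs:///tmp/spark-checkpoints")
val stable = df.checkpoint()  // materialises df and cuts its RDD lineage
stable.cache()                // later queries start from the checkpointed data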

Re: Does dataframe spark API write/create a single file instead of directory as a result of write operation.

2020-02-22 Thread Sebastian Piu
I'm not aware of a way to specify the file name on the writer. Since you'd need to bring all the data into a single node and write from there to get a single file out, you could simply move/rename the file that Spark creates, or write the CSV yourself with your library of preference? On Sat, 22 Feb

Re: Looking for a developer to help us with a small ETL project using Spark and Kubernetes

2019-07-19 Thread Sebastian Piu
Hey Warren, I've done similar integrations in the past. Are you looking for a freelance dev to achieve this? I'm based in the UK. Cheers Seb On Thu, 18 Jul 2019, 11:47 pm Information Technologies, < i...@digitalearthnetwork.com> wrote: > Hello, > > We are looking for a developer to help us

Re: Is it possible to obtain the full command to be invoked by SparkLauncher?

2019-04-24 Thread Sebastian Piu
You could set the env var SPARK_PRINT_LAUNCH_COMMAND and spark-submit will print it, but it will be printed by the subprocess and not by your process unless you redirect the stdout. Also, the command is what spark-submit generates, so it is quite a bit more verbose and includes the classpath etc. I think the

Re: Multiple transformations without recalculating or caching

2017-11-17 Thread Sebastian Piu
If you don't want to recalculate you need to hold the results somewhere; if you need to save it anyway, why don't you do that and then read it back to get your stats? On Fri, 17 Nov 2017, 10:03 Fernando Pereira, wrote: > Dear Spark users > > Is it possible to take the output of

Re: pySpark driver memory limit

2017-11-09 Thread Sebastian Piu
This is my experience too, when running under YARN at least. On Thu, 9 Nov 2017, 07:11 Nicolas Paris, wrote: > On 06 Nov 2017 at 19:56, Nicolas Paris wrote: > > Can anyone clarify the driver memory aspects of pySpark? > > According to [1], spark.driver.memory limits JVM +

Re: Suggestions on using scala/python for Spark Streaming

2017-10-26 Thread Sebastian Piu
Have a look at how PySpark works in conjunction with Spark, as it is not just a matter of language preference. There are several implications and a performance price to pay if you go with Python. At the end of the day only you can answer whether that price is worth it over retraining your team in

Re: Spark - Partitions

2017-10-17 Thread Sebastian Piu
> sparkSession.sql(deltaDSQry) > Here is the code and also properties used in my project. > On Tue, Oct 17, 2017 at 3:38 PM, Sebastian Piu <sebastian@gmail.com> wrote: >> Can you share some code? >> On Tue, 17 Oct 2017

Re: Spark - Partitions

2017-10-17 Thread Sebastian Piu
> On Tue, Oct 17, 2017 at 3:07 PM, Sebastian Piu <sebastian@gmail.com> wrote: >> You have to repartition/coalesce *after* the action that is causing the shuffle as that one will take the value you've set >> On Tue, Oct 17, 2017 at 8:40 PM

Re: Spark - Partitions

2017-10-17 Thread Sebastian Piu
You have to repartition/coalesce *after* the action that is causing the shuffle, as that one will take the value you've set. On Tue, Oct 17, 2017 at 8:40 PM KhajaAsmath Mohammed < mdkhajaasm...@gmail.com> wrote: > Yes, I still see more part files, and exactly the number I have defined
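A sketch of the idea (names and numbers are illustrative): the wide transformation still shuffles into spark.sql.shuffle.partitions partitions, and the coalesce applied afterwards, right before the write, controls how many output files are produced.

val aggregated = df.groupBy("key").count()          // wide transformation -> shuffle
aggregated.coalesce(10).write.parquet("/path/out")  // 10 files instead of one per shuffle partition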

Re: Job spark blocked and runs indefinitely

2017-10-11 Thread Sebastian Piu
We do have this issue randomly too, so I'm interested in hearing if someone was able to get to the bottom of it. On Wed, 11 Oct 2017, 13:40 amine_901, wrote: > We encounter a problem on a Spark 1.6 job (on YARN) that never ends when > several jobs are launched

Thrift Server as JDBC endpoint

2017-03-15 Thread Sebastian Piu
Hi all, I'm doing some research on the best ways to expose data created by some of our Spark jobs so that it can be consumed by a client (a web UI). The data we need to serve might be huge, but we can control the type of queries that are submitted, e.g.: * limit the number of results * only accept

Re: Location for the additional jar files in Spark

2016-12-27 Thread Sebastian Piu
I take it you don't want to use the --jars option, to avoid moving them every time? On Tue, 27 Dec 2016, 10:33 Mich Talebzadeh, wrote: > When one runs in Local mode (one JVM) on an edge host (the host user > accesses the cluster), it is possible to put additional jar file

Re: Launching multiple spark jobs within a main spark job.

2016-12-21 Thread Sebastian Piu
Is there any reason you need a context in the application launching the jobs? You can use SparkLauncher in a normal app and just listen for state transitions. On Wed, 21 Dec 2016, 11:44 Naveen, wrote: > Hi Team, > > Thanks for your responses. > Let me give more details in
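A small sketch of SparkLauncher with a state listener (the jar path, main class and master are illustrative):

import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

val handle = new SparkLauncher()
  .setAppResource("/path/to/child-job.jar")
  .setMainClass("com.example.ChildJob")
  .setMaster("yarn")
  .startApplication(new SparkAppHandle.Listener {
    // called whenever the launched application changes state (SUBMITTED, RUNNING, FINISHED, ...)
    override def stateChanged(h: SparkAppHandle): Unit = println(s"child job state: ${h.getState}")
    override def infoChanged(h: SparkAppHandle): Unit = ()
  })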

Re: Reading parquet files into Spark Streaming

2016-08-27 Thread Sebastian Piu
Forgot to paste the link... http://ramblings.azurewebsites.net/2016/01/26/save-parquet-rdds-in-apache-spark/ On Sat, 27 Aug 2016, 19:18 Sebastian Piu, <sebastian@gmail.com> wrote: > Hi Renato, > > Check here on how to do it, it is in Java but you can translate it to > Sca

Re: Reading parquet files into Spark Streaming

2016-08-27 Thread Sebastian Piu
Hi Renato, Check here on how to do it, it is in Java but you can translate it to Scala if that is what you need. Cheers On Sat, 27 Aug 2016, 14:24 Renato Marroquín Mogrovejo, < renatoj.marroq...@gmail.com> wrote: > Hi Akhilesh, > > Thanks for your response. > I am using Spark 1.6.1 and what I

Re: "Schemaless" Spark

2016-08-19 Thread Sebastian Piu
You can do operations without a schema just fine; obviously, the more you know about your data the more tools you will have. It is hard to say more without context on what you are trying to achieve. On Fri, 19 Aug 2016, 22:55 Efe Selcuk, wrote: > Hi Spark community, > > This is a

Re: Specify node where driver should run

2016-06-07 Thread Sebastian Piu

Re: Specify node where driver should run

2016-06-07 Thread Sebastian Piu
What you are explaining is right for yarn-client mode, but the question is about yarn-cluster mode, in which case the Spark driver is also submitted and runs in one of the node managers. On Tue, 7 Jun 2016, 13:45 Mich Talebzadeh, wrote: > can you elaborate on the above

Re: Fill Gaps between rows

2016-04-26 Thread Sebastian Piu
Yes, you need a HiveContext for the window functions, but you don't need Hive itself for it to work. On Tue, 26 Apr 2016, 14:15 Andrés Ivaldi, wrote: > Hello, does an out-of-the-box way exist to fill in gaps between rows with a > given condition? > As an example: I have a source table with
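For illustration, a sketch of one kind of gap fill with a window function (the column names and the forward-fill logic are assumptions about the use case, not from the thread):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, last}

// carry the last non-null value forward within each id, ordered by ts
val w = Window.partitionBy("id").orderBy("ts")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)
val filled = df.withColumn("value_filled", last(col("value"), ignoreNulls = true).over(w))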

Re: Use only latest values

2016-04-09 Thread Sebastian Piu
Have a look at mapWithState if you are using 1.6+ On Sat, 9 Apr 2016, 08:04 Daniela S, wrote: > Hi, > > I would like to cache values and to use only the latest "valid" values to > build a sum. > In more detail, I receive values from devices periodically. I would like > to
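A small sketch of mapWithState for keeping the latest value per key (the types and the 'latest value' semantics are assumptions; pairDStream is an assumed DStream[(String, Int)] of (deviceId, value)):

import org.apache.spark.streaming.{State, StateSpec}

def track(deviceId: String, value: Option[Int], state: State[Int]): (String, Int) = {
  // prefer the newly arrived value, otherwise fall back to what was stored before
  val latest = value.getOrElse(state.getOption.getOrElse(0))
  state.update(latest)
  (deviceId, latest)
}

val stateStream = pairDStream.mapWithState(StateSpec.function(track _))
val snapshots = stateStream.stateSnapshots()  // (deviceId, latestValue) for every key seen so far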

Re: Stress testing hdfs with Spark

2016-04-05 Thread Sebastian Piu
You could try using TestDFSIO for raw HDFS performance, but we found it not very relevant. Another way could be to generate a file and then read it and write it back. For some of our use cases we populated a Kafka queue on the cluster (on different disks) and used Spark Streaming to do

Re: Problem using saveAsNewAPIHadoopFile API

2016-03-25 Thread Sebastian Piu
I don't understand the race condition comment you mention. Have you seen this somewhere? That timestamp will be the same on each worker for that RDD, and each worker is handling a different partition, which will be reflected in the filename, so no data will be overwritten. In fact this is

Re: Problem using saveAsNewAPIHadoopFile API

2016-03-22 Thread Sebastian Piu
As you said, create a folder for each different minute; you can use the rdd.time as the timestamp. Also, you might want to have a look at the window function for the batching. On Tue, 22 Mar 2016, 17:43 vetal king, wrote: > Hi Cody, > > Thanks for your reply. > > Five
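A minimal sketch of using the batch time to build one output folder per batch (the DStream name, output path and saveAsTextFile call are illustrative - the thread itself is about saveAsNewAPIHadoopFile, where the same time-based path applies):

lines.foreachRDD { (rdd, time) =>
  if (!rdd.isEmpty()) {
    // time.milliseconds is the batch time, so each batch lands in its own folder
    rdd.saveAsTextFile(s"hdfs:///data/out/batch-${time.milliseconds}")
  }
}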

Re: Best way to store Avro Objects as Parquet using SPARK

2016-03-21 Thread Sebastian Piu
We use this, but not sure how the schema is stored:
Job job = Job.getInstance();
ParquetOutputFormat.setWriteSupportClass(job, AvroWriteSupport.class);
AvroParquetOutputFormat.setSchema(job, schema);
LazyOutputFormat.setOutputFormatClass(job, new ParquetOutputFormat().getClass());

Re: How to gracefully handle Kafka OffsetOutOfRangeException

2016-03-19 Thread Sebastian Piu
Try to troubleshoot why it is happening; maybe some messages are too big to be read from the topic? I remember getting that error and that was the cause. On Fri, Mar 18, 2016 at 11:16 AM Ramkumar Venkataraman < ram.the.m...@gmail.com> wrote: > I am using Spark streaming and reading data from Kafka

Re: Spark streaming not remembering previous state

2016-02-27 Thread Sebastian Piu
Here: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala On Sat, 27 Feb 2016, 20:42 Sebastian Piu, <sebastian@gmail.com> wrote: > You need to create the streaming context using an existing c

Re: Spark streaming not remembering previous state

2016-02-27 Thread Sebastian Piu
You need to create the streaming context using an existing checkpoint for it to work. See the sample here. On Sat, 27 Feb 2016, 20:28 Vinti Maheshwari, wrote: > Hi All, > > I wrote a Spark Streaming program with a stateful transformation. > It seems like my Spark Streaming
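The key piece is StreamingContext.getOrCreate; a sketch (the checkpoint path, app name and batch interval are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///tmp/streaming-checkpoint"

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("stateful-app")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  // ... define the sources, the stateful transformation and the outputs here ...
  ssc
}

// only calls createContext() when no checkpoint exists; otherwise the context
// (and the previous state) is restored from the checkpoint
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()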

Re: spark-xml can't recognize schema

2016-02-21 Thread Sebastian Piu
it for other books, did you include that in your xml file? > From: Sebastian Piu > Sent: Sunday, 21 February 2016 20:00 > To: Prathamesh Dharangutte > Cc: user@spark.apache.org > Subject: Re: spark-xml can't recognize schema > Just ran that code and it w

Re: spark-xml can't recognize schema

2016-02-21 Thread Sebastian Piu
> var df : DataFrame = null > var newDf : DataFrame = null > df = sqlContext.read > .format("com.databricks.spark.xml") > .option("rowTag", "book") > .load("/home/prathamsh/Workspace/Xml/datafiles/sample.xml")

Re: spark-xml can't recognize schema

2016-02-21 Thread Sebastian Piu
Can you paste the code you are using? On Sun, 21 Feb 2016, 13:19 Prathamesh Dharangutte wrote: > I am trying to parse an XML file using spark-xml, but for some reason when I > print the schema it only shows root instead of the hierarchy. I am using > sqlContext to read the

Re: Streaming with broadcast joins

2016-02-19 Thread Sebastian Piu
fy in spark UI > that broadcast join is being used. Also, if the files are read and > broadcasted each batch?? > > Thanks for the help! > > > On Fri, Feb 19, 2016 at 3:49 AM, Sebastian Piu <sebastian@gmail.com> > wrote: > >> I don't see anything obviously wrong on your

Re: Streaming with broadcast joins

2016-02-19 Thread Sebastian Piu
.schema(schema).load("file:///shared/data/test-data.txt") > ) > > val lines = ssc.socketTextStream("DevNode", ) > > lines.foreachRDD((rdd, timestamp) => { > val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toIn

Re: Streaming with broadcast joins

2016-02-18 Thread Sebastian Piu
can be broadcast once and used locally for each > RDD? > Right now every batch the metadata file is read and the DF is broadcasted. > I tried sc.broadcast and that did not provide this behavior. > > Srikanth > > > On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu <sebastian

Re: Streaming with broadcast joins

2016-02-17 Thread Sebastian Piu
You should be able to broadcast that data frame using sc.broadcast and join against it. On Wed, 17 Feb 2016, 21:13 Srikanth wrote: > Hello, > > I have a streaming use case where I plan to keep a dataset broadcasted and > cached on each executor. > Every micro batch in
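One way to get this (a sketch; the DataFrame names, join key and path are illustrative): read the small reference data once, cache it, and join it against each micro-batch with the broadcast hint so the streaming side isn't shuffled.

import org.apache.spark.sql.functions.broadcast

val metadataDF = spark.read.parquet("/path/to/metadata").cache()   // small, static side
val joined = streamBatchDF.join(broadcast(metadataDF), Seq("key")) // per-batch DataFrame joined locally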

Re: Fair Scheduler Pools with Kafka Streaming

2016-02-16 Thread Sebastian Piu
Yes, it is related to concurrentJobs, so you need to increase that. Note that this will mean that if you get overlapping batches then those will be executed in parallel too. On Tue, 16 Feb 2016, 18:33 p pathiyil wrote: > Hi, > > I am trying to use Fair Scheduler Pools with Kafka
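The relevant settings, as a sketch (the values are illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.streaming.concurrentJobs", "2")  // allow more than one streaming job at a time
  .set("spark.scheduler.mode", "FAIR")         // schedule those concurrent jobs fairly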

Re: Check if column exists in Schema

2016-02-16 Thread Sebastian Piu
-Data-Analytics-Spark-Practitioners/dp/1484209656/ > From: Sebastian Piu [mailto:sebastian@gmail.com] > Sent: Monday, February 15, 2016 11:21 AM > To: user > Subject: Re: Check if column exists in Schema > I just realised this is a bit vague, I'

Re: Check if column exists in Schema

2016-02-15 Thread Sebastian Piu
or it is null, I'd get it from some other place. On Mon, Feb 15, 2016 at 7:17 PM Sebastian Piu <sebastian@gmail.com> wrote: > Is there any way of checking if a given column exists in a Dataframe?

Check if column exists in Schema

2016-02-15 Thread Sebastian Piu
Is there any way of checking if a given column exists in a DataFrame?
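For reference, a couple of simple checks (the DataFrame and column name are illustrative):

val exists = df.columns.contains("myCol")
// or via the schema
val existsInSchema = df.schema.fieldNames.contains("myCol")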

Re: org.apache.spark.sql.AnalysisException: undefined function lit;

2016-02-13 Thread Sebastian Piu
I've never done it that way but you can simply use the withColumn method in data frames to do it. On 13 Feb 2016 2:19 a.m., "Andy Davidson" wrote: > I am trying to add a column with a constant value to my data frame. Any > idea what I am doing wrong? > > Kind
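A minimal sketch of the withColumn approach (the column name and constant are illustrative):

import org.apache.spark.sql.functions.lit

val withConstant = df.withColumn("source", lit("constant-value"))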

Re: Stateful Operation on JavaPairDStream Help Needed !!

2016-02-13 Thread Sebastian Piu
? >> >> When my application goes down and is restarted from checkpoint, will >> mapWithState need to recompute the previous batches data ? >> >> Also, to use mapWithState I will need to upgrade my application as I am >> using version 1.4.0 and mapWithState isnt supported there. Is th

Re: Spark Streaming with Kafka: Dealing with 'slow' partitions

2016-02-11 Thread Sebastian Piu
Have you tried using the fair scheduler and queues? On 12 Feb 2016 4:24 a.m., "p pathiyil" wrote: > With this setting, I can see that the next job is being executed before > the previous one is finished. However, the processing of the 'hot' > partition eventually hogs all the

Re: Skip empty batches - spark streaming

2016-02-11 Thread Sebastian Piu
od return None to skip a batch. > > On Thu, Feb 11, 2016 at 1:03 PM, Sebastian Piu <sebastian@gmail.com> > wrote: > >> I was wondering if there is there any way to skip batches with zero >> events when streaming? >> By skip I mean avoid the empty rdd from being created at all? >> > >

Re: Skip empty batches - spark streaming

2016-02-11 Thread Sebastian Piu
; wrote: > Yeah, DirectKafkaInputDStream always returns a RDD even if it's empty. > Feel free to send a PR to improve it. > > On Thu, Feb 11, 2016 at 1:09 PM, Sebastian Piu <sebastian@gmail.com> > wrote: > >> I'm using the Kafka direct stream api but I can have a

Re: Stateful Operation on JavaPairDStream Help Needed !!

2016-02-11 Thread Sebastian Piu
Looks like mapWithState could help you? On 11 Feb 2016 8:40 p.m., "Abhishek Anand" wrote: > Hi All, > > I have a use case like the following in my production environment, where I am > listening to Kafka with a slideInterval of 1 min and a windowLength of 2 > hours. > > I have a

Skip empty batches - spark streaming

2016-02-11 Thread Sebastian Piu
I was wondering if there is any way to skip batches with zero events when streaming? By skip I mean avoid the empty RDD from being created at all?

Re: Skip empty batches - spark streaming

2016-02-11 Thread Sebastian Piu
h the rdd (before any transformations) > > In any recent version of spark, isEmpty on a KafkaRDD is a driver-side > only operation that is basically free. > > > On Thu, Feb 11, 2016 at 3:19 PM, Sebastian Piu <sebastian@gmail.com> > wrote: > >> Yes, and as far as I
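A minimal sketch of that check in practice (the stream name is illustrative; on a KafkaRDD the isEmpty call is the cheap driver-side operation mentioned above):

stream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // ... process the non-empty batch ...
  }
}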

mapWithState / stateSnapshots() yielding empty rdds?

2016-01-29 Thread Sebastian Piu
Hi All, I'm playing with the new mapWithState functionality but I can't get it quite to work yet. I'm doing two print() calls on the stream: 1. after mapWithState() call, first batch shows results - next batches yield empty 2. after stateSnapshots(), always yields an empty RDD Any pointers on

Re: mapWithState / stateSnapshots() yielding empty rdds?

2016-01-29 Thread Sebastian Piu
Just saw I'm not calling state.update() in my trackState function. I guess that is the issue! On Fri, Jan 29, 2016 at 9:36 AM, Sebastian Piu <sebastian@gmail.com> wrote: > Hi All, > > I'm playing with the new mapWithState functionality but I can't get it > quite to

Re: can't find trackStateByKey in 1.6.0 jar?

2016-01-28 Thread Sebastian Piu
That explains it! Thanks :) On Thu, Jan 28, 2016 at 9:52 AM, Tathagata Das <tathagata.das1...@gmail.com> wrote: > its been renamed to mapWithState when 1.6.0 was released. :) > > > > On Thu, Jan 28, 2016 at 1:51 AM, Sebastian Piu <sebastian@gmail.com> > wrote:

can't find trackStateByKey in 1.6.0 jar?

2016-01-28 Thread Sebastian Piu
I wanted to give the new trackStateByKey method a try, but I'm missing something very obvious here as I can't see it in the 1.6.0 jar. Is there anything in particular I have to do, or is it just Maven playing tricks on me? This is the dependency I'm using: org.apache.spark spark-streaming_2.10

FAIR scheduler in Spark Streaming

2016-01-26 Thread Sebastian Piu
Hi, I'm trying to get *FAIR* scheduling to work in a Spark Streaming app (1.6.0). I've found a previous mailing list thread where it is indicated to do:
dstream.foreachRDD { rdd =>
  rdd.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1") // set the pool
  rdd.count() // or whatever job
}
This
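For context, a sketch of the configuration that usually goes with this (the pool name and file path are illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.scheduler.mode", "FAIR")
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml") // defines "pool1" and its weight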

Re: FAIR scheduler in Spark Streaming

2016-01-26 Thread Sebastian Piu
> However, you need to keep in mind that setting it to a bigger number will allow jobs of several batches to run at the same time. It's hard to predict the behavior and sometimes it will surprise you. > On Tue, Jan 26, 2016 at 9:57 AM, Sebastian Piu <sebastian@gmail.com> w

Re: java.lang.ArrayIndexOutOfBoundsException when attempting broadcastjoin

2016-01-21 Thread Sebastian Piu
I'm using Spark 1.6.0. I tried removing Kryo and reverting back to Java Serialisation, and get a different error which maybe points in the right direction... java.lang.AssertionError: assertion failed: No plan for BroadcastHint +- InMemoryRelation

Re: No plan for BroadcastHint when attempting broadcastjoin

2016-01-21 Thread Sebastian Piu
> Modified subject to reflect new error encountered. > Interesting - SPARK-12275 is marked fixed against 1.6.0 > On Thu, Jan 21, 2016 at 7:30 AM, Sebastian Piu <sebastian@gmail.com> wrote: >> I'm using Spark 1.6.0. >> I tried removing Kry