Re: Serialization or internal functions?

2020-04-09 Thread Vadim Semenov
You can take a look at the code that Spark generates: import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.debug.codegenString val spark: SparkSession import org.apache.spark.sql.functions._ import spark.implicits._ val data = Seq("A","b","c").toDF("col")

Re: Driver OutOfMemoryError in MapOutputTracker$.serializeMapStatuses for 40 TB shuffle.

2019-11-11 Thread Vadim Semenov
ark dev out there who is looking for a problem to > solve. > > On Fri, Nov 8, 2019 at 2:24 PM Vadim Semenov > wrote: >> >> Basically, the driver tracks partitions and sends it over to >> executors, so what it's trying to do is to serialize and compress the >> map but

Re: Driver OutOfMemoryError in MapOutputTracker$.serializeMapStatuses for 40 TB shuffle.

2019-11-08 Thread Vadim Semenov
Basically, the driver tracks partitions and sends it over to executors, so what it's trying to do is to serialize and compress the map but because it's so big, it goes over 2GiB and that's Java's limit on the max size of byte arrays, so the whole thing drops. The size of data doesn't matter here

Re: intermittent Kryo serialization failures in Spark

2019-09-18 Thread Vadim Semenov
t... who knows). I will try it > with your suggestions and see if it solves the problem. > > thanks, > Jerry > > On Tue, Sep 17, 2019 at 4:34 PM Vadim Semenov wrote: > >> Pre-register your classes: >> >> ``` >> import com.esotericsoftware.kryo.Kryo >>

Re: intermittent Kryo serialization failures in Spark

2019-09-17 Thread Vadim Semenov
Pre-register your classes: ``` import com.esotericsoftware.kryo.Kryo import org.apache.spark.serializer.KryoRegistrator class MyKryoRegistrator extends KryoRegistrator { override def registerClasses(kryo: Kryo): Unit = { kryo.register(Class.forName("[[B")) // byte[][]
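
A minimal sketch of the full setup the snippet above starts; the extra `Array[String]` registration and the `registrationRequired` flag are illustrative additions, not part of the original message:

```
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(Class.forName("[[B"))   // byte[][]
    kryo.register(classOf[Array[String]]) // illustrative extra registration
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", classOf[MyKryoRegistrator].getName)
  // optional: fail fast on unregistered classes instead of failing intermittently
  .set("spark.kryo.registrationRequired", "true")
```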

Re: EMR Spark 2.4.3 executor hang

2019-09-03 Thread Vadim Semenov
Try "spark.shuffle.io.numConnectionsPerPeer=10" On Fri, Aug 30, 2019 at 10:22 AM Daniel Zhang wrote: > Hi, All: > We are testing the EMR and compare with our on-premise HDP solution. We > use one application as the test: > EMR (5.21.1) with Hadoop 2.8.5 + Spark 2.4.3 vs HDP (2.6.3) with Hadoop

Re: Stream is corrupted in ShuffleBlockFetcherIterator

2019-08-16 Thread Vadim Semenov
This is what you're looking for: Handle large corrupt shuffle blocks https://issues.apache.org/jira/browse/SPARK-26089 So until 3.0 the only way I can think of is to reduce the size/split your job into many On Thu, Aug 15, 2019 at 4:47 PM Mikhail Pryakhin wrote: > Hello, Spark community! > >

Re: [Meta] Moderation request diversion?

2019-06-24 Thread Vadim Semenov
just set up a filter [image: Screen Shot 2019-06-24 at 4.51.20 PM.png] On Mon, Jun 24, 2019 at 4:46 PM Jeff Evans wrote: > There seem to be a lot of people trying to unsubscribe via the main > address, rather than following the instructions from the welcome > email. Of course, this is not all

Re: [Spark Core]: What is the release date for Spark 3 ?

2019-06-13 Thread Vadim Semenov
next spark summit On Thu, Jun 13, 2019 at 3:58 AM Alex Dettinger wrote: > Follow up on the release date for Spark 3. Any guesstimate or rough > estimation without commitment would be helpful :) > > Cheers, > Alex > > On Mon, Jun 10, 2019 at 5:24 PM Alex Dettinger > wrote: > >> Hi guys, >> >>

Re: Difference between Checkpointing and Persist

2019-04-18 Thread Vadim Semenov
saving/checkpointing would be preferable in case of a big data set because: - the RDD gets saved to HDFS and the DAG gets truncated so if some partitions/executors fail it won't result in recomputing everything - you don't use memory for caching therefore the JVM heap is going to be smaller

Re: [SHUFFLE]FAILED_TO_UNCOMPRESS(5) errors when fetching shuffle data with sort-based shuffle

2019-03-12 Thread Vadim Semenov
I/We have seen this error before on 1.6 but ever since we upgraded to 2.1 two years ago we haven't seen it On Tue, Mar 12, 2019 at 2:19 AM wangfei wrote: > Hi all, > Non-deterministic FAILED_TO_UNCOMPRESS(5) or ’Stream is corrupted’ > errors > may occur during shuffle read, described as

Re: Difference between dataset and dataframe

2019-02-19 Thread Vadim Semenov
> > 1) Is there any difference in terms performance when we use datasets over > dataframes? Is it significant to choose 1 over other. I do realise there > would be some overhead due case classes but how significant is that? Are > there any other implications. As long as you use the DataFrame

Re: "where" clause able to access fields not in its schema

2019-02-13 Thread Vadim Semenov
Yeah, the filter gets infront of the select after analyzing scala> b.where($"bar" === 20).explain(true) == Parsed Logical Plan == 'Filter ('bar = 20) +- AnalysisBarrier +- Project [foo#6] +- Project [_1#3 AS foo#6, _2#4 AS bar#7] +- SerializeFromObject

Re: Spark on YARN, HowTo kill executor or individual task?

2019-02-12 Thread Vadim Semenov
dn't any away to inject "poison pill" into repartition call :( > > пн, 11 февр. 2019 г. в 21:19, Vadim Semenov : >> >> something like this >> >> import org.apache.spark.TaskContext >> ds.map(r => { >> val taskContext = TaskContext.get()

Re: Spark on YARN, HowTo kill executor or individual task?

2019-02-11 Thread Vadim Semenov
something like this import org.apache.spark.TaskContext ds.map(r => { val taskContext = TaskContext.get() if (taskContext.partitionId == 1000) { throw new RuntimeException } r }) On Mon, Feb 11, 2019 at 8:41 AM Serega Sheypak wrote: > > I need to crash task which does repartition. >

Re: Job hangs in blocked task in final parquet write stage

2018-11-27 Thread Vadim Semenov
Hey Conrad, has it started happening recently? We recently started having some sporadic problems with drivers on EMR when it gets stuck, up until two weeks ago everything was fine. We're trying to figure out with the EMR team where the issue is coming from. On Tue, Nov 27, 2018 at 6:29 AM Conrad

Re: Spark DataSets and multiple write(.) calls

2018-11-19 Thread Vadim Semenov
You can use checkpointing, in this case Spark will write out an rdd to whatever destination you specify, and then the RDD can be reused from the checkpointed state avoiding recomputing. On Mon, Nov 19, 2018 at 7:51 AM Dipl.-Inf. Rico Bergmann < i...@ricobergmann.de> wrote: > Thanks for your

Re: Does Spark have a plan to move away from sun.misc.Unsafe?

2018-10-25 Thread Vadim Semenov
Here you go: the umbrella ticket: https://issues.apache.org/jira/browse/SPARK-24417 and the sun.misc.unsafe one https://issues.apache.org/jira/browse/SPARK-24421 On Wed, Oct 24, 2018 at 8:08 PM kant kodali wrote: > > Hi All, > > Does Spark have a plan to move away from sun.misc.Unsafe to

Re: Driver OutOfMemoryError in MapOutputTracker$.serializeMapStatuses for 40 TB shuffle.

2018-09-07 Thread Vadim Semenov
You have too many partitions, so when the driver is trying to gather the status of all map outputs and send back to executors it chokes on the size of the structure that needs to be GZipped, and since it's bigger than 2GiB, it produces OOM. On Fri, Sep 7, 2018 at 10:35 AM Harel Gliksman wrote: >

Re: java.lang.IndexOutOfBoundsException: len is negative - when data size increases

2018-08-16 Thread Vadim Semenov
one of the spills becomes bigger than 2GiB and can't be loaded fully (as arrays in Java can't have more than 2^32 values) > > org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.loadNext(UnsafeSorterSpillReader.java:76) You can try increasing the number of partitions, so

Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-08 Thread Vadim Semenov
`coalesce` sets the number of partitions for the last stage, so you have to use `repartition` instead which is going to introduce an extra shuffle stage On Wed, Aug 8, 2018 at 3:47 PM Koert Kuipers wrote: > > one small correction: lots of files leads to pressure on the spark driver > program
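
A rough illustration of the difference, with a hypothetical DataFrame `df` and output path `out`:

```
// coalesce(1): the reduce side of the aggregation and the write run together
// as a single task, because coalesce only narrows the last stage
df.groupBy("key").count().coalesce(1).write.parquet(out)

// repartition(1): adds an extra shuffle, so the aggregation keeps its
// parallelism and only the final write runs as a single task
df.groupBy("key").count().repartition(1).write.parquet(out)
```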

Re: Broadcast variable size limit?

2018-08-05 Thread Vadim Semenov
That’s the max size of a byte array in Java: the length is defined as an integer, and in most JVMs arrays can’t hold more than Int.MaxValue - 8 elements. Another way to overcome this is to create multiple broadcast variables. On Sunday, August 5, 2018, klrmowse wrote: > i don't need
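
A sketch of the multiple-broadcast workaround; `bigArray`, the chunk size, and the Int-keyed `rdd` are hypothetical:

```
val bigArray: Array[Double] = ???  // too large to serialize as a single broadcast
val chunkSize = 100000000          // keep each serialized chunk well under 2 GiB
val broadcasts = bigArray.grouped(chunkSize).toArray.map(chunk => sc.broadcast(chunk))

// look up element i by picking the right chunk first
val looked = rdd.map { (i: Int) => broadcasts(i / chunkSize).value(i % chunkSize) }
```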

Re: How to Create one DB connection per executor and close it after the job is done?

2018-07-30 Thread Vadim Semenov
object MyDatabseSingleton { @transient lazy val dbConn = DB.connect(…) `transient` marks the variable to be excluded from serialization and `lazy` would open connection only when it's needed and also makes sure that the val is thread-safe
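
A fuller sketch of the pattern (the JDBC URL, credentials, and `rdd` are hypothetical):

```
import java.sql.{Connection, DriverManager}

object MyDatabaseSingleton {
  // excluded from task serialization; opened once per executor JVM on first use
  @transient lazy val dbConn: Connection = {
    val c = DriverManager.getConnection("jdbc:postgresql://host/db", "user", "pass")
    sys.addShutdownHook(c.close()) // close when the executor JVM shuts down
    c
  }
}

rdd.mapPartitions { rows =>
  val conn = MyDatabaseSingleton.dbConn // shared by all tasks on the same executor
  rows.map { r => /* use conn to read/write r */ r }
}
```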

Re: Iterative rdd union + reduceByKey operations on small dataset leads to "No space left on device" error on account of lot of shuffle spill.

2018-07-27 Thread Vadim Semenov
`spark.worker.cleanup.enabled=true` doesn't work for YARN. On Fri, Jul 27, 2018 at 8:52 AM dineshdharme wrote: > > I am trying to do few (union + reduceByKey) operations on a hiearchical > dataset in a iterative fashion in rdd. The first few loops run fine but on > the subsequent loops, the

Re: Dataframe joins - UnsupportedOperationException: Unimplemented type: IntegerType

2018-07-09 Thread Vadim Semenov
That usually happens when you have different types for a column in some parquet files. In this case, I think you have a column of `Long` type that got a file with `Integer` type, I had to deal with similar problem once. You would have to cast it yourself to Long. On Mon, Jul 9, 2018 at 2:53 PM

Re: Dynamic allocation not releasing executors after unpersisting all cached data

2018-07-09 Thread Vadim Semenov
Try doing `unpersist(blocking=true)` On Mon, Jul 9, 2018 at 2:59 PM Jeffrey Charles wrote: > > I'm persisting a dataframe in Zeppelin which has dynamic allocation enabled > to get a sense of how much memory the dataframe takes up. After I note the > size, I unpersist the dataframe. For some

Re: Inferring Data driven Spark parameters

2018-07-03 Thread Vadim Semenov
You can't change the executor/driver cores/memory on the fly once you've already started a Spark Context. On Tue, Jul 3, 2018 at 4:30 AM Aakash Basu wrote: > > We aren't using Oozie or similar, moreover, the end to end job shall be > exactly the same, but the data will be extremely different

Re: [G1GC] -XX: -ResizePLAB How to provide in Spark Submit

2018-07-03 Thread Vadim Semenov
As typical `JAVA_OPTS` you need to pass as a single parameter: --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:-ResizePLAB" Also you got an extra space in the parameter, there should be no space after the colon symbol On Tue, Jul 3, 2018 at 3:01 AM Aakash Basu wrote: > > Hi, > > I used

Re: Can we get the partition Index in an UDF

2018-06-25 Thread Vadim Semenov
Try using `TaskContext`: import org.apache.spark.TaskContext val partitionId = TaskContext.getPartitionId() On Mon, Jun 25, 2018 at 11:17 AM Lalwani, Jayesh wrote: > > We are trying to add a column to a Dataframe with some data that is seeded by > some random data. We want to be able to
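
A small sketch of using it inside a UDF; `df` is hypothetical:

```
import org.apache.spark.TaskContext
import org.apache.spark.sql.functions.udf

// tag each row with the partition index of the task that computed it,
// e.g. to seed a per-partition random generator
val partitionId = udf(() => TaskContext.getPartitionId())
val withPartition = df.withColumn("partition_id", partitionId())
```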

Writing rows directly in Tungsten format into memory

2018-06-12 Thread Vadim Semenov
Is there a way to write rows directly into off-heap memory in the Tungsten format bypassing creating objects? I have a lot of rows, and right now I'm creating objects, and they get encoded, but because of the number of rows, it creates significant pressure on GC. I'd like to avoid creating

Re: Time series data

2018-05-24 Thread Vadim Semenov
Yeah, it depends on what you want to do with that timeseries data. We at Datadog process trillions of points daily using Spark, I cannot really go about what exactly we do with the data, but just saying that Spark can handle the volume, scale well and be fault-tolerant, albeit everything I said

Re:

2018-05-16 Thread Vadim Semenov
Upon downsizing to 20 partitions some of your partitions become too big, and I see that you're doing caching, and executors try to write big partitions to disk, but fail because they exceed 2GiB > Caused by: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at

Re: Free Column Reference with $

2018-05-04 Thread Vadim Semenov
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala#L38-L47 It's called String Interpolation See "Advanced Usage" here https://docs.scala-lang.org/overviews/core/string-interpolation.html On Fri, May 4, 2018 at 10:10 AM, Christopher Piggott

Re: spark.executor.extraJavaOptions inside application code

2018-05-02 Thread Vadim Semenov
You need to pass config before creating a session val conf = new SparkConf() // All three methods below are equivalent conf.set("spark.executor.extraJavaOptions", "-Dbasicauth=myuser:mypassword") conf.set("spark.executorEnv.basicauth", "myuser:mypassword") conf.setExecutorEnv("basicauth",
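
A minimal sketch of the full sequence (the option value is the illustrative one from the thread):

```
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// executor JVM options must be set before the executors are launched,
// so the conf has to be built before the session
val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions", "-Dbasicauth=myuser:mypassword")

val spark = SparkSession.builder()
  .config(conf)
  .getOrCreate()
```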

Re: [Spark 2.x Core] .collect() size limit

2018-04-30 Thread Vadim Semenov
`.collect` returns an Array, and arrays can't have more than Int.MaxValue elements, and in most JVMs it's lower: `Int.MaxValue - 8`. So that puts an upper limit; however, you can just create an Array of Arrays, and so on, basically limitless, albeit with some gymnastics.

Re: Tuning Resource Allocation during runtime

2018-04-27 Thread Vadim Semenov
You can not change dynamically the number of cores per executor or cores per task, but you can change the number of executors. In one of my jobs I have something like this, so when I know that I don't need more than 4 executors, I kill all other executors (assuming that they don't hold any cached

Re: Spark Job Server application compilation issue

2018-03-14 Thread Vadim Semenov
This question should be directed to the `spark-jobserver` group: https://github.com/spark-jobserver/spark-jobserver#contact They also have a gitter chat. Also include the errors you get once you're going to be asking them a question On Wed, Mar 14, 2018 at 1:37 PM, sujeet jog

Re: Writing a DataFrame is taking too long and huge space

2018-03-09 Thread Vadim Semenov
But overall, I think the original approach is not correct. If you get a single file in 10s GB, the approach is probably must be reworked. I don't see why you can't just write multiple CSV files using Spark, and then concatenate them without Spark On Fri, Mar 9, 2018 at 10:02 AM, Vadim Semenov

Re: Writing a DataFrame is taking too long and huge space

2018-03-09 Thread Vadim Semenov
ocation. > > Thanks > Deepak > > On Fri, Mar 9, 2018, 20:12 Vadim Semenov <va...@datadoghq.com> wrote: > >> because `coalesce` gets propagated further up in the DAG in the last >> stage, so your last stage only has one task. >> >> You need to break

Re: Writing a DataFrame is taking too long and huge space

2018-03-09 Thread Vadim Semenov
because `coalesce` gets propagated further up in the DAG in the last stage, so your last stage only has one task. You need to break your DAG so your expensive operations would be in a previous stage before the stage with `.coalesce(1)` On Fri, Mar 9, 2018 at 5:23 AM, Md. Rezaul Karim <

Re: Spark & S3 - Introducing random values into key names

2018-03-08 Thread Vadim Semenov
You need to put the randomness at the beginning of the key; if you put it anywhere other than the beginning, it's not guaranteed that you're going to have good performance. The way we achieved this is by writing to HDFS first, and then having a custom DistCp implemented using Spark that copies parquet

Re: OutOfDirectMemoryError for Spark 2.2

2018-03-06 Thread Vadim Semenov
Do you have a trace? i.e. what's the source of `io.netty.*` calls? And have you tried bumping `-XX:MaxDirectMemorySize`? On Tue, Mar 6, 2018 at 12:45 AM, Chawla,Sumit wrote: > Hi All > > I have a job which processes a large dataset. All items in the dataset > are

Re: Can I get my custom spark strategy to run last?

2018-03-02 Thread Vadim Semenov
Something like this? sparkSession.experimental.extraStrategies = Seq(Strategy) val logicalPlan = df.logicalPlan val newPlan: LogicalPlan = Strategy(logicalPlan) Dataset.ofRows(sparkSession, newPlan) On Thu, Mar 1, 2018 at 8:20 PM, Keith Chapman wrote: > Hi, > > I'd

Re: org.apache.spark.SparkException: Task failed while writing rows

2018-02-28 Thread Vadim Semenov
Yeah, without actually seeing what's happening on that line, it'd be difficult to say for sure. You can check what patches HortonWorks applied, or/and ask them. And yeah, seg fault is totally possible on any size of the data. But you should've seen it in the `stdout` (assuming that the regular

Re: org.apache.spark.SparkException: Task failed while writing rows

2018-02-28 Thread Vadim Semenov
Who's your spark provider? EMR, Azure, Databricks, etc.? Maybe contact them, since they've probably applied some patches Also have you checked `stdout` for some Segfaults? I vaguely remember getting `Task failed while writing rows at` and seeing some segfaults that caused that On Wed, Feb 28,

Re: org.apache.spark.SparkException: Task failed while writing rows

2018-02-28 Thread Vadim Semenov
I'm sorry, didn't see `Caused by: java.lang.NullPointerException at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:468)` Are you sure that you use 2.2.0? I don't see any code on that line

Re: org.apache.spark.SparkException: Task failed while writing rows

2018-02-28 Thread Vadim Semenov
There should be another exception trace (basically, the actual cause) after this one, could you post it? On Wed, Feb 28, 2018 at 1:39 PM, unk1102 wrote: > Hi I am getting the following exception when I try to write DataFrame using > the following code. Please guide. I am

Re: Spark EMR executor-core vs Vcores

2018-02-26 Thread Vadim Semenov
s per node 8. Am I missing to relate here. > > What I m thinking now is number of vote = number of threads. > > > > On Mon, 26 Feb 2018 at 18:45, Vadim Semenov <va...@datadoghq.com> wrote: > >> All used cores aren't getting reported correctly in EMR, and YARN it

Re: Spark EMR executor-core vs Vcores

2018-02-26 Thread Vadim Semenov
All used cores aren't getting reported correctly in EMR, and YARN itself has no control over it, so whatever you put in `spark.executor.cores` will be used, but in the ResourceManager you will only see 1 vcore used per nodemanager. On Mon, Feb 26, 2018 at 5:20 AM, Selvam Raman

Re: Sharing spark executor pool across multiple long running spark applications

2018-02-07 Thread Vadim Semenov
The other way might be to launch a single SparkContext and then run jobs inside of it. You can take a look at these projects: - https://github.com/spark-jobserver/spark-jobserver#persistent-context-mode---faster--required-for-related-jobs - http://livy.incubator.apache.org Problems with this

Re: Passing an array of more than 22 elements in a UDF

2017-12-26 Thread Vadim Semenov
Functions are still limited to 22 arguments https://github.com/scala/scala/blob/2.13.x/src/library/scala/Function22.scala On Tue, Dec 26, 2017 at 2:19 PM, Felix Cheung wrote: > Generally the 22 limitation is from Scala 2.10. > > In Scala 2.11, the issue with case

Re: /tmp fills up to 100GB when using a window function

2017-12-20 Thread Vadim Semenov
Ah, yes, I missed that part: it's `spark.local.dir`. spark.local.dir (default: /tmp): Directory to use for "scratch" space in Spark, including map output files and RDDs that get stored on disk. This should be on a fast, local disk in your system. It can also be a comma-separated list of multiple directories

Re: /tmp fills up to 100GB when using a window function

2017-12-19 Thread Vadim Semenov
:08 AM, Mihai Iacob <mia...@ca.ibm.com> wrote: > When does spark remove them? > > > Regards, > > *Mihai Iacob* > DSX Local <https://datascience.ibm.com/local> - Security, IBM Analytics > > > > - Original message - > From: Vadim Semenov <

Re: /tmp fills up to 100GB when using a window function

2017-12-19 Thread Vadim Semenov
Spark doesn't remove intermediate shuffle files if they're part of the same job. On Mon, Dec 18, 2017 at 3:10 PM, Mihai Iacob wrote: > This code generates files under /tmp...blockmgr... which do not get > cleaned up after the job finishes. > > Anything wrong with the code

Re: What does Blockchain technology mean for Big Data? And how Hadoop/Spark will play role with it?

2017-12-19 Thread Vadim Semenov
I think it means that we can replace HDFS with a blockchain-based FS, and then offload some processing to smart contracts. On Mon, Dec 18, 2017 at 11:59 PM, KhajaAsmath Mohammed < mdkhajaasm...@gmail.com> wrote: > I am looking for same answer too .. will wait for response from other > people > >

Re: NullPointerException while reading a column from the row

2017-12-19 Thread Vadim Semenov
getAs defined as: def getAs[T](i: Int): T = get(i).asInstanceOf[T] and when you do toString you call Object.toString which doesn't depend on the type, so asInstanceOf[T] get dropped by the compiler, i.e. row.getAs[Int](0).toString -> row.get(0).toString we can confirm that by writing a simple
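
A minimal reproduction of the effect described above, plus a null-safe way to read the column:

```
import org.apache.spark.sql.Row

val row = Row.fromSeq(Seq(null))
// due to erasure this compiles down to row.get(0).toString,
// so a null value throws a NullPointerException:
// row.getAs[Int](0).toString
// null-safe version:
val s = if (row.isNullAt(0)) "null" else row.getInt(0).toString
```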

Re: RDD[internalRow] -> DataSet

2017-12-12 Thread Vadim Semenov
Not possible directly, but you can add your own object in your project to Spark's package, which would give you access to private methods: package org.apache.spark.sql import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.LogicalRDD import

Re: JDK1.8 for spark workers

2017-11-29 Thread Vadim Semenov
You can pass `JAVA_HOME` environment variable `spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-1.8.0` On Wed, Nov 29, 2017 at 10:54 AM, KhajaAsmath Mohammed < mdkhajaasm...@gmail.com> wrote: > Hi, > > I am running cloudera version of spark2.1 and our cluster is on JDK1.7. > For some of the

Re: Spark Writing to parquet directory : java.io.IOException: Disk quota exceeded

2017-11-22 Thread Vadim Semenov
The error message seems self-explanatory, try to figure out what's the disk quota you have for your user. On Wed, Nov 22, 2017 at 8:23 AM, Chetan Khatri wrote: > Anybody reply on this ? > > On Tue, Nov 21, 2017 at 3:36 PM, Chetan Khatri < >

Re: Kryo not registered class

2017-11-20 Thread Vadim Semenov
Try: Class.forName("[Lorg.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$SerializableFileStatus$SerializableBlockLocation;") On Sun, Nov 19, 2017 at 3:24 PM, Angel Francisco Orta < angel.francisco.o...@gmail.com> wrote: > Hello, I'm with spark 2.1.0 with scala and I'm

Re: Process large JSON file without causing OOM

2017-11-15 Thread Vadim Semenov
There's a lot of off-heap memory involved in decompressing Snappy, compressing ZLib. Since you're running using `local[*]`, you process multiple tasks simultaneously, so they all might consume memory. I don't think that increasing heap will help, since it looks like you're hitting system memory

Re: Spark based Data Warehouse

2017-11-12 Thread Vadim Semenov
It's actually quite simple to answer > 1. Is Spark SQL and UDF, able to handle all the workloads? Yes > 2. What user interface did you provide for data scientist, data engineers and analysts Home-grown platform, EMR, Zeppelin > What are the challenges in running concurrent queries, by many

Re: Is there a difference between df.cache() vs df.rdd.cache()

2017-10-13 Thread Vadim Semenov
When you do `Dataset.rdd` you actually create a new job here you can see what it does internally: https://github.com/apache/spark/blob/master/sql/core/ src/main/scala/org/apache/spark/sql/Dataset.scala#L2816-L2828 On Fri, Oct 13, 2017 at 5:24 PM, Supun Nakandala

Re: EMR: Use extra mounted EBS volumes for spark.local.dir

2017-10-10 Thread Vadim Semenov
that's probably better be directed to the AWS support On Sun, Oct 8, 2017 at 9:54 PM, Tushar Sudake wrote: > Hello everyone, > > I'm using 'r4.8xlarge' instances on EMR for my Spark Application. > To each node, I'm attaching one 512 GB EBS volume. > > By logging in into

Re: Unable to run Spark Jobs in yarn cluster mode

2017-10-10 Thread Vadim Semenov
Try increasing the `spark.yarn.am.waitTime` parameter, it's by default set to 100ms which might not be enough in certain cases. On Tue, Oct 10, 2017 at 7:02 AM, Debabrata Ghosh wrote: > Hi All, > I am constantly hitting an error : "ApplicationMaster: >

Re: how do you deal with datetime in Spark?

2017-10-03 Thread Vadim Semenov
I usually check the list of Hive UDFs as Spark has implemented almost all of them https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-DateFunctions Or/and check `org.apache.spark.sql.functions` directly:

Re: Multiple filters vs multiple conditions

2017-10-03 Thread Vadim Semenov
Since you're using Dataset API or RDD API, they won't be fused together by the Catalyst optimizer unless you use the DF API. Two filters will get executed within one stage, and there'll be very small overhead on having two separate filters vs having only one. On Tue, Oct 3, 2017 at 8:14 AM, Ahmed
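
A rough illustration, assuming a Dataset of a hypothetical case class with fields `x` and `y`:

```
import org.apache.spark.sql.{Dataset, SparkSession}

val spark: SparkSession = ???
import spark.implicits._

case class Rec(x: Int, y: Int)
val ds: Dataset[Rec] = ???

val twoFilters = ds.filter(_.x > 0).filter(_.y < 10) // one stage, two lambda calls per record
val oneFilter  = ds.filter(r => r.x > 0 && r.y < 10) // one stage, one lambda call per record
val fused      = ds.filter($"x" > 0 && $"y" < 10)    // Column API: Catalyst can fuse the predicates
```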

Re: More instances = slower Spark job

2017-09-29 Thread Vadim Semenov
ya...@gmail.com> > wrote: > > On Thu, Sep 28, 2017 at 9:16 PM, Vadim Semenov > > <vadim.seme...@datadoghq.com> wrote: > >> Instead of having one job, you can try processing each file in a > separate > >> job, but run multiple jobs in parallel within one Spark

Re: Saving dataframes with partitionBy: append partitions, overwrite within each

2017-09-29 Thread Vadim Semenov
As alternative: checkpoint the dataframe, collect days, and then delete corresponding directories using hadoop FileUtils, then write the dataframe On Fri, Sep 29, 2017 at 10:31 AM, peay wrote: > Hello, > > I am trying to use

Re: HDFS or NFS as a cache?

2017-09-29 Thread Vadim Semenov
How many files you produce? I believe it spends a lot of time on renaming the files because of the output committer. Also instead of 5x c3.2xlarge try using 2x c3.8xlarge instead because they have 10GbE and you can get good throughput for S3. On Fri, Sep 29, 2017 at 9:15 AM, Alexander Czech <

Re: Loading objects only once

2017-09-28 Thread Vadim Semenov
allelize(…).map(test => { Model.get().use(…) }) } } ``` On Thu, Sep 28, 2017 at 3:49 PM, Vadim Semenov <vadim.seme...@datadoghq.com> wrote: > as an alternative > ``` > spark-submit --files > ``` > > the files will be put on each executor in the working directory, so you &

Re: Loading objects only once

2017-09-28 Thread Vadim Semenov
as an alternative ``` spark-submit --files ``` the files will be put on each executor in the working directory, so you can then load it alongside your `map` function Behind the scene it uses `SparkContext.addFile` method that you can use too
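
A sketch of how the shipped file can then be read on the executors; the file name and the `RDD[String]` input are hypothetical:

```
import scala.io.Source
import org.apache.spark.SparkFiles

// spark-submit --files /local/path/lookup.txt <application jar> ...
val withLookup = rdd.mapPartitions { records =>
  // the file shipped via --files lands in each executor's working directory
  val path = SparkFiles.get("lookup.txt")
  val lookup = Source.fromFile(path).getLines().toSet
  records.map(r => (r, lookup.contains(r)))
}
```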

Re: Massive fetch fails, io errors in TransportRequestHandler

2017-09-28 Thread Vadim Semenov
Looks like there's slowness in sending shuffle files, maybe one executor get overwhelmed with all the other executors trying to pull data? Try lifting `spark.network.timeout` further, we ourselves had to push it to 600s from the default 120s On Thu, Sep 28, 2017 at 10:19 AM, Ilya Karpov

Re: More instances = slower Spark job

2017-09-28 Thread Vadim Semenov
Instead of having one job, you can try processing each file in a separate job, but run multiple jobs in parallel within one SparkContext. Something like this should work for you, it'll submit N jobs from the driver, the jobs will run independently, but executors will dynamically work on different
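
A minimal sketch of that approach with Scala futures; the paths are hypothetical and `spark` is an existing SparkSession:

```
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

val paths = Seq("s3://bucket/file-a", "s3://bucket/file-b", "s3://bucket/file-c")

// each action submitted from a Future becomes its own Spark job;
// the jobs run concurrently and share the executors
val jobs: Seq[Future[Long]] = paths.map { path =>
  Future { spark.read.parquet(path).count() }
}

val counts = Await.result(Future.sequence(jobs), Duration.Inf)
```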

Re: What are factors need to Be considered when upgrading to Spark 2.1.0 from Spark 1.6.0

2017-09-22 Thread Vadim Semenov
1. 40s is pretty negligible unless you run your job very frequently, there can be many factors that influence that. 2. Try to compare the CPU time instead of the wall-clock time 3. Check the stages that got slower and compare the DAGs 4. Test with dynamic allocation disabled On Fri, Sep 22,

Re: SVD computation limit

2017-09-19 Thread Vadim Semenov
This may also be related to https://issues.apache.org/jira/browse/SPARK-22033 On Tue, Sep 19, 2017 at 3:40 PM, Mark Bittmann wrote: > I've run into this before. The EigenValueDecomposition creates a Java > Array with 2*k*n elements. The Java Array is indexed with a native

Re: Configuration for unit testing and sql.shuffle.partitions

2017-09-18 Thread Vadim Semenov
you can create a super class "FunSuiteWithSparkContext" that's going to create a Spark session, Spark context, and SQLContext with all the desired properties. Then you add the class to all the relevant test suites, and that's pretty much it. The other option is to pass it as a VM

Re: Bizarre UI Behavior after migration

2017-09-10 Thread Vadim Semenov
rozen, active state. > > > On Mon, May 22, 2017 at 12:50 PM, Vadim Semenov < > vadim.seme...@datadoghq.com> wrote: > >> I believe it shows only the tasks that have actually being executed, if >> there were tasks with no data, they don't get reported. >>

Re: Referencing YARN application id, YARN container hostname, Executor ID and YARN attempt for jobs running on Spark EMR 5.7.0 in log statements?

2017-08-29 Thread Vadim Semenov
n MDC way with spark or something other than to achieve this? > > > > Alex > > > > *From: *Vadim Semenov <vadim.seme...@datadoghq.com> > *Date: *Monday, August 28, 2017 at 5:18 PM > *To: *"Mikhailau, Alex" <alex.mikhai...@mlb.com> > *Cc: *"us

Re: Referencing YARN application id, YARN container hostname, Executor ID and YARN attempt for jobs running on Spark EMR 5.7.0 in log statements?

2017-08-28 Thread Vadim Semenov
When you create a EMR cluster you can specify a S3 path where logs will be saved after cluster, something like this: s3://bucket/j-18ASDKLJLAKSD/containers/application_1494074597524_0001/container_1494074597524_0001_01_01/stderr.gz

Re: Collecting Multiple Aggregation query result on one Column as collectAsMap

2017-08-28 Thread Vadim Semenov
I didn't tailor it to your needs, but this is what I can offer you, the idea should be pretty clear import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions.{collect_list, struct} val spark: SparkSession import spark.implicits._ case class Input( a: Int, b: Long, c:
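
A trimmed-down sketch of the idea; `df` and its columns `a`, `b`, `c` are hypothetical:

```
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, collect_list, struct}

// gather all (b, c) pairs per key `a` into one array column,
// then pull the result to the driver as a map
val grouped = df
  .groupBy(col("a"))
  .agg(collect_list(struct(col("b"), col("c"))).as("values"))

val asMap = grouped.collect()
  .map(row => row.getAs[Int]("a") -> row.getAs[Seq[Row]]("values"))
  .toMap
```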

Re: [Spark Core] Is it possible to insert a function directly into the Logical Plan?

2017-08-14 Thread Vadim Semenov
Something like this, maybe? import org.apache.spark.sql.Dataset import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.execution.LogicalRDD import org.apache.spark.sql.catalyst.encoders.RowEncoder val df: DataFrame = ??? val spark = df.sparkSession val

Re: count exceed int.MaxValue

2017-08-08 Thread Vadim Semenov
Scala doesn't support ranges >= Int.MaxValue https://github.com/scala/scala/blob/2.12.x/src/library/scala/collection/immutable/Range.scala?utf8=✓#L89 You can create two RDDs and unionize them: scala> val rdd = sc.parallelize(1L to Int.MaxValue.toLong).union(sc.parallelize(1L to
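
A sketch of the union approach, assuming `sc` is an existing SparkContext:

```
// each range stays within Int.MaxValue elements, but the union exceeds it
val rdd = sc.parallelize(1L to Int.MaxValue.toLong)
  .union(sc.parallelize(1L to Int.MaxValue.toLong))

rdd.count() // count() returns a Long, so ~4.29 billion elements is fine
```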

Re: How can i remove the need for calling cache

2017-08-02 Thread Vadim Semenov
to delete it somehow else. BTW, have you tried '.persist(StorageLevel.DISK_ONLY)'? It caches data to local disk, making more space in JVM and letting you to avoid hdfs. On Wednesday, August 2, 2017, Vadim Semenov <vadim.seme...@datadoghq.com> wrote: > `saveAsObjectFile` doesn't save the DAG, i

Re: How can i remove the need for calling cache

2017-08-02 Thread Vadim Semenov
`saveAsObjectFile` doesn't save the DAG, it acts as a typical action, so it just saves data to some destination. `cache/persist` allow you to cache data and keep the DAG in case of some executor that holds data goes down, so Spark would still be able to recalculate missing partitions

Re: How can i remove the need for calling cache

2017-08-02 Thread Vadim Semenov
Also check the `RDD.checkpoint()` method https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1533-L1550 On Wed, Aug 2, 2017 at 8:46 PM, Vadim Semenov <vadim.seme...@datadoghq.com> wrote: > I'm not sure that "checkpointed" m

Re: How can i remove the need for calling cache

2017-08-02 Thread Vadim Semenov
ed RDD is persisted in > memory, otherwise saving it on a file will require recomputation."* > > > To me that means checkpoint will not prevent the recomputation that i was > hoping for > -- > *From:* Vadim Semenov <vadim.seme...@datadogh

Re: How can i remove the need for calling cache

2017-08-01 Thread Vadim Semenov
You can use `.checkpoint()`: ``` val sc: SparkContext sc.setCheckpointDir("hdfs:///tmp/checkpointDirectory") myrdd.checkpoint() val result1 = myrdd.map(op1(_)) result1.count() // Will save `myrdd` to HDFS and do map(op1… val result2 = myrdd.map(op2(_)) result2.count() // Will load `myrdd` from

Re: How to insert a dataframe as a static partition to a partitioned table

2017-07-20 Thread Vadim Semenov
This should work: ``` ALTER TABLE `table` ADD PARTITION (partcol=1) LOCATION '/path/to/your/dataset' ``` On Wed, Jul 19, 2017 at 6:13 PM, ctang wrote: > I wonder if there are any easy ways (or APIs) to insert a dataframe (or > DataSet), which does not contain the partition

Re: underlying checkpoint

2017-07-13 Thread Vadim Semenov
You need to trigger an action on that rdd to checkpoint it. ``` scala>spark.sparkContext.setCheckpointDir(".") scala>val df = spark.createDataFrame(List(("Scala", 35), ("Python", 30), ("R", 15), ("Java", 20))) df: org.apache.spark.sql.DataFrame = [_1: string, _2: int] scala>
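
A sketch of the full sequence; the checkpoint directory is illustrative:

```
spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoints")

val df = spark.createDataFrame(List(("Scala", 35), ("Python", 30), ("R", 15), ("Java", 20)))

// for an RDD, checkpoint() only marks it; a later action writes the files
val rdd = df.rdd
rdd.checkpoint()
rdd.count()

// Dataset.checkpoint() (Spark 2.1+) is eager by default: it runs a job,
// materializes the data, and returns a DataFrame with truncated lineage
val checkpointed = df.checkpoint()
```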

Re: Spark, S3A, and 503 SlowDown / rate limit issues

2017-07-05 Thread Vadim Semenov
Are you sure that you use S3A? Because EMR says that they do not support S3A https://aws.amazon.com/premiumsupport/knowledge-center/emr-file-system-s3/ > Amazon EMR does not currently support use of the Apache Hadoop S3A file system. I think that the HEAD requests come from the

Re: How does HashPartitioner distribute data in Spark?

2017-06-23 Thread Vadim Semenov
This is the code that chooses the partition for a key: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Partitioner.scala#L85-L88 it's basically `math.abs(key.hashCode % numberOfPartitions)` On Fri, Jun 23, 2017 at 3:42 AM, Vikash Pareek <
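
Spelled out, the partition choice is a non-negative modulo of the key's hashCode, with null keys going to partition 0, roughly:

```
def getPartition(key: Any, numPartitions: Int): Int = key match {
  case null => 0
  case _ =>
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod
}

getPartition("some-key", 8) // deterministic for a given key and partition count
```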

Re: "Sharing" dataframes...

2017-06-20 Thread Vadim Semenov
You can launch one permanent spark context and then execute your jobs within the context. And since they'll be running in the same context, they can share data easily. These two projects provide the functionality that you need:

Re: [How-To] Custom file format as source

2017-06-12 Thread Vadim Semenov
It should be easy to start with a custom Hadoop InputFormat that reads the file and creates a `RDD[Row]`, since you know the records size, it should be pretty easy to make the InputFormat to produce splits, so then you could read the file in parallel. On Mon, Jun 12, 2017 at 6:01 AM, OBones

Re: [Spark Core] Does spark support read from remote Hive server via JDBC

2017-06-08 Thread Vadim Semenov
Have you tried running a query? something like: ``` test.select("*").limit(10).show() ``` On Thu, Jun 8, 2017 at 4:16 AM, Даша Ковальчук wrote: > Hi guys, > > I need to execute hive queries on remote hive server from spark, but for > some reasons i receive only

Re: Bizarre UI Behavior after migration

2017-05-22 Thread Vadim Semenov
I believe it shows only the tasks that have actually being executed, if there were tasks with no data, they don't get reported. I might be mistaken, if somebody has a good explanation, would also like to hear. On Fri, May 19, 2017 at 5:45 PM, Miles Crawford wrote: > Hey

Re: Spark <--> S3 flakiness

2017-05-11 Thread Vadim Semenov
Use the official mailing list archive http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3ccajyeq0gh1fbhbajb9gghognhqouogydba28lnn262hfzzgf...@mail.gmail.com%3e On Thu, May 11, 2017 at 2:50 PM, lucas.g...@gmail.com wrote: > Also, and this is unrelated to the

Re: Looking at EMR Logs

2017-03-31 Thread Vadim Semenov
You can provide your own log directory, where Spark log will be saved, and that you could replay afterwards. Set in your job this: `spark.eventLog.dir=s3://bucket/some/directory` and run it. Note! The path `s3://bucket/some/directory` must exist before you run your job, it'll not be created

Re: Launching multiple spark jobs within a main spark job.

2016-12-21 Thread Vadim Semenov
Check the source code for SparkLauncher: https://github.com/apache/spark/blob/master/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java#L541 a separate process will be started using `spark-submit` and if it uses `yarn-cluster` mode, a driver may be launched on another NodeManager

Re: NoClassDefFoundError

2016-12-21 Thread Vadim Semenov
You better ask folks in the spark-jobserver gitter channel: https://github.com/spark-jobserver/spark-jobserver On Wed, Dec 21, 2016 at 8:02 AM, Reza zade wrote: > Hello > > I've extended the JavaSparkJob (job-server-0.6.2) and created an object > of SQLContext class. my
