Re: Reading parquet files in parallel on the cluster

2021-05-25 Thread Silvio Fiorito
Why not just read from Spark as normal? Do these files have different or incompatible schemas? val df = spark.read.option("mergeSchema", "true").load(listOfPaths) From: Eric Beabes Date: Tuesday, May 25, 2021 at 1:24 PM To: spark-user Subject: Reading parquet files in parallel on the cluster
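A minimal sketch of the same idea, assuming the files are Parquet with compatible (evolving) schemas; the paths are illustrative:

    val paths = Seq("/data/events/2021/04", "/data/events/2021/05")
    val df = spark.read.option("mergeSchema", "true").parquet(paths: _*)
    df.printSchema()   // union of the individual file schemas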

Re: Poor performance caused by coalesce to 1

2021-02-03 Thread Silvio Fiorito
As I suggested, you need to use repartition(1) in place of coalesce(1). That will give you a single-file output without losing parallelization for the rest of the job. From: James Yu Date: Wednesday, February 3, 2021 at 2:19 PM To: Silvio Fiorito , user Subject: Re: Poor performance caused

Re: Poor performance caused by coalesce to 1

2021-02-03 Thread Silvio Fiorito
Coalesce is reducing the parallelization of your last stage, in your case to 1 task. So it’s natural that it will give poor performance, especially with large data. If you absolutely need a single file output, you can instead add a stage boundary and use repartition(1). This will give your query
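A sketch of the difference on an illustrative filter-then-write job:

    import org.apache.spark.sql.functions.col

    // coalesce(1) would collapse the whole query into one task; repartition(1) adds a
    // shuffle boundary, so the filter still runs across all partitions and only the
    // final write happens in a single task.
    df.filter(col("status") === "active")
      .repartition(1)
      .write
      .csv("/output/single-file")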

Re: pandas_udf is very slow

2020-04-05 Thread Silvio Fiorito
Your 2 examples are doing different things. The Pandas UDF is doing a grouped map, whereas your Python UDF is doing an aggregate. I think you want your Pandas UDF to be PandasUDFType.GROUPED_AGG? Is your result the same? From: Lian Jiang Date: Sunday, April 5, 2020 at 3:28 AM To: user

Re: Parquet 'bucketBy' creates a ton of files

2019-07-10 Thread Silvio Fiorito
-from-the-field-episode-ii-applying-best-practices-to-your-apache-spark-applications-with-silvio-fiorito From: Gourav Sengupta Date: Wednesday, July 10, 2019 at 3:14 AM To: Silvio Fiorito Cc: Arwin Tio , "user@spark.apache.org" Subject: Re: Parquet 'bucketBy' creates a ton of files

Re: Parquet 'bucketBy' creates a ton of files

2019-07-04 Thread Silvio Fiorito
You need to first repartition (at a minimum by bucketColumn1) since each task will write out the buckets/files. If the bucket keys are distributed randomly across the RDD partitions, then you will get multiple files per bucket. From: Arwin Tio Date: Thursday, July 4, 2019 at 3:22 AM To:
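A sketch of the suggested pattern (bucket count, column, and table names are illustrative):

    import org.apache.spark.sql.functions.col

    // Repartition by the bucket column first so each task holds whole buckets,
    // rather than every task writing a piece of every bucket.
    df.repartition(200, col("bucketColumn1"))
      .write
      .bucketBy(200, "bucketColumn1")
      .sortBy("bucketColumn1")
      .saveAsTable("events_bucketed")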

Re: [pyspark 2.3+] Bucketing with sort - incremental data load?

2019-05-31 Thread Silvio Fiorito
Spark does allow appending new files to bucketed tables. When the data is read in, Spark will combine the multiple files belonging to the same buckets into the same partitions. Having said that, you need to be very careful with bucketing especially as you’re appending to avoid generating lots

Re: [Spark SQL] Does Spark group small files

2018-11-13 Thread Silvio Fiorito
Yes, it does bin-packing for small files which is a good thing so you avoid having many small partitions especially if you’re writing this data back out (e.g. it’s compacting as you read). The default partition size is 128MB with a 4MB “cost” for opening files. You can configure this using the
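The two settings referred to, written out with their default values (both take byte counts):

    // target size of each read partition when bin-packing small files
    spark.conf.set("spark.sql.files.maxPartitionBytes", 128 * 1024 * 1024)
    // estimated cost, in bytes, of opening one additional file
    spark.conf.set("spark.sql.files.openCostInBytes", 4 * 1024 * 1024)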

Re: performance of IN clause

2018-10-17 Thread Silvio Fiorito
Have you run explain for each query? If you look at the physical query plan it’s most likely the same. If the inner-query/join-table is small enough it should end up as a broadcast join. From: Jayesh Lalwani Date: Wednesday, October 17, 2018 at 5:03 PM To: "user@spark.apache.org" Subject:
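A quick way to check, with illustrative table names; both plans should show the small table being broadcast (e.g. a broadcast semi/hash join):

    spark.sql("SELECT * FROM orders WHERE cust_id IN (SELECT id FROM small_dim)").explain()
    spark.sql("SELECT o.* FROM orders o JOIN small_dim d ON o.cust_id = d.id").explain()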

Re: [Structured Streaming] Avoiding multiple streaming queries

2018-07-24 Thread Silvio Fiorito
-with-foreachbatch) Nothing’s free!  Since you’re just pushing all messages to kafka, might be easier on you to just explode the rows and let Spark do the rest for you. From: kant kodali Date: Tuesday, July 24, 2018 at 1:04 PM To: Silvio Fiorito Cc: Arun Mahadevan , chandan prakash , Tathagata Das

Re: [Structured Streaming] Avoiding multiple streaming queries

2018-07-23 Thread Silvio Fiorito
Using the current Kafka sink that supports routing based on topic column, you could just duplicate the rows (e.g. explode rows with different topic, key values). That way you’re only reading and processing the source once and not having to resort to custom sinks, foreachWriter, or multiple
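A sketch of the row-duplication approach with the built-in Kafka sink, assuming input is the streaming DataFrame read from the source (topic names, broker, and checkpoint path are illustrative):

    import org.apache.spark.sql.functions._

    val fannedOut = input
      .withColumn("topic", explode(array(lit("topic_a"), lit("topic_b"))))   // one row per target topic
      .selectExpr("topic", "CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")

    fannedOut.writeStream
      .format("kafka")                                        // routes on the "topic" column
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("checkpointLocation", "/tmp/checkpoints/fanout")
      .start()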

Re: Bulk / Fast Read and Write with MSSQL Server and Spark

2018-05-23 Thread Silvio Fiorito
Try this https://docs.microsoft.com/en-us/azure/sql-database/sql-database-spark-connector From: Chetan Khatri Date: Wednesday, May 23, 2018 at 7:47 AM To: user Subject: Bulk / Fast Read and Write with MSSQL Server and Spark All, I am

Re: Custom metrics sink

2018-03-16 Thread Silvio Fiorito
Just set your custom sink in the org.apache.spark.metrics.sink namespace and configure metrics.properties. Use ConsoleSink as an example. Obviously since it’s private the API may change, but in the meantime that should work…
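A metrics.properties sketch wiring up such a sink (the class name is illustrative; period/unit are the standard polling options used by ConsoleSink):

    # register a custom sink placed in the org.apache.spark.metrics.sink package
    *.sink.mycustom.class=org.apache.spark.metrics.sink.MyCustomSink
    *.sink.mycustom.period=10
    *.sink.mycustom.unit=seconds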

Re: Writing a DataFrame is taking too long and huge space

2018-03-09 Thread Silvio Fiorito
Given you start with ~250MB but end up with 58GB seems like you’re generating quite a bit of data. Whether you use coalesce or repartition, still writing out 58GB with one core is going to take a while. Using Spark to do pre-processing but output a single file is not going to be very

Re: How to properly execute `foreachPartition` in Spark 2.2

2017-12-18 Thread Silvio Fiorito
Couldn’t you readStream from Kafka, do your transformations, map your rows from the transformed input into what you need to send to Kafka, then writeStream to Kafka? From: Liana Napalkova <liana.napalk...@eurecat.org> Date: Monday, December 18, 2017 at 10:07 AM To: Silvio F

Re: How to properly execute `foreachPartition` in Spark 2.2

2017-12-18 Thread Silvio Fiorito
Why don’t you just use the Kafka sink for Spark 2.2? https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html#creating-a-kafka-sink-for-streaming-queries From: Liana Napalkova Date: Monday, December 18, 2017 at 9:45 AM To:
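A minimal sketch of that sink, assuming df is the transformed streaming DataFrame (broker, topic, and checkpoint path are illustrative):

    df.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092")
      .option("topic", "output_topic")
      .option("checkpointLocation", "/tmp/checkpoints/kafka-sink")
      .start()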

Re: How to...UNION ALL of two SELECTs over different data sources in parallel?

2017-12-16 Thread Silvio Fiorito
Hi Jacek, Just replied to the SO thread as well, but… Yes, your first statement is correct. The DFs in the union are read in the same stage, so in your example where each DF has 8 partitions then you have a stage with 16 tasks to read the 2 DFs. There's no need to define the DF in a separate

Re: Generating StructType from dataframe.printSchema

2017-10-16 Thread Silvio Fiorito
If you’re confident the schema of all files is consistent, then just infer the schema from a single file and reuse it when loading the whole data set: val schema = spark.read.json("/path/to/single/file.json").schema val wholeDataSet = spark.read.schema(schema).json("/path/to/whole/datasets")

Re: best spark spatial lib?

2017-10-10 Thread Silvio Fiorito
There’s a number of packages for geospatial analysis, depends on the features you need. Here are a few I know of and/or have used: Magellan: https://github.com/harsha2010/magellan MrGeo: https://github.com/ngageoint/mrgeo GeoMesa: http://www.geomesa.org/documentation/tutorials/spark.html

Re: [StructuredStreaming] multiple queries of the socket source: only one query works.

2017-08-13 Thread Silvio Fiorito
Hi Gerard, Each query has its own distinct query plan and tracks offsets independently from other queries. Also, each query will generate a dynamic group id to ensure it gets all events and appears as a new consumer from Kafka’s perspective, that’s done internally to the Kafka source. That’s

Re: Does spark 2.1.0 structured streaming support jdbc sink?

2017-04-09 Thread Silvio Fiorito
JDBC sink is not in 2.1. You can see here for an example implementation using the ForEachWriter sink instead: https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.html From: Hemanth Gudela

Re: TDD in Spark

2017-01-15 Thread Silvio Fiorito
You should check out Holden’s excellent spark-testing-base package: https://github.com/holdenk/spark-testing-base From: A Shaikh Date: Sunday, January 15, 2017 at 1:14 PM To: User Subject: TDD in Spark Whats the most popular Testing approach for

Re: How to connect Tableau to databricks spark?

2017-01-09 Thread Silvio Fiorito
Also, meant to add the link to the docs: https://docs.databricks.com/user-guide/faq/tableau.html From: Silvio Fiorito <silvio.fior...@granturing.com> Date: Monday, January 9, 2017 at 2:59 PM To: Raymond Xie <xie3208...@gmail.com>, user <user@spark.apache.org> Subject: Re: How

Re: How to connect Tableau to databricks spark?

2017-01-09 Thread Silvio Fiorito
Hi Raymond, Are you using a Spark 2.0 or 1.6 cluster? With Spark 2.0 it’s just a matter of entering the hostname of your Databricks environment, the HTTP path from the cluster page, and your Databricks credentials. Thanks, Silvio From: Raymond Xie Date: Sunday, January

Re: Cluster deploy mode driver location

2016-11-22 Thread Silvio Fiorito
Hi Saif! Unfortunately, I don't think this is possible for YARN driver-cluster mode. Regarding the JARs you're referring to, can you place them on HDFS so you can then have them in a central location and refer to them that way for dependencies?

Re: Joining to a large, pre-sorted file

2016-11-13 Thread Silvio Fiorito
erring to. Am I misunderstanding the explain plan? Thanks again! On Sat, Nov 12, 2016 at 9:34 AM, Silvio Fiorito <silvio.fior...@granturing.com> wrote: Hi Stuart, You don’t need the sortBy or sortWithinPartitions. https://databricks-prod-cloudfront

Re: Joining to a large, pre-sorted file

2016-11-12 Thread Silvio Fiorito
Hi Stuart, You don’t need the sortBy or sortWithinPartitions. https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/594325853373464/879901972425732/6861830365114179/latest.html This is what the job should look like:

Re: Joining to a large, pre-sorted file

2016-11-10 Thread Silvio Fiorito
You want to look at the bucketBy option when you save the master file out. That way it will be pre-partitioned by the join column, eliminating the shuffle on the larger file. From: Stuart White Date: Thursday, November 10, 2016 at 8:39 PM To: Jörn Franke

Re: Physical plan for windows and joins - how to know which is faster?

2016-11-09 Thread Silvio Fiorito
Hi Jacek, I haven't played with 2.1.0 yet, so not sure how much more optimized Window functions are compared to 1.6 and 2.0. However, one thing I do see in the self-join is a broadcast. So there's going to be a need to broadcast the results of the groupBy out to the executors before it can do

Re: How do I convert a data frame to broadcast variable?

2016-11-03 Thread Silvio Fiorito
Hi Nishit, Yes the JDBC connector supports predicate pushdown and column pruning. So any selection you make on the dataframe will get materialized in the query sent via JDBC. You should be able to verify this by looking at the physical query plan: val df =
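A sketch of the verification (connection details are illustrative); the pruned columns and PushedFilters appear in the JDBC scan node of the physical plan:

    import org.apache.spark.sql.functions.col

    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql://dbhost/sales")
      .option("dbtable", "lookup_table")
      .load()

    df.select("id", "name").filter(col("id") > 100).explain()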

Re: DataFrame defined within conditional IF ELSE statement

2016-09-18 Thread Silvio Fiorito
Oh, sorry it was supposed to be sys.error, not sys.err From: Mich Talebzadeh <mich.talebza...@gmail.com> Date: Sunday, September 18, 2016 at 5:23 PM To: Silvio Fiorito <silvio.fior...@granturing.com> Cc: "user @spark" <user@spark.apache.org> Subject: Re: DataFram

Re: DataFrame defined within conditional IF ELSE statement

2016-09-18 Thread Silvio Fiorito
Hi Mich, That’s because df2 is only within scope in the if statements. Try this: val df = option match { case 1 => { println("option = 1") val df = spark.read.option("header", false).csv("hdfs://rhes564:9000/data/prices/prices.*") val df2 = df.map(p =>
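A completed sketch of the pattern (paths and branches are illustrative): let the match expression itself return the DataFrame so it stays in scope afterwards.

    val df = option match {
      case 1 =>
        spark.read.option("header", false).csv("hdfs://rhes564:9000/data/prices/prices.*")
      case _ =>
        sys.error("unsupported option")   // sys.error throws, so the match still returns a DataFrame
    }
    df.show()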

Re: what contribute to Task Deserialization Time

2016-07-22 Thread Silvio Fiorito
Are you referencing member variables or other objects of your driver in your transformations? Those would have to be serialized and shipped to each executor when that job kicks off. On 7/22/16, 8:54 AM, "Jacek Laskowski" wrote: Hi, I can't specifically answer your question,

Re: Deploying ML Pipeline Model

2016-07-01 Thread Silvio Fiorito
Hi Rishabh, My colleague, Richard Garris from Databricks, actually just gave a talk last night at the Bay Area Spark Meetup on ML model deployment. The slides and recording should be up soon, you should be able to find a link here: http://www.meetup.com/spark-users/events/231574440/ Thanks,

Re: Strategies for propery load-balanced partitioning

2016-06-03 Thread Silvio Fiorito
Hi Saif! When you say this happens with spark-csv, are the files gzipped by any chance? GZip is non-splittable so if you’re seeing skew simply from loading data it could be you have some extremely large gzip files. So for a single stage job you will have those tasks lagging compared to the

Re: HiveContext standalone => without a Hive metastore

2016-05-26 Thread Silvio Fiorito
Hi Gerard, I’ve never had an issue using the HiveContext without a hive-site.xml configured. However, one issue you may have is if multiple users are starting the HiveContext from the same path, they’ll all be trying to store the default Derby metastore in the same location. Also, if you want

Re: Using HiveContext.set in multipul threads

2016-05-24 Thread Silvio Fiorito
If you’re using DataFrame API you can achieve that by simply using (or not) the “partitionBy” method on the DataFrameWriter: val originalDf = …. val df1 = originalDf…. val df2 = originalDf… df1.write.partitionBy("col1").save(…) df2.write.save(…) From: Amir Gershman Date:

Re: "collecting" DStream data

2016-05-15 Thread Silvio Fiorito
Hi Daniel, Given your example, “arr” is defined on the driver, but the “foreachRDD” function is run on the executors. If you want to collect the results of the RDD/DStream down to the driver you need to call RDD.collect. You have to be careful though that you have enough memory on the driver

RE: Apache Flink

2016-04-17 Thread Silvio Fiorito
Actually there were multiple responses to it on the GitHub project, including a PR to improve the Spark code, but they weren’t acknowledged. From: Ovidiu-Cristian MARCU Sent: Sunday, April 17, 2016 7:48 AM To: andy petrella

RE: Spark Metrics : Why is the Sink class declared private[spark] ?

2016-04-02 Thread Silvio Fiorito
In the meantime you can simply define your custom metric source in the org.apache.spark package. From: Walid Lezzar Sent: Saturday, April 2, 2016 4:23 AM To: Saisai Shao Cc: spark users Subject: Re: Spark

Re: Spark Metrics Framework?

2016-03-25 Thread Silvio Fiorito
system. I would suggest going ahead and submitting a JIRA request if there isn’t one already. Thanks, Silvio From: Mike Sukmanowsky <mike.sukmanow...@gmail.com> Date: Friday, March 25, 2016 at 10:48 AM To: Silvio Fiorito <silvio.f

Re: Spark Metrics Framework?

2016-03-22 Thread Silvio Fiorito
6 at 3:13 PM To: Silvio Fiorito <silvio.fior...@granturing.com>, "user@spark.apache.org" <user@spark.apache.org> Subject: Re: Spark Metrics Framework? The Source cl

Re: Work out date column in CSV more than 6 months old (datediff or something)

2016-03-21 Thread Silvio Fiorito
There’s a months_between function you could use, as well: df.filter(months_between(current_date, $"Payment Date") > 6).show From: Mich Talebzadeh Date: Monday, March 21, 2016 at 5:53 PM To: "user @spark"

Re: Spark Metrics Framework?

2016-03-21 Thread Silvio Fiorito
You could use the metric sources and sinks described here: http://spark.apache.org/docs/latest/monitoring.html#metrics If you want to push the metrics to another system you can define a custom sink. You can also extend the metrics by defining a custom source. From: Mike Sukmanowsky

Re: Get the number of days dynamically in with Column

2016-03-20 Thread Silvio Fiorito
I’m not entirely sure if this is what you’re asking, but you could just use the datediff function: val df2 = df.withColumn("ID", datediff($"end", $"start")) If you want it formatted as {n}D then: val df2 = df.withColumn("ID", concat(datediff($"end", $"start"), lit("D"))) Thanks, Silvio From:

RE: Extra libs in executor classpath

2016-03-19 Thread Silvio Fiorito
Could you publish it as a library (to an internal repo) then you can simply use the "--packages" option? Also will help with versioning as you make changes, that way you’re not having to manually ship JARs around to your machines and users. From: Леонид Поляков

RE: Spark on YARN memory consumption

2016-03-11 Thread Silvio Fiorito
Hi Jan, Yes what you’re seeing is due to YARN container memory overhead. Also, typically the memory increments for YARN containers is 1GB. This gives a good overview: http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/ Thanks, Silvio From: Jan

RE: Achieving 700 Spark SQL Queries Per Second

2016-03-10 Thread Silvio Fiorito
Very cool stuff Evan. Thanks for your work on this and sharing! From: Evan Chan Sent: Thursday, March 10, 2016 1:38 PM To: user@spark.apache.org Subject: Achieving 700 Spark SQL Queries Per Second Hey folks, I just saw a

RE: Using dynamic allocation and shuffle service in Standalone Mode

2016-03-08 Thread Silvio Fiorito
There’s a script to start it up under sbin, start-shuffle-service.sh. Run that on each of your worker nodes. From: Yuval Itzchakov Sent: Tuesday, March 8, 2016 2:17 PM To: Silvio Fiorito; user@spark.apache.org
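Once start-shuffle-service.sh is running on every worker, a spark-defaults.conf sketch for the application side:

    spark.shuffle.service.enabled   true
    spark.dynamicAllocation.enabled true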

RE: Using dynamic allocation and shuffle service in Standalone Mode

2016-03-08 Thread Silvio Fiorito
You’ve started the external shuffle service on all worker nodes, correct? Can you confirm they’re still running and haven’t exited? From: Yuval.Itzchakov Sent: Tuesday, March 8, 2016 12:41 PM To: user@spark.apache.org Subject: Using

Re: Unit testing framework for Spark Jobs?

2016-03-02 Thread Silvio Fiorito
Please check out the following for some good resources: https://github.com/holdenk/spark-testing-base https://spark-summit.org/east-2016/events/beyond-collect-and-parallelize-for-tests/ On 3/2/16, 12:54 PM, "SRK" wrote: >Hi, > >What is a good unit testing

Re: Save DataFrame to Hive Table

2016-03-01 Thread Silvio Fiorito
Just do: val df = sqlContext.read.load("/path/to/parquets/*") If you do df.explain it’ll show the multiple input paths. From: "andres.fernan...@wellsfargo.com" Date: Tuesday, March

Re: Union Parquet, DataFrame

2016-03-01 Thread Silvio Fiorito
Just replied to your other email, but here’s the same thing: Just do: val df = sqlContext.read.load("/path/to/parquets/*") If you do df.explain it’ll show the multiple input paths. From: "andres.fernan...@wellsfargo.com"

RE: Use maxmind geoip lib to process ip on Spark/Spark Streaming

2016-02-29 Thread Silvio Fiorito
I’ve used the code below with SparkSQL. I was using this with Spark 1.4 but should still be good with 1.6. In this case I have a UDF to do the lookup, but for Streaming you’d just have a lambda to apply in a map function, so no UDF wrapper. import org.apache.spark.sql.functions._ import

Re: map operation clears custom partitioner

2016-02-22 Thread Silvio Fiorito
You can use mapValues to ensure partitioning is not lost. From: Brian London Date: Monday, February 22, 2016 at 1:21 PM To: user Subject: map operation clears custom partitioner It
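A small sketch of why mapValues is preferred here: it keeps the existing partitioner, while map drops it because keys could change.

    import org.apache.spark.HashPartitioner

    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2))).partitionBy(new HashPartitioner(4))

    val kept = pairs.mapValues(_ + 1)                    // partitioner preserved
    val lost = pairs.map { case (k, v) => (k, v + 1) }   // partitioner is None

    assert(kept.partitioner == pairs.partitioner)
    assert(lost.partitioner.isEmpty)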

RE: coalesce and executor memory

2016-02-14 Thread Silvio Fiorito
Sent: Friday, February 12, 2016 8:34 PM To: Koert Kuipers; Silvio Fiorito Cc: user Subject: Re: coalesce and executor memory Thank you for the responses. The m

Re: Allowing parallelism in spark local mode

2016-02-12 Thread Silvio Fiorito
You’ll want to set up the FAIR scheduler as described here: https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application From: yael aharon Date: Friday, February 12, 2016 at 2:00 PM To:
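A sketch of enabling it; the scheduler mode is the key setting (pools via spark.scheduler.allocation.file are optional):

    val conf = new org.apache.spark.SparkConf()
      .setAppName("parallel-jobs")
      .set("spark.scheduler.mode", "FAIR")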

Re: coalesce and executor memory

2016-02-12 Thread Silvio Fiorito
Coalesce essentially reduces parallelism, so fewer cores are getting more records. Be aware that it could also lead to loss of data locality, depending on how far you reduce. Depending on what you’re doing in the map operation, it could lead to OOM errors. Can you give more details as to what

Re: Spark with .NET

2016-02-09 Thread Silvio Fiorito
That’s just a .NET assembly (not related to Spark DataSets) but doesn’t look like they’re actually using it. It’s typically a default reference pulled in by the project templates. The code though is available from Mono here:

Re: Unit test with sqlContext

2016-02-04 Thread Silvio Fiorito
Hi Steve, Have you looked at the spark-testing-base package by Holden? It’s really useful for unit testing Spark apps as it handles all the bootstrapping for you. https://github.com/holdenk/spark-testing-base DataFrame examples are here:

Re: General Question (Spark Hive integration )

2016-01-21 Thread Silvio Fiorito
Also, just to clarify, it doesn’t read the whole table into memory unless you specifically cache it. From: Silvio Fiorito <silvio.fior...@granturing.com> Date: Thursday, January 21, 2016 at 10:02 PM To: "Balaraju.Kagidala Kagidala"

Re: General Question (Spark Hive integration )

2016-01-21 Thread Silvio Fiorito
Hi Bala, It depends on how your Hive table is configured. If you used partitioning and you are filtering on a partition column then it will only load the relevant partitions. If, however, you’re filtering on a non-partitioned column then it will have to read all the data and then filter as

Re: visualize data from spark streaming

2016-01-20 Thread Silvio Fiorito
You’ve got a few options: * Use a notebook tool such as Zeppelin, Jupyter, or Spark Notebook to write up some visualizations which update in time with your streaming batches * Use Spark Streaming to push your batch results to another 3rd-party system with a BI tool that supports

Re: Spark Streaming: BatchDuration and Processing time

2016-01-17 Thread Silvio Fiorito
It will just queue up the subsequent batches, however if this delay is constant you may start losing batches. It can handle spikes in processing time, but if you know you're consistently running over your batch duration you either need to increase the duration or look at enabling back pressure
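The back-pressure setting referred to (available since Spark 1.5); it lets the receiving rate adapt when batches start running long:

    val conf = new org.apache.spark.SparkConf()
      .set("spark.streaming.backpressure.enabled", "true")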

Re: why one of Stage is into Skipped section instead of Completed

2015-12-26 Thread Silvio Fiorito
Skipped stages result from existing shuffle output of a stage when re-running a transformation. The executors will have the output of the stage in their local dirs and Spark recognizes that, so rather than re-computing, it will start from the following stage. So, this is a good thing in that

Re: Spark data frame

2015-12-22 Thread Silvio Fiorito
Michael, collect will bring down the results to the driver JVM, whereas the RDD or DataFrame would be cached on the executors (if it is cached). So, as Dean said, the driver JVM needs to have enough memory to store the results of collect. Thanks, Silvio From: Michael Segel

Re: Testing with spark testing base

2015-12-05 Thread Silvio Fiorito
Yes, with IntelliJ you can set up a scalatest run configuration. You can also run directly from the sbt CLI by running “sbt test” From: Masf Date: Saturday, December 5, 2015 at 12:51 PM To: "user@spark.apache.org"

RE: key not found: sportingpulse.com in Spark SQL 1.5.0

2015-10-30 Thread Silvio Fiorito
It's something due to the columnar compression. I've seen similar intermittent issues when caching DataFrames. "sportingpulse.com" is a value in one of the columns of the DF. From: Ted Yu Sent: 10/30/2015 6:33 PM To: Zhang,

Re: key not found: sportingpulse.com in Spark SQL 1.5.0

2015-10-30 Thread Silvio Fiorito
I don’t believe I have it on 1.5.1. Are you able to test the data locally to confirm, or is it too large? From: "Zhang, Jingyu" <jingyu.zh...@news.com.au> Date: Friday, October 30, 2015 at 7:31 PM To: Silvio Fiorito <silv

Re: Maintaining overall cumulative data in Spark Streaming

2015-10-30 Thread Silvio Fiorito
In the update function you can return None for a key and it will remove it. If you’re restarting your app you can delete your checkpoint directory to start from scratch, rather than continuing from the previous state. From: Sandeep Giri
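A sketch of such an update function; the key/value types, the removal condition, and keyedStream are illustrative, and ssc.checkpoint is required for any stateful transformation:

    // Running total per key; returning None drops the key from the state entirely.
    def updateTotal(newValues: Seq[Long], state: Option[Long]): Option[Long] = {
      val total = state.getOrElse(0L) + newValues.sum
      if (newValues.isEmpty && total == 0L) None else Some(total)   // illustrative age-out rule
    }

    ssc.checkpoint("/tmp/checkpoints/state")
    val totals = keyedStream.updateStateByKey(updateTotal)   // keyedStream: DStream[(String, Long)]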

Re: how to merge two dataframes

2015-10-30 Thread Silvio Fiorito
Are you able to upgrade to Spark 1.5.1 and Cassandra connector to latest version? It no longer requires a separate CassandraSQLContext. From: Yana Kadiyska Reply-To: "yana.kadiy...@gmail.com"

RE: Maintaining overall cumulative data in Spark Streaming

2015-10-29 Thread Silvio Fiorito
You could use updateStateByKey. There's a stateful word count example on Github. https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala From: Sandeep

RE: Dynamic Resource Allocation with Spark Streaming (Standalone Cluster, Spark 1.5.1)

2015-10-26 Thread Silvio Fiorito
Hi Matthias, Unless there was a change in 1.5, I'm afraid dynamic resource allocation is not yet supported in streaming apps. Thanks, Silvio Sent from my Lumia 930 From: Matthias Niehoff Sent: 10/26/2015 4:00 PM To:

RE: Concurrent execution of actions within a driver

2015-10-26 Thread Silvio Fiorito
There is a collectAsync action if you want to run them in parallel, but keep in mind the two jobs will need to share resources and you should use the FAIR scheduler. From: praveen S Sent: 10/26/2015 4:27 AM To:

Re: Java REST custom receiver

2015-10-01 Thread Silvio Fiorito
When you say “receive messages” you mean acting as a REST endpoint, right? If so, it might be better to use JMS (or Kafka) option for a few reasons: The receiver will be deployed to any of the available executors, so your REST clients will need to be made aware of the IP where the receiver is

Re: Receiver and Parallelization

2015-09-25 Thread Silvio Fiorito
One thing you should look at is your batch duration and spark.streaming.blockInterval Those 2 things control how many partitions are generated for each RDD (batch) of the DStream when using a receiver (vs direct approach). So if you have a 2 second batch duration and the default blockInterval
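Roughly, partitions per receiver batch ≈ batch duration / spark.streaming.blockInterval. A sketch of tuning it (values illustrative):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // 2 s batches with the default 200 ms block interval -> ~10 partitions per batch;
    // lowering the block interval to 100 ms roughly doubles that to ~20.
    val conf = new SparkConf()
      .setAppName("receiver-parallelism")
      .set("spark.streaming.blockInterval", "100ms")
    val ssc = new StreamingContext(conf, Seconds(2))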

Re: DataFrame repartition not repartitioning

2015-09-16 Thread Silvio Fiorito
You just need to assign it to a new variable: val avroFile = sqlContext.read.format("com.databricks.spark.avro").load(inFile) val repart = avroFile.repartition(10) repart.save(outFile, "parquet") From: Steve Annessa Date: Wednesday, September 16, 2015 at 2:08 PM To:

Re: Where can I learn how to write udf?

2015-09-14 Thread Silvio Fiorito
Hi Saif, There are 2 types of UDFs. Those used by SQL and those used by the Scala DSL. For SQL, you just register a function like so (this example is from the docs): sqlContext.udf.register("strLen", (s: String) => s.length) sqlContext.sql("select name, strLen(name) from people").show The

Re: Realtime Data Visualization Tool for Spark

2015-09-11 Thread Silvio Fiorito
So if you want to build your own from the ground up, then yes you could go the d3js route. Like Feynman also responded you could use something like Spark Notebook or Zeppelin to create some charts as well. It really depends on your intended audience and ultimate goal. If you just want some

Re: Can Spark Provide Multiple Context Support?

2015-09-08 Thread Silvio Fiorito
Is the data from HDFS static or is it unique for each event in the stream? If it’s static, you can just create the SparkContext, load the files from HDFS, then start a StreamingContext with the existing SparkContext and go from there. From: Rachana Srivastava Date: Tuesday, September 8, 2015 at

Re: Unbale to run Group BY on Large File

2015-09-02 Thread Silvio Fiorito
Unfortunately, groupBy is not the most efficient operation. What is it you’re trying to do? It may be possible with one of the other *byKey transformations. From: "SAHA, DEBOBROTA" Date: Wednesday, September 2, 2015 at 7:46 PM To: "'user@spark.apache.org'" Subject:

Re: Help! Stuck using withColumn

2015-08-26 Thread Silvio Fiorito
Hi Saif, In both cases you’re referencing columns that don’t exist in the current DataFrame. In the first email you did a select and then a withColumn for ‘month_date_cur’ on the resulting DF, but that column does not exist because you did a select for only ‘month_balance’. In the second email

Re: Left outer joining big data set with small lookups

2015-08-17 Thread Silvio Fiorito
executors due to lookup skew. Any more idea to tackle this issue in Spark Dataframe? Thanks Vijay On Aug 14, 2015, at 10:27 AM, Silvio Fiorito silvio.fior...@granturing.com wrote: You could cache the lookup DataFrames, it’ll then do a broadcast join. On 8/14/15, 9:39 AM

Re: Left outer joining big data set with small lookups

2015-08-14 Thread Silvio Fiorito
You could cache the lookup DataFrames, it’ll then do a broadcast join. On 8/14/15, 9:39 AM, VIJAYAKUMAR JAWAHARLAL sparkh...@data2o.io wrote: Hi I am facing huge performance problem when I am trying to left outer join very big data set (~140GB) with bunch of small lookups [Start schema
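A sketch, with illustrative names: caching the small lookup lets Spark see it is under the broadcast threshold, and the explicit broadcast hint (available from Spark 1.5) forces it.

    import org.apache.spark.sql.functions.broadcast

    val lookup = sqlContext.read.parquet("/data/lookup").cache()
    lookup.count()   // materialize the cache so Spark knows it is small

    val joined = bigDf.join(broadcast(lookup), bigDf("key") === lookup("key"), "left_outer")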

Re: Spark DataFrames uses too many partition

2015-08-11 Thread Silvio Fiorito
You need to configure the spark.sql.shuffle.partitions parameter to a different value. It defaults to 200. On 8/11/15, 11:31 AM, Al M alasdair.mcbr...@gmail.com wrote: I am using DataFrames with Spark 1.4.1. I really like DataFrames but the partitioning makes no sense to me. I am loading
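The setting in question (default 200 shuffle partitions for DataFrame joins and aggregations), in the era-appropriate form:

    sqlContext.setConf("spark.sql.shuffle.partitions", "64")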

RE: How to increase parallelism of a Spark cluster?

2015-08-02 Thread Silvio Fiorito
Can you share the transformations up to the foreachPartition? From: Sujit Pal Sent: 8/2/2015 4:42 PM To: Igor Berman Cc: user Subject: Re: How to increase parallelism of a Spark

Re: Local Repartition

2015-07-20 Thread Silvio Fiorito
Hi Daniel, Coalesce, by default will not cause a shuffle. The second parameter when set to true will cause a full shuffle. This is actually what repartition does (calls coalesce with shuffle=true). It will attempt to keep colocated partitions together (as you describe) on the same executor.

Re: Sessionization using updateStateByKey

2015-07-15 Thread Silvio Fiorito
Hi Cody, I’ve had success using updateStateByKey for real-time sessionization by aging off timed-out sessions (returning None in the update function). This was on a large commercial website with millions of hits per day. This was over a year ago so I don’t have access to the stats any longer

Re: .NET on Apache Spark?

2015-07-05 Thread Silvio Fiorito
Joe Duffy, director of engineering on Microsoft's compiler team, made a comment about investigating F# type providers for Spark. https://twitter.com/xjoeduffyx/status/614076012372955136 From: Ashic Mahtab Sent: Sunday, July 5, 2015 1:29 PM To: Ruslan

Re: Spark performance issue

2015-07-03 Thread Silvio Fiorito
It’ll help to see the code or at least understand what transformations you’re using. Also, you have 15 nodes but not using all of them, so that means you may be losing data locality. You can see this in the job UI for Spark if any jobs do not have node or process local. From: diplomatic Guru

Re: Spark Streaming broadcast to all keys

2015-07-03 Thread Silvio Fiorito
updateStateByKey will run for all keys, whether they have new data in a batch or not so you should be able to still use it. On 7/3/15, 7:34 AM, micvog mich...@micvog.com wrote: UpdateStateByKey is useful but what if I want to perform an operation to all existing keys (not only the ones in

RE: .NET on Apache Spark?

2015-07-02 Thread Silvio Fiorito
Since Spark runs on the JVM, no, there isn't support for .NET. You should take a look at Dryad and Naiad instead. https://github.com/MicrosoftResearch/ From: Zwits Sent: 7/2/2015 4:33 AM To:

Re: map vs foreach for sending data to external system

2015-07-02 Thread Silvio Fiorito
foreach absolutely runs on the executors. For sending data to an external system you should likely use foreachPartition in order to batch the output. Also if you want to limit the parallelism of the output action then you can use coalesce. What makes you think foreach is running on the driver?
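A sketch of the batched-output pattern; the connection helpers are hypothetical:

    // One connection per partition instead of one per record.
    rdd.coalesce(8)                          // optionally cap the parallelism of the output
      .foreachPartition { records =>
        val conn = createConnection()        // hypothetical connection factory
        try records.foreach(r => conn.send(r))
        finally conn.close()
      }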

Re: custom RDD in java

2015-07-01 Thread Silvio Fiorito
If all you’re doing is just dumping tables from SQLServer to HDFS, have you looked at Sqoop? Otherwise, if you need to run this in Spark could you just use the existing JdbcRDD? From: Shushant Arora Date: Wednesday, July 1, 2015 at 10:19 AM To: user Subject: custom RDD in java Hi Is it

Re: custom RDD in java

2015-07-01 Thread Silvio Fiorito
Sure, you can create custom RDDs. Haven’t done so in Java, but in Scala absolutely. From: Shushant Arora Date: Wednesday, July 1, 2015 at 1:44 PM To: Silvio Fiorito Cc: user Subject: Re: custom RDD in java ok..will evaluate these options but is it possible to create RDD in java? On Wed, Jul 1

Re: Shuffle files lifecycle

2015-06-29 Thread Silvio Fiorito
Regarding 1 and 2, yes shuffle output is stored on the worker local disks and will be reused across jobs as long as they’re available. You can identify when they’re used by seeing skipped stages in the job UI. They are periodically cleaned up based on available space of the configured

Re:

2015-06-26 Thread Silvio Fiorito
]], classOf[AvroKey[GenericRecord]], classOf[NullWritable], hadoopConf).map(_._1.datum.get(name)) println(input.partitions.size) From: ÐΞ€ρ@Ҝ (๏̯͡๏) Date: Friday, June 26, 2015 at 11:04 AM To: Silvio Fiorito Cc: user Subject: Re: <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro

Re:

2015-06-26 Thread Silvio Fiorito
)). reduceByKey(_ + _). coalesce(1). sortBy(_._2, false). take(10). foreach(println) From: ÐΞ€ρ@Ҝ (๏̯͡๏) Date: Friday, June 26, 2015 at 10:18 AM To: Silvio Fiorito Cc: user Subject: Re: All these throw compilation error at newAPIHadoopFile 1) val hadoopConfiguration = new Configuration

Re:

2015-06-26 Thread Silvio Fiorito
No worries, glad to help! It also helped me as I had not worked directly with the Hadoop APIs for controlling splits. From: ÐΞ€ρ@Ҝ (๏̯͡๏) Date: Friday, June 26, 2015 at 1:31 PM To: Silvio Fiorito Cc: user Subject: Re: Silvio, Thanks for your responses and patience. It worked after i reshuffled

Re:

2015-06-25 Thread Silvio Fiorito
Hi Deepak, Have you tried specifying the minimum partitions when you load the file? I haven’t tried that myself against HDFS before, so I’m not sure if it will affect data locality. Ideally not, it should still maintain data locality but just more partitions. Once your job runs, you can check
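For the simple file APIs this is just the minPartitions argument (illustrative sketch); for the Hadoop input-format APIs the equivalent lever is the input split size in the Hadoop configuration.

    val input = sc.textFile("hdfs:///data/large-file.txt", minPartitions = 64)
    println(input.partitions.length)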
