Re: Apache Arrow support for Apache Spark

2020-02-17 Thread Chris Teoh
1. I'd also consider how you're structuring the data before applying the
join; doing the join naively could be expensive, so a bit of data
preparation may be necessary to improve join performance. Try to get a
baseline as well. Arrow would help improve it.

2. Try storing it back as Parquet, but in a way that the next application can
take advantage of predicate pushdown.
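
For reference, a minimal spark-shell sketch of point 2 (paths and column names
are hypothetical, and `joined` stands in for the join result): partition on a
commonly-filtered column when writing and sort within partitions on another,
so the downstream job benefits from partition pruning and Parquet row-group
statistics.

```scala
// Hypothetical columns/paths; `joined` is the output of the large join.
joined
  .repartition($"event_date")
  .sortWithinPartitions($"customer_id")
  .write
  .partitionBy("event_date")
  .mode("overwrite")
  .parquet("hdfs:///warehouse/joined_output")

// The next application filters on those columns, so Spark can prune
// partitions and push predicates down instead of scanning everything.
val subset = spark.read
  .parquet("hdfs:///warehouse/joined_output")
  .filter($"event_date" === "2020-02-17" && $"customer_id" === "C123")
```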



On Mon, 17 Feb 2020, 6:41 pm Subash Prabakar, 
wrote:

> Hi Team,
>
> I have two questions regarding Arrow and Spark integration,
>
> 1. I am joining two huge tables (1PB each) - will there be a big performance
> gain when I use the Arrow format before shuffling? Will the
> serialization/deserialization cost improve significantly?
>
> 2. Can we store the final data in Arrow format to HDFS and read them back
> in another Spark application? If so how could I do that ?
> Note: the dataset is transient - the separation of responsibility is for
> easier management. Also, despite the resiliency inside Spark, we use different
> languages (in our case Java and Python).
>
> Thanks,
> Subash
>
>


Re: Best way to read batch from Kafka and Offsets

2020-02-03 Thread Chris Teoh
The most common delivery semantic for a Kafka producer is at least once.

So your consumers have to handle deduplication.

Spark can checkpoint, but you have to be explicit about it. It only makes
sense if your dataframe lineage gets too long (i.e. if you're doing a
highly iterative algorithm) and you need to trim it to avoid having to
recompute from the start upon failure. It does not keep track of your Kafka
offsets for you.

In the context of reading from Kafka, your consumers can explicitly commit
an offset so Kafka knows you've read up to that point.
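
For reference, a rough spark-shell sketch of the batch pattern discussed in the
thread below (broker, topic and offset values are hypothetical): read from the
last stored per-partition offsets up to the latest, then persist the new
maximums for the next run.

```scala
import org.apache.spark.sql.functions.max

// Per-partition JSON; these numbers would come from wherever the previous run
// stored its max offsets (start at maxOffset + 1 to avoid re-reading).
val startingOffsets = """{"events":{"0":1051,"1":988}}"""

val batch = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "events")
  .option("startingOffsets", startingOffsets)
  .option("endingOffsets", "latest")
  .load()

// Persist the max offset seen per partition for the next run.
batch.groupBy($"partition")
  .agg(max($"offset").as("maxOffset"))
  .write.mode("overwrite")
  .parquet("hdfs:///offsets/events")
```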


On Tue, 4 Feb 2020, 6:13 am Ruijing Li,  wrote:

> Hi Chris,
>
> Thanks for the answer. So if I understand correctly:
>
> - there will be need to dedupe since I should be expecting at least once
> delivery.
>
> - storing the result of (group by partition and aggregate max offsets)
> is enough since kafka message is immutable, so a message will get sent with
> a different offset instead of the same offset.
>
> So Spark, when reading from Kafka, is acting as an at-least-once consumer? Why
> does spark not do checkpointing for batch read of kafka?
>
> On Mon, Feb 3, 2020 at 1:36 AM Chris Teoh  wrote:
>
>> Kafka can keep track of the offsets (in a separate topic based on your
>> consumer group) you've seen but it is usually best effort and you're
>> probably better off also keeping track of your offsets.
>>
>> If the producer resends a message you would have to dedupe it, as you've
>> most likely already seen it; how you handle that depends on your data.
>> I think the offset will increment automatically; you will generally not see
>> the same offset occur more than once in a Kafka topic partition (feel free
>> to correct me on this though). So the most likely scenario you need to
>> handle is the producer sending a duplicate message with two offsets.
>>
>> The alternative is you can reprocess the offsets back from where you
>> thought the message was last seen.
>>
>> Kind regards
>> Chris
>>
>> On Mon, 3 Feb 2020, 7:39 pm Ruijing Li,  wrote:
>>
>>> Hi all,
>>>
>>> My use case is to read from single kafka topic using a batch spark sql
>>> job (not structured streaming ideally). I want this batch job every time it
>>> starts to get the last offset it stopped at, and start reading from there
>>> until it caught up to the latest offset, store the result and stop the job.
>>> Given the dataframe has a partition and offset column, my first thought for
>>> offset management is to groupBy partition and agg the max offset, then
>>> store it in HDFS. Next time the job runs, it will read and start from this
>>> max offset using startingOffsets
>>>
>>> However, I was wondering if this will work. If the kafka producer failed
>>> an offset and later decides to resend it, I will have skipped it since I’m
>>> starting from the max offset sent. How does spark structured streaming know
>>> to continue onwards - does it keep a state of all offsets seen? If so, how
>>> can I replicate this for batch without missing data? Any help would be
>>> appreciated.
>>>
>>>
>>> --
>>> Cheers,
>>> Ruijing Li
>>>
>> --
> Cheers,
> Ruijing Li
>


Re: Best way to read batch from Kafka and Offsets

2020-02-03 Thread Chris Teoh
Kafka can keep track of the offsets (in a separate topic based on your
consumer group) you've seen but it is usually best effort and you're
probably better off also keeping track of your offsets.

If the producer resends a message you would have to dedupe it, as you've
most likely already seen it; how you handle that depends on your data.
I think the offset will increment automatically; you will generally not see
the same offset occur more than once in a Kafka topic partition (feel free
to correct me on this though). So the most likely scenario you need to
handle is the producer sending a duplicate message with two offsets.

The alternative is you can reprocess the offsets back from where you
thought the message was last seen.

Kind regards
Chris

On Mon, 3 Feb 2020, 7:39 pm Ruijing Li,  wrote:

> Hi all,
>
> My use case is to read from single kafka topic using a batch spark sql job
> (not structured streaming ideally). I want this batch job every time it
> starts to get the last offset it stopped at, and start reading from there
> until it caught up to the latest offset, store the result and stop the job.
> Given the dataframe has a partition and offset column, my first thought for
> offset management is to groupBy partition and agg the max offset, then
> store it in HDFS. Next time the job runs, it will read and start from this
> max offset using startingOffsets
>
> However, I was wondering if this will work. If the kafka producer failed
> an offset and later decides to resend it, I will have skipped it since I’m
> starting from the max offset sent. How does spark structured streaming know
> to continue onwards - does it keep a state of all offsets seen? If so, how
> can I replicate this for batch without missing data? Any help would be
> appreciated.
>
>
> --
> Cheers,
> Ruijing Li
>


Re: Submitting job with external dependencies to pyspark

2020-01-28 Thread Chris Teoh
Usually this isn't done, as the data is meant to be on shared/distributed
storage, e.g. HDFS, S3, etc.

Spark should then read this data into a dataframe and your code logic
applies to the dataframe in a distributed manner.

On Wed, 29 Jan 2020 at 09:37, Tharindu Mathew 
wrote:

> That was really helpful. Thanks! I actually solved my problem by
> creating a venv and using the venv flags. Wondering now how to submit the
> data as an archive? Any idea?
>
> On Mon, Jan 27, 2020, 9:25 PM Chris Teoh  wrote:
>
>> Use --py-files
>>
>> See
>> https://spark.apache.org/docs/latest/submitting-applications.html#bundling-your-applications-dependencies
>>
>> I hope that helps.
>>
>> On Tue, 28 Jan 2020, 9:46 am Tharindu Mathew, 
>> wrote:
>>
>>> Hi,
>>>
>>> Newbie to pyspark/spark here.
>>>
>>> I'm trying to submit a job to pyspark with a dependency, Spark DL in
>>> this case. While the local environment has this, pyspark does not see
>>> it. How do I correctly start pyspark so that it sees this dependency?
>>>
>>> Using Spark 2.3.0 in a cloudera setup.
>>>
>>> --
>>> Regards,
>>> Tharindu Mathew
>>> http://tharindumathew.com
>>>
>>

-- 
Chris


Re: Submitting job with external dependencies to pyspark

2020-01-27 Thread Chris Teoh
Use --py-files

See
https://spark.apache.org/docs/latest/submitting-applications.html#bundling-your-applications-dependencies

I hope that helps.
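
A hedged example of what that looks like on the command line (file names are
hypothetical); the dependency is bundled into a zip/egg and shipped with the
main script:

```
spark-submit \
  --master yarn \
  --py-files deps.zip \
  my_job.py
```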

On Tue, 28 Jan 2020, 9:46 am Tharindu Mathew, 
wrote:

> Hi,
>
> Newbie to pyspark/spark here.
>
> I'm trying to submit a job to pyspark with a dependency, Spark DL in this
> case. While the local environment has this, pyspark does not see it. How
> do I correctly start pyspark so that it sees this dependency?
>
> Using Spark 2.3.0 in a cloudera setup.
>
> --
> Regards,
> Tharindu Mathew
> http://tharindumathew.com
>


Re: RESTful Operations

2020-01-19 Thread Chris Teoh
Maybe something like Livy; otherwise roll your own REST API and have it
start a Spark job.
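
For reference, a minimal sketch of the Livy route, assuming a Livy server on
port 8998 and a pre-built application jar (host, path and class name are all
hypothetical); the client POSTs to /batches and Livy launches the Spark job:

```scala
import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets

object LivySubmit {
  def main(args: Array[String]): Unit = {
    val url = new URL("http://livy-host:8998/batches")   // hypothetical Livy endpoint
    val payload =
      """{"file": "hdfs:///apps/my-spark-job.jar", "className": "com.example.MyJob"}"""

    val conn = url.openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setRequestProperty("Content-Type", "application/json")
    conn.setDoOutput(true)

    val out = conn.getOutputStream
    out.write(payload.getBytes(StandardCharsets.UTF_8))
    out.close()

    println(s"Livy responded with HTTP ${conn.getResponseCode}")
    conn.disconnect()
  }
}
```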

On Mon, 20 Jan 2020 at 06:55,  wrote:

> I am new to Spark. The task I want to accomplish is to let clients send HTTP
> requests, then have Spark process each request for further operations. However,
> searching Spark's website docs
>
>
> https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.package
> 
>
> https://spark.apache.org/docs/latest/
>
> I do not find any place mentioning this. Also, most of the internet
> results are more related to the Spark job server.
>
> Any places I should start if I want to use Spark for such purpose?
>
> Thanks
>


-- 
Chris


Re: Does explode lead to more usage of memory

2020-01-19 Thread Chris Teoh
It depends on the use case; if you have to join, you're saving a join and a
shuffle by having it already in an array.

If you explode, at least sort within partitions to get predicate
pushdown when you read the data next time.
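
A minimal spark-shell sketch of that suggestion, reusing the schema from the
question below (the output path is hypothetical): explode, then sort within
partitions on the column you expect to filter on, so Parquet row-group
statistics can serve predicate pushdown on the next read.

```scala
import org.apache.spark.sql.functions.explode

val exploded = df.select($"user", $"language", explode($"orders").as("order"))

exploded
  .sortWithinPartitions($"user")
  .write
  .mode("overwrite")
  .parquet("hdfs:///warehouse/orders_exploded")   // hypothetical output path
```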

On Sun, 19 Jan 2020, 1:19 pm Jörn Franke,  wrote:

> Why not two tables that you can then join? This would be the standard
> way. It depends on what your full use case is, what volumes / orders you
> expect on average, and what the aggregations and filters look like. The example
> below states that you do a select all on the table.
>
> > On 19.01.2020 at 01:50, V0lleyBallJunki3 wrote:
> >
> > I am using a dataframe that has a structure like this:
> >
> > root
> >  |-- orders: array (nullable = true)
> >  |    |-- element: struct (containsNull = true)
> >  |    |    |-- amount: double (nullable = true)
> >  |    |    |-- id: string (nullable = true)
> >  |-- user: string (nullable = true)
> >  |-- language: string (nullable = true)
> >
> > Each user has multiple orders. Now if I explode orders like this:
> >
> > df.select($"user", explode($"orders").as("order")). Each order element will
> > become a row with a duplicated user and language. I was wondering if Spark
> > actually converts each order element into a single row in memory, or if it
> > is just logical. Because if a single user has 1000 orders, then wouldn't it
> > lead to a lot more memory consumption, since it is duplicating user and
> > language 1000 times (once for each order) in memory?
> >
> >
> >
> > --
> > Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Does explode lead to more usage of memory

2020-01-18 Thread Chris Teoh
I think it does mean more memory usage, but consider how big your arrays
are. Think about your use case requirements and whether it makes sense to
use arrays. Also, it may be preferable to explode if the arrays are very
large. I'd say exploding arrays will make the data more splittable. Keeping
the array has the benefit of avoiding a join and of colocating the child
items, but it does imply more memory pressure on each executor to read every
record in the array, requiring denser nodes.

I hope that helps.

On Sun, 19 Jan 2020, 7:50 am V0lleyBallJunki3, 
wrote:

> I am using a dataframe that has a structure like this:
>
> root
>  |-- orders: array (nullable = true)
>  |    |-- element: struct (containsNull = true)
>  |    |    |-- amount: double (nullable = true)
>  |    |    |-- id: string (nullable = true)
>  |-- user: string (nullable = true)
>  |-- language: string (nullable = true)
>
> Each user has multiple orders. Now if I explode orders like this:
>
> df.select($"user", explode($"orders").as("order")). Each order element will
> become a row with a duplicated user and language. I was wondering if Spark
> actually converts each order element into a single row in memory, or if it is
> just logical. Because if a single user has 1000 orders, then wouldn't it lead
> to a lot more memory consumption, since it is duplicating user and language
> 1000 times (once for each order) in memory?
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark Executor OOMs when writing Parquet

2020-01-17 Thread Chris Teoh
Yes. Disk spill can be a huge performance hit; with smaller partitions you
may avoid this and possibly complete your job faster. I hope you don't get
an OOM.

On Sat, 18 Jan 2020 at 10:06, Arwin Tio  wrote:

> Okay! I didn't realize you can pump those partition numbers up that high.
> 15000 partitions still failed. I am trying 3 partitions now. There is
> still some disk spill but it is not that high.
>
> Thanks,
>
> Arwin
>
> ------
> *From:* Chris Teoh 
> *Sent:* January 17, 2020 7:32 PM
> *To:* Arwin Tio 
> *Cc:* user @spark 
> *Subject:* Re: Spark Executor OOMs when writing Parquet
>
> You also have disk spill which is a performance hit.
>
> Try multiplying the number of partitions by about 20x - 40x and see if you
> can eliminate shuffle spill.
>
> On Fri, 17 Jan 2020, 10:37 pm Arwin Tio,  wrote:
>
> Yes, mostly memory spills though (36.9 TiB memory, 895 GiB disk). I was
> under the impression that memory spill is OK?
>
>
> (If you're wondering, this is EMR).
>
> --
> *From:* Chris Teoh 
> *Sent:* January 17, 2020 10:30 AM
> *To:* Arwin Tio 
> *Cc:* user @spark 
> *Subject:* Re: Spark Executor OOMs when writing Parquet
>
> Sounds like you don't have enough partitions. Try and repartition to 14496
> partitions. Are your stages experiencing shuffle spill?
>
> On Fri, 17 Jan 2020, 10:12 pm Arwin Tio,  wrote:
>
> Hello,
>
> I have a fairly straightforward Spark job that converts CSV to Parquet:
>
> ```
> Dataset df = spark.read(...)
>
> df
>   .repartition(5000)
>   .write()
>   .format("parquet")
>   .parquet("s3://mypath/...);
> ```
>
> For context, there are about 5 billion rows, each with 2000 columns. The
> entire dataset is about 1 TB (compressed).
>
> The error looks like this:
>
> ```
>   20/01/16 13:08:55 WARN TaskSetManager: Lost task 265.0 in stage 2.0 (TID
> 24300, ip-172-24-107-37.us-west-2.compute.internal, executor 13):
> org.apache.spark.SparkException: Task failed while writing rows.
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> at org.apache.spark.scheduler.Task.run(Task.scala:123)
> at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
> at
> org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.OutOfMemoryError
> at sun.misc.Unsafe.allocateMemory(Native Method)
> at java.nio.DirectByteBuffer.(DirectByteBuffer.java:127)
> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
> at
> org.apache.parquet.hadoop.codec.SnappyCompressor.setInput(SnappyCompressor.java:97)
> at
> org.apache.parquet.hadoop.codec.NonBlockedCompressorStream.write(NonBlockedCompressorStream.java:48)
> at
> org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeToOutput(CapacityByteArrayOutputStream.java:227)
> at
> org.apache.parquet.bytes.CapacityByteArrayOutputStream.writeTo(CapacityByteArrayOutputStream.java:247)
> at
> org.apache.parquet.bytes.BytesInput$CapacityBAOSBytesInput.writeAllTo(BytesInput.java:405)
> at
> org.apache.parquet.bytes.BytesInput$SequenceBytesIn.writeAllTo(BytesInput.java:296)
> at
> org.apache.parquet.hadoop.CodecFactory$HeapBytesCompressor.compress(CodecFactory.java:164)
> at
> org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:95)
> at
> org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:147)
> at
> org.apache.parquet.column.impl.ColumnWriterV1.accountForValueWritten(ColumnWriterV1.java:106)
> at
> org.apache.parquet.column.impl.ColumnWriterV1.writeNull(ColumnWriterV1.java:170)
> at
> org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.writeNull(MessageColumnIO.java:347)
> at
> 

Re: Out of memory HDFS Read and Write

2019-12-22 Thread Chris Teoh
Does it work for just a single path input and single output?

Is the destPath a collection that is sitting on the driver?

On Sun, 22 Dec 2019, 7:59 pm Ruijing Li,  wrote:

> I was experimenting and found something interesting. I have executor OOM
> even if I don’t write to remote clusters. So it is purely a dataframe read
> and write issue
> —
> To recap, I have an ETL data pipeline that does some logic, repartitions
> to reduce the amount of files written, writes the output to HDFS as parquet
> files. After, it reads the output and writes it to other locations, doesn’t
> matter if on the same hadoop cluster or multiple. This is a simple piece of
> code
> ```
> destPaths.foreach { path =>
>   Try(spark.read.parquet(sourceOutputPath).write.mode(SaveMode.Overwrite).parquet(path)) match {
>     case scala.util.Success(_) => // log success
>     case scala.util.Failure(e) => // log failure
>   }
> }
> ```
> However this stage - read from sourceOutput and write to different
> locations - is failing in Spark, despite all other stages succeeding,
> including the heavy duty logic. And the data is not too big to handle for
> spark.
>
> Only bumping memoryOverhead, and also repartitioning output to more
> partitions, 40 precisely (when it failed, we partitioned the output to 20
> after logic is finished but before writing to HDFS) have made the
> read stage succeed.
>
> Not understanding how spark read stage can experience OOM issues.
> Hoping to shed some light on why.
>
> On Sat, Dec 21, 2019 at 7:57 PM Chris Teoh  wrote:
>
>> I'm not entirely sure what the behaviour is when writing to a remote
>> cluster. It could be that the connections are being established for every
>> element in your dataframe; perhaps using foreachPartition may
>> reduce the number of connections? You may have to look at what the
>> executors do when they reach out to the remote cluster.
>>
>> On Sun, 22 Dec 2019, 8:07 am Ruijing Li,  wrote:
>>
>>> I managed to make the failing stage work by increasing memoryOverhead to
>>> something ridiculous > 50%. Our spark.executor.memory  = 12gb and I bumped
>>> spark.mesos.executor.memoryOverhead=8G
>>>
>>> *Can someone explain why this solved the issue?* As I understand, usage
>>> of memoryOverhead is for VM overhead and non heap items, which a simple
>>> read and write should not use (albeit to different hadoop clusters, but
>>> network should be nonissue since they are from the same machines).
>>>
>>> We use spark defaults for everything else.
>>>
>>> We are calling df.repartition(20) in our write after logic is done
>>> (before failing stage of multiple cluster write) to prevent spark’s small
>>> files problem. We reduce from 4000 partitions to 20.
>>>
>>> On Sat, Dec 21, 2019 at 11:28 AM Ruijing Li 
>>> wrote:
>>>
>>>> Not for the stage that fails, all it does is read and write - the
>>>> number of tasks is # of cores * # of executor instances. For us that is 60
>>>> (3 cores 20 executors)
>>>>
>>>> The input partition size for the failing stage, when spark reads the 20
>>>> files each 132M, it comes out to be 40 partitions.
>>>>
>>>>
>>>>
>>>> On Fri, Dec 20, 2019 at 4:40 PM Chris Teoh 
>>>> wrote:
>>>>
>>>>> If you're using Spark SQL, that configuration setting causes a shuffle
>>>>> if the number of your input partitions to the write is larger than that
>>>>> configuration.
>>>>>
>>>>> Is there anything in the executor logs or the Spark UI DAG that
>>>>> indicates a shuffle? I don't expect a shuffle if it is a straight write.
>>>>> What's the input partition size?
>>>>>
>>>>> On Sat, 21 Dec 2019, 10:24 am Ruijing Li, 
>>>>> wrote:
>>>>>
>>>>>> Could you explain why shuffle partitions might be a good starting
>>>>>> point?
>>>>>>
>>>>>> Some more details: when I write the output the first time after logic
>>>>>> is complete, I repartition the files to 20 after having
>>>>>> spark.sql.shuffle.partitions = 2000 so we don’t have too many small 
>>>>>> files.
>>>>>> Data is small about 130MB per file. When spark reads it reads in 40
>>>>>> partitions and tries to output that to the different cluster. 
>>>>>> Unfortunately
>>>>>> during that read and write stage executors drop off.
>>>>>>
>>>>>> We keep hdfs block 128Mb

Re: Out of memory HDFS Multiple Cluster Write

2019-12-21 Thread Chris Teoh
I'm not entirely sure what the behaviour is when writing to a remote cluster.
It could be that the connections are being established for every element in
your dataframe; perhaps using foreachPartition may reduce the
number of connections? You may have to look at what the executors do when
they reach out to the remote cluster.
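
A rough sketch of the foreachPartition idea; RemoteClient is purely a
hypothetical stand-in for whatever client the executors use to reach the remote
cluster. The point is one connection per partition rather than one per row.

```scala
import org.apache.spark.sql.Row

// Hypothetical stand-in for a client that talks to the remote cluster.
class RemoteClient extends Serializable {
  def send(record: String): Unit = println(s"sending: $record")
  def close(): Unit = println("closing connection")
}

df.foreachPartition { (rows: Iterator[Row]) =>
  val client = new RemoteClient()            // opened once per partition
  rows.foreach(r => client.send(r.mkString(",")))
  client.close()
}
```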

On Sun, 22 Dec 2019, 8:07 am Ruijing Li,  wrote:

> I managed to make the failing stage work by increasing memoryOverhead to
> something ridiculous > 50%. Our spark.executor.memory  = 12gb and I bumped
> spark.mesos.executor.memoryOverhead=8G
>
> *Can someone explain why this solved the issue?* As I understand, usage
> of memoryOverhead is for VM overhead and non heap items, which a simple
> read and write should not use (albeit to different hadoop clusters, but
> network should be nonissue since they are from the same machines).
>
> We use spark defaults for everything else.
>
> We are calling df.repartition(20) in our write after logic is done (before
> failing stage of multiple cluster write) to prevent spark’s small files
> problem. We reduce from 4000 partitions to 20.
>
> On Sat, Dec 21, 2019 at 11:28 AM Ruijing Li  wrote:
>
>> Not for the stage that fails, all it does is read and write - the number
>> of tasks is # of cores * # of executor instances. For us that is 60 (3
>> cores 20 executors)
>>
>> The input partition size for the failing stage, when spark reads the 20
>> files each 132M, it comes out to be 40 partitions.
>>
>>
>>
>> On Fri, Dec 20, 2019 at 4:40 PM Chris Teoh  wrote:
>>
>>> If you're using Spark SQL, that configuration setting causes a shuffle
>>> if the number of your input partitions to the write is larger than that
>>> configuration.
>>>
>>> Is there anything in the executor logs or the Spark UI DAG that
>>> indicates a shuffle? I don't expect a shuffle if it is a straight write.
>>> What's the input partition size?
>>>
>>> On Sat, 21 Dec 2019, 10:24 am Ruijing Li,  wrote:
>>>
>>>> Could you explain why shuffle partitions might be a good starting point?
>>>>
>>>> Some more details: when I write the output the first time after logic
>>>> is complete, I repartition the files to 20 after having
>>>> spark.sql.shuffle.partitions = 2000 so we don’t have too many small files.
>>>> Data is small about 130MB per file. When spark reads it reads in 40
>>>> partitions and tries to output that to the different cluster. Unfortunately
>>>> during that read and write stage executors drop off.
>>>>
>>>> We keep hdfs block 128Mb
>>>>
>>>> On Fri, Dec 20, 2019 at 3:01 PM Chris Teoh 
>>>> wrote:
>>>>
>>>>> spark.sql.shuffle.partitions might be a start.
>>>>>
>>>>> Is there a difference in the number of partitions when the parquet is
>>>>> read to spark.sql.shuffle.partitions? Is it much higher than
>>>>> spark.sql.shuffle.partitions?
>>>>>
>>>>> On Fri, 20 Dec 2019, 7:34 pm Ruijing Li, 
>>>>> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I have encountered a strange executor OOM error. I have a data
>>>>>> pipeline using Spark 2.3 Scala 2.11.12. This pipeline writes the output 
>>>>>> to
>>>>>> one HDFS location as parquet then reads the files back in and writes to
>>>>>> multiple hadoop clusters (all co-located in the same datacenter).  It
>>>>>> should be a very simple task, but executors are being killed off 
>>>>>> exceeding
>>>>>> container thresholds. From logs, it is exceeding given memory (using 
>>>>>> Mesos
>>>>>> as the cluster manager).
>>>>>>
>>>>>> The ETL process works perfectly fine with the given resources, doing
>>>>>> joins and adding columns. The output is written successfully the first
>>>>>> time. *Only when the pipeline at the end reads the output from HDFS
>>>>>> and writes it to different HDFS cluster paths does it fail.* (It
>>>>>> does a spark.read.parquet(source).write.parquet(dest))
>>>>>>
>>>>>> This doesn't really make sense and I'm wondering what configurations
>>>>>> I should start looking at.
>>>>>>
>>>>>> --
>>>>>> Cheers,
>>>>>> Ruijing Li
>>>>>> --
>>>>>> Cheers,
>>>>>> Ruijing Li
>>>>>>
>>>>> --
>>>> Cheers,
>>>> Ruijing Li
>>>>
>>> --
>> Cheers,
>> Ruijing Li
>>
> --
> Cheers,
> Ruijing Li
> --
> Cheers,
> Ruijing Li
>


Re: Out of memory HDFS Multiple Cluster Write

2019-12-20 Thread Chris Teoh
If you're using Spark SQL, that configuration setting causes a shuffle if
the number of your input partitions to the write is larger than that
configuration.

Is there anything in the executor logs or the Spark UI DAG that indicates a
shuffle? I don't expect a shuffle if it is a straight write. What's the
input partition size?
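
For reference, a quick way to check both numbers in spark-shell (the path is
hypothetical):

```scala
val in = spark.read.parquet("hdfs:///path/to/first_output")   // hypothetical path
println(s"input partitions:   ${in.rdd.getNumPartitions}")
println(s"shuffle partitions: ${spark.conf.get("spark.sql.shuffle.partitions")}")
```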

On Sat, 21 Dec 2019, 10:24 am Ruijing Li,  wrote:

> Could you explain why shuffle partitions might be a good starting point?
>
> Some more details: when I write the output the first time after logic is
> complete, I repartition the files to 20 after having
> spark.sql.shuffle.partitions = 2000 so we don’t have too many small files.
> Data is small about 130MB per file. When spark reads it reads in 40
> partitions and tries to output that to the different cluster. Unfortunately
> during that read and write stage executors drop off.
>
> We keep hdfs block 128Mb
>
> On Fri, Dec 20, 2019 at 3:01 PM Chris Teoh  wrote:
>
>> spark.sql.shuffle.partitions might be a start.
>>
>> Is there a difference in the number of partitions when the parquet is
>> read to spark.sql.shuffle.partitions? Is it much higher than
>> spark.sql.shuffle.partitions?
>>
>> On Fri, 20 Dec 2019, 7:34 pm Ruijing Li,  wrote:
>>
>>> Hi all,
>>>
>>> I have encountered a strange executor OOM error. I have a data pipeline
>>> using Spark 2.3 Scala 2.11.12. This pipeline writes the output to one HDFS
>>> location as parquet then reads the files back in and writes to multiple
>>> hadoop clusters (all co-located in the same datacenter).  It should be a
>>> very simple task, but executors are being killed off exceeding container
>>> thresholds. From logs, it is exceeding given memory (using Mesos as the
>>> cluster manager).
>>>
>>> The ETL process works perfectly fine with the given resources, doing
>>> joins and adding columns. The output is written successfully the first
>>> time. *Only when the pipeline at the end reads the output from HDFS and
>>> writes it to different HDFS cluster paths does it fail.* (It does a
>>> spark.read.parquet(source).write.parquet(dest))
>>>
>>> This doesn't really make sense and I'm wondering what configurations I
>>> should start looking at.
>>>
>>> --
>>> Cheers,
>>> Ruijing Li
>>> --
>>> Cheers,
>>> Ruijing Li
>>>
>> --
> Cheers,
> Ruijing Li
>


Re: Out of memory HDFS Multiple Cluster Write

2019-12-20 Thread Chris Teoh
spark.sql.shuffle.partitions might be a start.

Is there a difference in the number of partitions when the parquet is read
to spark.sql.shuffle.partitions? Is it much higher than
spark.sql.shuffle.partitions?

On Fri, 20 Dec 2019, 7:34 pm Ruijing Li,  wrote:

> Hi all,
>
> I have encountered a strange executor OOM error. I have a data pipeline
> using Spark 2.3 Scala 2.11.12. This pipeline writes the output to one HDFS
> location as parquet then reads the files back in and writes to multiple
> hadoop clusters (all co-located in the same datacenter).  It should be a
> very simple task, but executors are being killed off exceeding container
> thresholds. From logs, it is exceeding given memory (using Mesos as the
> cluster manager).
>
> The ETL process works perfectly fine with the given resources, doing joins
> and adding columns. The output is written successfully the first time. *Only
> when the pipeline at the end reads the output from HDFS and writes it to
> different HDFS cluster paths does it fail.* (It does a
> spark.read.parquet(source).write.parquet(dest))
>
> This doesn't really make sense and I'm wondering what configurations I
> should start looking at.
>
> --
> Cheers,
> Ruijing Li
> --
> Cheers,
> Ruijing Li
>


Re: Identify bottleneck

2019-12-19 Thread Chris Teoh
As far as I'm aware it isn't any better. The logic all gets processed by
the same engine so to confirm, compare the DAGs generated from both
approaches and see if they're identical.
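
One way to check is to build the same projection both ways and compare the
plans; a spark-shell sketch with toy columns (all names hypothetical):

```scala
val df = Seq((1, 10), (2, 20)).toDF("a", "b")   // toy stand-in for the real input

val viaWithColumn = df.withColumn("a2", $"a" + 1).withColumn("b2", $"b" * 2).drop("a", "b")
val viaSelect     = df.select(($"a" + 1).as("a2"), ($"b" * 2).as("b2"))

viaWithColumn.explain(true)   // compare the optimized/physical plans of the two
viaSelect.explain(true)
```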

On Fri, 20 Dec 2019, 8:56 am ayan guha,  wrote:

> Quick question: Why is it better to use one sql statement vs multiple withColumn calls?
> Isn't everything eventually rewritten by Catalyst?
>
> On Wed, 18 Dec 2019 at 9:14 pm, Enrico Minack 
> wrote:
>
>> How many withColumn statements do you have? Note that it is better to use
>> a single select, rather than lots of withColumn. This also makes drops
>> redundant.
>>
>> Reading 25m CSV lines and writing to Parquet in 5 minutes on 32 cores is
>> really slow. Can you try this on a single machine, i.e. run with "local[*]".
>>
>> Can you rule out the writing part by counting the rows? I presume this
>> all happens in a single stage.
>>
>> Enrico
>>
>>
>> Am 18.12.19 um 10:56 schrieb Antoine DUBOIS:
>>
>> Hello
>>
>> I'm working on an ETL based on CSVs describing file systems, transforming
>> them into Parquet so I can work on them easily to extract information.
>> I'm using Mr. Powers' framework Daria to do so. I have quite different inputs
>> and a lot of transformations, and the framework helps organize the code.
>> I have a stand-alone cluster v2.3.2 composed of 4 nodes with 8 cores and
>> 32GB of memory each.
>> The storage is handled by a CephFS volume mounted on all nodes.
>> First, a small description of my algorithm (it's quite simple):
>>
>> Use SparkContext to load the csv.bz2 file,
>> Chain a lot of withColumn() statement,
>> Drop all unnecessary columns,
>> Write parquet file to CephFS
>>
>> This treatment can take several hours depending on how many lines the CSV
>> has, and I wanted to identify whether bz2 or the network could be an issue,
>> so I ran the following test (several times, with consistent results):
>> I tried the following scenario with 20 cores and 2 core per task:
>>
>>- Read the csv.bz2 from CephFS with connection with 1Gb/s for each
>>node: ~5 minutes.
>>- Read the csv.bz2 from TMPFS(setup to look like a shared storage
>>space): ~5 minutes.
>>- From the 2 previous tests I concluded that uncompressing the file
>>was part of the bottleneck so I decided to uncompress the file and store 
>> it
>>in TMPFS as well, result: ~5.9 minutes.
>>
>> The test file has 25'833'369 lines and is 370MB compressed and 3700MB
>> uncompressed. Those results have been reproduced several times each.
>> My question here is: what is the bottleneck in this case?
>>
>> I thought that the uncompressed file in RAM would be the fastest. Is it
>> possible that my program is suboptimal in reading the CSV?
>> In the execution logs on the cluster I have 5 to 10 seconds GC time max,
>> and timeline shows mainly CPU time (no shuffling, no randomization overload
>> either).
>> I also noticed that memory storage is never used during the execution. I
>> know from several hours of research that bz2 is the only real compression
>> algorithm usable as an input in spark for parallelization reasons.
>>
>> Do you have any idea of why such a behaviour ?
>> and do you have any idea on how to improve such treatment ?
>>
>> Cheers
>>
>> Antoine
>>
>>
>> --
> Best Regards,
> Ayan Guha
>


Re: Identify bottleneck

2019-12-18 Thread Chris Teoh
Please look at the spark UI and confirm you are indeed getting more than 1
partition in your dataframe. Text files are usually not splittable so you
may just be doing all the work in a single partition.

If that is the case, It may be worthwhile considering calling the
repartition method to distribute your data across multiple partitions so
you get more parallelism.
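
For reference, a spark-shell sketch of that check and repartition (the path and
partition count are hypothetical):

```scala
val raw = spark.read.option("header", "true").csv("/data/filesystem_dump.csv.bz2")
println(s"input partitions: ${raw.rdd.getNumPartitions}")

// If everything landed in one partition, spread the work across the cluster,
// e.g. a small multiple of total cores (4 nodes x 8 cores in this thread).
val spread = raw.repartition(64)
```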

On Wed, 18 Dec 2019, 9:35 pm Antoine DUBOIS, 
wrote:

> There are 15 withColumn statements and one drop at the end to remove the old
> columns.
> I wish I could write it as a single sql statement, but it's not
> reasonable for maintenance purposes.
> I will try on a local instance and let you know.
>
> Thanks  for the help.
>
>
> --
> *De: *"Enrico Minack" 
> *À: *user@spark.apache.org, "Antoine DUBOIS" 
> *Envoyé: *Mercredi 18 Décembre 2019 11:13:38
> *Objet: *Re: Identify bottleneck
>
> How many withColumn statements do you have? Note that it is better to use
> a single select, rather than lots of withColumn. This also makes drops
> redundant.
>
> Reading 25m CSV lines and writing to Parquet in 5 minutes on 32 cores is
> really slow. Can you try this on a single machine, i.e. run with "local[*]".
>
> Can you rule out the writing part by counting the rows? I presume this all
> happens in a single stage.
>
> Enrico
>
>
> Am 18.12.19 um 10:56 schrieb Antoine DUBOIS:
>
> Hello
>
> I'm working on an ETL based on CSVs describing file systems, transforming them
> into Parquet so I can work on them easily to extract information.
> I'm using Mr. Powers' framework Daria to do so. I have quite different inputs
> and a lot of transformations, and the framework helps organize the code.
> I have a stand-alone cluster v2.3.2 composed of 4 nodes with 8 cores and
> 32GB of memory each.
> The storage is handled by a CephFS volume mounted on all nodes.
> First, a small description of my algorithm (it's quite simple):
>
> Use SparkContext to load the csv.bz2 file,
> Chain a lot of withColumn() statement,
> Drop all unnecessary columns,
> Write parquet file to CephFS
>
> This treatment can take several hours depending on how many lines the CSV
> has, and I wanted to identify whether bz2 or the network could be an issue,
> so I ran the following test (several times, with consistent results):
> I tried the following scenario with 20 cores and 2 core per task:
>
>- Read the csv.bz2 from CephFS with connection with 1Gb/s for each
>node: ~5 minutes.
>- Read the csv.bz2 from TMPFS(setup to look like a shared storage
>space): ~5 minutes.
>- From the 2 previous tests I concluded that uncompressing the file
>was part of the bottleneck so I decided to uncompress the file and store it
>in TMPFS as well, result: ~5.9 minutes.
>
> The test file has 25'833'369 lines and is 370MB compressed and 3700MB
> uncompressed. Those results have been reproduced several times each.
> My question here is: what is the bottleneck in this case?
>
> I thought that the uncompressed file in RAM would be the fastest. Is it
> possible that my program is suboptimal in reading the CSV?
> In the execution logs on the cluster I have 5 to 10 seconds GC time max,
> and timeline shows mainly CPU time (no shuffling, no randomization overload
> either).
> I also noticed that memory storage is never used during the execution. I
> know from several hours of research that bz2 is the only real compression
> algorithm usable as an input in spark for parallelization reasons.
>
> Do you have any idea of why such a behaviour ?
> and do you have any idea on how to improve such treatment ?
>
> Cheers
>
> Antoine
>
>
>
>


Re: Request more yarn vcores than executors

2019-12-08 Thread Chris Teoh
If that is the case, perhaps set the vcore to CPU core ratio to 1:1 and just do
--executor-cores 1; that would at least try to get you more threads per
executor. Note that a vcore is a logical construct and isn't directly related
to CPU cores, just the time slice allowed over the entire set of CPUs on
each server.

I've seen multithreading at the driver, where multiple jobs might be run
if they're working on unevenly distributed workloads, which leverages the
executors more efficiently. Perhaps that is something to consider.

On Sun, 8 Dec 2019, 8:29 pm jelmer,  wrote:

> you can take on more simultaneous tasks per executor
>
>
> That is exactly what I want to avoid. that nature of the task makes it
> difficult to parallelise over many partitions. Ideally i'd have 1 executor
> per task with 10+ cores assigned to each executor
>
> On Sun, 8 Dec 2019 at 10:23, Chris Teoh  wrote:
>
>> I thought --executor-cores is the same as the other argument. If anything,
>> just set --executor-cores to something greater than 1 and don't set the
>> other one you mentioned. You'll then get greater number of cores per
>> executor so you can take on more simultaneous tasks per executor.
>>
>> On Sun, 8 Dec 2019, 8:16 pm jelmer,  wrote:
>>
>>> I have a job, running on yarn, that uses multithreading inside of a
>>> mapPartitions transformation
>>>
>>> Ideally I would like to have a small number of partitions but have a
>>> high number of yarn vcores allocated to the task (that i can take advantage
>>> of because of multi threading)
>>>
>>> Is this possible?
>>>
>>> I tried running with  : --executor-cores 1 --conf
>>> spark.yarn.executor.cores=20
>>> But it seems spark.yarn.executor.cores gets ignored
>>>
>>


Re: Request more yarn vcores than executors

2019-12-08 Thread Chris Teoh
I thought --executor-cores is the same as the other argument. If anything,
just set --executor-cores to something greater than 1 and don't set the
other one you mentioned. You'll then get greater number of cores per
executor so you can take on more simultaneous tasks per executor.
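
As a concrete command-line example of that suggestion (class and jar names are
hypothetical):

```
spark-submit \
  --master yarn \
  --num-executors 4 \
  --executor-cores 10 \
  --class com.example.MyJob my-job.jar
```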

On Sun, 8 Dec 2019, 8:16 pm jelmer,  wrote:

> I have a job, running on yarn, that uses multithreading inside of a
> mapPartitions transformation
>
> Ideally I would like to have a small number of partitions but have a high
> number of yarn vcores allocated to the task (that i can take advantage of
> because of multi threading)
>
> Is this possible?
>
> I tried running with  : --executor-cores 1 --conf
> spark.yarn.executor.cores=20
> But it seems spark.yarn.executor.cores gets ignored
>


Re: OOM Error

2019-09-07 Thread Chris Teoh
It says you have 3811 tasks in earlier stages and you're going down to 2001
partitions; that would make it more memory intensive. I'm guessing the
default spark shuffle partition count was 200, so that would have failed. Go for
a higher number, maybe even higher than 3811. What was your shuffle write
from stage 7 and your shuffle read from stage 8?
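
A spark-shell sketch of both suggestions from this thread (the key columns are
hypothetical): raise the shuffle partition count above the input task count,
and pre-partition on the same keys used by the groupBy/window operations.

```scala
spark.conf.set("spark.sql.shuffle.partitions", "4000")   // above the ~3811 input tasks

// Hypothetical key columns, matching whatever the groupBy/window partitions on.
val prepared = df.repartition($"account_id", $"event_date")
```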

On Sat, 7 Sep 2019, 7:57 pm Ankit Khettry,  wrote:

> Still unable to overcome the error. Attaching some screenshots for
> reference.
> Following are the configs used:
> spark.yarn.max.executor.failures 1000
> spark.yarn.driver.memoryOverhead 6g
> spark.executor.cores 6
> spark.executor.memory 36g
> spark.sql.shuffle.partitions 2001
> spark.memory.offHeap.size 8g
> spark.memory.offHeap.enabled true
> spark.executor.instances 10
> spark.driver.memory 14g
> spark.yarn.executor.memoryOverhead 10g
>
> Best Regards
> Ankit Khettry
>
> On Sat, Sep 7, 2019 at 2:56 PM Chris Teoh  wrote:
>
>> You can try. Consider processing each partition separately if your data
>> is heavily skewed when you partition it.
>>
>> On Sat, 7 Sep 2019, 7:19 pm Ankit Khettry, 
>> wrote:
>>
>>> Thanks Chris
>>>
>>> Going to try it soon by setting maybe spark.sql.shuffle.partitions to
>>> 2001. Also, I was wondering if it would help if I repartition the data by
>>> the fields I am using in group by and window operations?
>>>
>>> Best Regards
>>> Ankit Khettry
>>>
>>> On Sat, 7 Sep, 2019, 1:05 PM Chris Teoh,  wrote:
>>>
>>>> Hi Ankit,
>>>>
>>>> Without looking at the Spark UI and the stages/DAG, I'm guessing you're
>>>> running on default number of Spark shuffle partitions.
>>>>
>>>> If you're seeing a lot of shuffle spill, you likely have to increase
>>>> the number of shuffle partitions to accommodate the huge shuffle size.
>>>>
>>>> I hope that helps
>>>> Chris
>>>>
>>>> On Sat, 7 Sep 2019, 4:18 pm Ankit Khettry, 
>>>> wrote:
>>>>
>>>>> Nope, it's a batch job.
>>>>>
>>>>> Best Regards
>>>>> Ankit Khettry
>>>>>
>>>>> On Sat, 7 Sep, 2019, 6:52 AM Upasana Sharma, <028upasana...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Is it a streaming job?
>>>>>>
>>>>>> On Sat, Sep 7, 2019, 5:04 AM Ankit Khettry 
>>>>>> wrote:
>>>>>>
>>>>>>> I have a Spark job that consists of a large number of Window
>>>>>>> operations and hence involves large shuffles. I have roughly 900 GiBs of
>>>>>>> data, although I am using a large enough cluster (10 * m5.4xlarge
>>>>>>> instances). I am using the following configurations for the job, 
>>>>>>> although I
>>>>>>> have tried various other combinations without any success.
>>>>>>>
>>>>>>> spark.yarn.driver.memoryOverhead 6g
>>>>>>> spark.storage.memoryFraction 0.1
>>>>>>> spark.executor.cores 6
>>>>>>> spark.executor.memory 36g
>>>>>>> spark.memory.offHeap.size 8g
>>>>>>> spark.memory.offHeap.enabled true
>>>>>>> spark.executor.instances 10
>>>>>>> spark.driver.memory 14g
>>>>>>> spark.yarn.executor.memoryOverhead 10g
>>>>>>>
>>>>>>> I keep running into the following OOM error:
>>>>>>>
>>>>>>> org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire
>>>>>>> 16384 bytes of memory, got 0
>>>>>>> at
>>>>>>> org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
>>>>>>> at
>>>>>>> org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:98)
>>>>>>> at
>>>>>>> org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.(UnsafeInMemorySorter.java:128)
>>>>>>> at
>>>>>>> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.(UnsafeExternalSorter.java:163)
>>>>>>>
>>>>>>> I see there are a large number of JIRAs in place for similar issues
>>>>>>> and a great many of them are even marked resolved.
>>>>>>> Can someone guide me as to how to approach this problem? I am using
>>>>>>> Databricks Spark 2.4.1.
>>>>>>>
>>>>>>> Best Regards
>>>>>>> Ankit Khettry
>>>>>>>
>>>>>>


Re: OOM Error

2019-09-07 Thread Chris Teoh
You can try. Consider processing each partition separately if your data is
heavily skewed when you partition it.

On Sat, 7 Sep 2019, 7:19 pm Ankit Khettry,  wrote:

> Thanks Chris
>
> Going to try it soon by setting maybe spark.sql.shuffle.partitions to
> 2001. Also, I was wondering if it would help if I repartition the data by
> the fields I am using in group by and window operations?
>
> Best Regards
> Ankit Khettry
>
> On Sat, 7 Sep, 2019, 1:05 PM Chris Teoh,  wrote:
>
>> Hi Ankit,
>>
>> Without looking at the Spark UI and the stages/DAG, I'm guessing you're
>> running on default number of Spark shuffle partitions.
>>
>> If you're seeing a lot of shuffle spill, you likely have to increase the
>> number of shuffle partitions to accommodate the huge shuffle size.
>>
>> I hope that helps
>> Chris
>>
>> On Sat, 7 Sep 2019, 4:18 pm Ankit Khettry, 
>> wrote:
>>
>>> Nope, it's a batch job.
>>>
>>> Best Regards
>>> Ankit Khettry
>>>
>>> On Sat, 7 Sep, 2019, 6:52 AM Upasana Sharma, <028upasana...@gmail.com>
>>> wrote:
>>>
>>>> Is it a streaming job?
>>>>
>>>> On Sat, Sep 7, 2019, 5:04 AM Ankit Khettry 
>>>> wrote:
>>>>
>>>>> I have a Spark job that consists of a large number of Window
>>>>> operations and hence involves large shuffles. I have roughly 900 GiBs of
>>>>> data, although I am using a large enough cluster (10 * m5.4xlarge
>>>>> instances). I am using the following configurations for the job, although 
>>>>> I
>>>>> have tried various other combinations without any success.
>>>>>
>>>>> spark.yarn.driver.memoryOverhead 6g
>>>>> spark.storage.memoryFraction 0.1
>>>>> spark.executor.cores 6
>>>>> spark.executor.memory 36g
>>>>> spark.memory.offHeap.size 8g
>>>>> spark.memory.offHeap.enabled true
>>>>> spark.executor.instances 10
>>>>> spark.driver.memory 14g
>>>>> spark.yarn.executor.memoryOverhead 10g
>>>>>
>>>>> I keep running into the following OOM error:
>>>>>
>>>>> org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 16384
>>>>> bytes of memory, got 0
>>>>> at
>>>>> org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
>>>>> at
>>>>> org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:98)
>>>>> at
>>>>> org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.(UnsafeInMemorySorter.java:128)
>>>>> at
>>>>> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.(UnsafeExternalSorter.java:163)
>>>>>
>>>>> I see there are a large number of JIRAs in place for similar issues
>>>>> and a great many of them are even marked resolved.
>>>>> Can someone guide me as to how to approach this problem? I am using
>>>>> Databricks Spark 2.4.1.
>>>>>
>>>>> Best Regards
>>>>> Ankit Khettry
>>>>>
>>>>


Re: OOM Error

2019-09-07 Thread Chris Teoh
Hi Ankit,

Without looking at the Spark UI and the stages/DAG, I'm guessing you're
running on default number of Spark shuffle partitions.

If you're seeing a lot of shuffle spill, you likely have to increase the
number of shuffle partitions to accommodate the huge shuffle size.

I hope that helps
Chris

On Sat, 7 Sep 2019, 4:18 pm Ankit Khettry,  wrote:

> Nope, it's a batch job.
>
> Best Regards
> Ankit Khettry
>
> On Sat, 7 Sep, 2019, 6:52 AM Upasana Sharma, <028upasana...@gmail.com>
> wrote:
>
>> Is it a streaming job?
>>
>> On Sat, Sep 7, 2019, 5:04 AM Ankit Khettry 
>> wrote:
>>
>>> I have a Spark job that consists of a large number of Window operations
>>> and hence involves large shuffles. I have roughly 900 GiBs of data,
>>> although I am using a large enough cluster (10 * m5.4xlarge instances). I
>>> am using the following configurations for the job, although I have tried
>>> various other combinations without any success.
>>>
>>> spark.yarn.driver.memoryOverhead 6g
>>> spark.storage.memoryFraction 0.1
>>> spark.executor.cores 6
>>> spark.executor.memory 36g
>>> spark.memory.offHeap.size 8g
>>> spark.memory.offHeap.enabled true
>>> spark.executor.instances 10
>>> spark.driver.memory 14g
>>> spark.yarn.executor.memoryOverhead 10g
>>>
>>> I keep running into the following OOM error:
>>>
>>> org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 16384
>>> bytes of memory, got 0
>>> at
>>> org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
>>> at
>>> org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:98)
>>> at
>>> org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.(UnsafeInMemorySorter.java:128)
>>> at
>>> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.(UnsafeExternalSorter.java:163)
>>>
>>> I see there are a large number of JIRAs in place for similar issues and
>>> a great many of them are even marked resolved.
>>> Can someone guide me as to how to approach this problem? I am using
>>> Databricks Spark 2.4.1.
>>>
>>> Best Regards
>>> Ankit Khettry
>>>
>>


Re: Control Sqoop job from Spark job

2019-09-02 Thread Chris Teoh
Hey Chetan,

How many database connections are you anticipating in this job? Is this for
every row in the dataframe?

Kind regards
Chris


On Mon., 2 Sep. 2019, 9:11 pm Chetan Khatri, 
wrote:

> Hi Chris, thanks for the email. You're right, but it's like the Sqoop job gets
> launched based on dataframe values in the Spark job. Certainly it can be
> isolated and broken out.
>
> On Sat, Aug 31, 2019 at 8:07 AM Chris Teoh  wrote:
>
>> I'd say this is an uncommon approach; could you use a workflow/scheduling
>> system to call Sqoop outside of Spark? Spark is usually multiprocess and
>> distributed, so putting this Sqoop job in the Spark code seems to imply
>> you want to run Sqoop first, then Spark. If you're really insistent on
>> this, call it from the driver using the Sqoop Java APIs.
>>
>> On Fri, 30 Aug 2019 at 06:02, Chetan Khatri 
>> wrote:
>>
>>> Sorry,
>>> I call sqoop job from above function. Can you help me to resolve this.
>>>
>>> Thanks
>>>
>>> On Fri, Aug 30, 2019 at 1:31 AM Chetan Khatri <
>>> chetan.opensou...@gmail.com> wrote:
>>>
>>>> Hi Users,
>>>> I am launching a Sqoop job from Spark job and would like to FAIL Spark
>>>> job if Sqoop job fails.
>>>>
>>>> def executeSqoopOriginal(serverName: String, schemaName: String, username: 
>>>> String, password: String,
>>>>  query: String, splitBy: String, fetchSize: Int, 
>>>> numMappers: Int, targetDir: String, jobName: String, dateColumns: String) 
>>>> = {
>>>>
>>>>   val connectionString = "jdbc:sqlserver://" + serverName + ";" + 
>>>> "databaseName=" + schemaName
>>>>   var parameters = Array("import")
>>>>   parameters = parameters :+ "-Dmapreduce.job.user.classpath.first=true"
>>>>   parameters = parameters :+ "--connect"
>>>>   parameters = parameters :+ connectionString
>>>>   parameters = parameters :+ "--mapreduce-job-name"
>>>>   parameters = parameters :+ jobName
>>>>   parameters = parameters :+ "--username"
>>>>   parameters = parameters :+ username
>>>>   parameters = parameters :+ "--password"
>>>>   parameters = parameters :+ password
>>>>   parameters = parameters :+ "--hadoop-mapred-home"
>>>>   parameters = parameters :+ "/usr/hdp/2.6.5.0-292/hadoop-mapreduce/"
>>>>   parameters = parameters :+ "--hadoop-home"
>>>>   parameters = parameters :+ "/usr/hdp/2.6.5.0-292/hadoop/"
>>>>   parameters = parameters :+ "--query"
>>>>   parameters = parameters :+ query
>>>>   parameters = parameters :+ "--split-by"
>>>>   parameters = parameters :+ splitBy
>>>>   parameters = parameters :+ "--fetch-size"
>>>>   parameters = parameters :+ fetchSize.toString
>>>>   parameters = parameters :+ "--num-mappers"
>>>>   parameters = parameters :+ numMappers.toString
>>>>   if (dateColumns.length() > 0) {
>>>> parameters = parameters :+ "--map-column-java"
>>>> parameters = parameters :+ dateColumns
>>>>   }
>>>>   parameters = parameters :+ "--target-dir"
>>>>   parameters = parameters :+ targetDir
>>>>   parameters = parameters :+ "--delete-target-dir"
>>>>   parameters = parameters :+ "--as-avrodatafile"
>>>>
>>>> }
>>>>
>>>>
>>
>> --
>> Chris
>>
>


Re: Control Sqoop job from Spark job

2019-08-30 Thread Chris Teoh
I'd say this is an uncommon approach; could you use a workflow/scheduling
system to call Sqoop outside of Spark? Spark is usually multiprocess and
distributed, so putting this Sqoop job in the Spark code seems to imply
you want to run Sqoop first, then Spark. If you're really insistent on
this, call it from the driver using the Sqoop Java APIs.
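
For reference, a hedged sketch of that driver-side call, assuming the Sqoop
client jars are on the driver classpath and `parameters` is the argument array
built in the quoted code below: Sqoop.runTool returns an exit code, so a
non-zero result can be turned into a failure of the Spark job.

```scala
import org.apache.sqoop.Sqoop

val exitCode = Sqoop.runTool(parameters)   // parameters: Array[String] as built below
if (exitCode != 0) {
  throw new RuntimeException(s"Sqoop import failed with exit code $exitCode")
}
```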

On Fri, 30 Aug 2019 at 06:02, Chetan Khatri 
wrote:

> Sorry,
> I call sqoop job from above function. Can you help me to resolve this.
>
> Thanks
>
> On Fri, Aug 30, 2019 at 1:31 AM Chetan Khatri 
> wrote:
>
>> Hi Users,
>> I am launching a Sqoop job from Spark job and would like to FAIL Spark
>> job if Sqoop job fails.
>>
>> def executeSqoopOriginal(serverName: String, schemaName: String, username: 
>> String, password: String,
>>  query: String, splitBy: String, fetchSize: Int, numMappers: 
>> Int, targetDir: String, jobName: String, dateColumns: String) = {
>>
>>   val connectionString = "jdbc:sqlserver://" + serverName + ";" + 
>> "databaseName=" + schemaName
>>   var parameters = Array("import")
>>   parameters = parameters :+ "-Dmapreduce.job.user.classpath.first=true"
>>   parameters = parameters :+ "--connect"
>>   parameters = parameters :+ connectionString
>>   parameters = parameters :+ "--mapreduce-job-name"
>>   parameters = parameters :+ jobName
>>   parameters = parameters :+ "--username"
>>   parameters = parameters :+ username
>>   parameters = parameters :+ "--password"
>>   parameters = parameters :+ password
>>   parameters = parameters :+ "--hadoop-mapred-home"
>>   parameters = parameters :+ "/usr/hdp/2.6.5.0-292/hadoop-mapreduce/"
>>   parameters = parameters :+ "--hadoop-home"
>>   parameters = parameters :+ "/usr/hdp/2.6.5.0-292/hadoop/"
>>   parameters = parameters :+ "--query"
>>   parameters = parameters :+ query
>>   parameters = parameters :+ "--split-by"
>>   parameters = parameters :+ splitBy
>>   parameters = parameters :+ "--fetch-size"
>>   parameters = parameters :+ fetchSize.toString
>>   parameters = parameters :+ "--num-mappers"
>>   parameters = parameters :+ numMappers.toString
>>   if (dateColumns.length() > 0) {
>> parameters = parameters :+ "--map-column-java"
>> parameters = parameters :+ dateColumns
>>   }
>>   parameters = parameters :+ "--target-dir"
>>   parameters = parameters :+ targetDir
>>   parameters = parameters :+ "--delete-target-dir"
>>   parameters = parameters :+ "--as-avrodatafile"
>>
>> }
>>
>>

-- 
Chris


Re: [pyspark 2.4.3] small input csv ~3.4GB gets 40K tasks created

2019-08-30 Thread Chris Teoh
Look at your DAG. Are there lots of CSV files? Does your input CSV
dataframe have lots of partitions to start with? Bear in mind a cross join
makes the dataset much larger, so expect to have more tasks.

On Fri, 30 Aug 2019 at 14:11, Rishi Shah  wrote:

> Hi All,
>
> I am scratching my head against this weird behavior, where df (read from
> .csv) of size ~3.4GB gets cross joined with itself and creates 50K tasks!
> How to correlate input size with number of tasks in this case?
>
> --
> Regards,
>
> Rishi Shah
>


-- 
Chris


Re: Reading configuration file in Spark Scala throws error

2019-08-03 Thread Chris Teoh
The issue is that forEachRemaining expects a java.util.function.Consumer, and
Scala 2.11 won't convert a Scala lambda to that Java interface automatically,
so you need an explicit Consumer implementation. This seems to work:

val printEntry = new java.util.function.Consumer[java.util.Map.Entry[String, com.typesafe.config.ConfigValue]] {
  override def accept(a: java.util.Map.Entry[String, com.typesafe.config.ConfigValue]): Unit = {
    println(a.getKey)
  }
}

conf.entrySet.iterator.forEachRemaining(printEntry)

// returns
scala> conf.entrySet.iterator.forEachRemaining(printEntry)
dbUsername
dbPassword
bootstrapServers
dbDatabase


I hope that helps.

On Sun, 4 Aug 2019 at 05:29, Mich Talebzadeh 
wrote:

> Hi,
>
> I have a config file application.conf that I am trying to read.
>
> The skeleton code is as follows:
>
> ```
> import com.typesafe.config.ConfigFactory
> import scala.collection.JavaConverters
>   def main(args: Array[String]): Unit = {
> val globalConfig = ConfigFactory.load()  // pass in filename (without
> extension) to load additional config file in src/main/resources or CLASSPATH
> val conf   = globalConfig.getConfig("database")  // extract out
> top level key from top level namespace
> conf.entrySet().iterator().forEachRemaining { entry =>
>   val key:String = entry.getKey
>   val value:  Any= entry.getValue.unwrapped()  // access via entry
>   val value2: Any= conf.getAnyRef(key) // access via hash
> lookup from config
>   println( s"$key : $value | $value2" )  // string
> interpolation
> }
>   }
> ```
>
> But I am getting the following error
>
> ```
> [info] Compiling 1 Scala source to
> /data6/hduser/scala/testconf/target/scala-2.11/classes...
> [error]
> /data6/hduser/scala/testconf/src/main/scala/myPackage/testconf.scala:10:
> missing parameter type
> [error] conf.entrySet().iterator().forEachRemaining { entry =>
> [error]   ^
> [error] one error found
> [error] (compile:compileIncremental) Compilation failed
> ```
> The application.conf has the following layout
>
> database = {
>   dbDatabase = "trading"
>   dbPassword = "mongodb"
>   dbUsername = "trading_user_RW"
>   bootstrapServers = "rhes75:9092"
> }
>
> I appreciate any hint
>
> Thanks,
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


-- 
Chris


Re: Spark 2.3 Dataframe Grouby operation throws IllegalArgumentException on Large dataset

2019-07-24 Thread Chris Teoh
This might be a hint. Maybe invalid data?

Caused by: java.lang.IllegalArgumentException: Missing required char
':' at 'struct>'


On Wed., 24 Jul. 2019, 2:15 pm Balakumar iyer S, 
wrote:

> Hi Bobby Evans,
>
> I apologise for the delayed response; yes, you are right, I missed pasting
> the complete stack trace exception. Herewith I have attached the
> complete yarn log for the same.
>
> Thank you , It would be helpful if you guys could assist me on this error.
>
>
> -
> Regards
> Balakumar Seetharaman
>
>
> On Mon, Jul 22, 2019 at 7:05 PM Bobby Evans  wrote:
>
>> You are missing a lot of the stack trace that could explain the
>> exception.  All it shows is that an exception happened while writing out
>> the orc file, not what that underlying exception is, there should be at
>> least one more caused by under the one you included.
>>
>> Thanks,
>>
>> Bobby
>>
>> On Mon, Jul 22, 2019 at 5:58 AM Balakumar iyer S 
>> wrote:
>>
>>> Hi ,
>>>
>>> I am trying to perform a group by followed by an aggregate collect_set
>>> operation on a two-column dataset with schema (LeftData int, RightData
>>> int).
>>>
>>> code snippet
>>>
>>>   val wind_2  =
>>> dframe.groupBy("LeftData").agg(collect_set(array("RightData")))
>>>
>>>  wind_2.write.mode(SaveMode.Append).format("orc").save(args(1))
>>>
>>> the above code works fine on a smaller dataset but throws the following
>>> error on a large dataset (where each key in the LeftData column needs to be
>>> grouped with approximately 64k values).
>>>
>>> Could someone assist me on this? Should I set any configuration to
>>> accommodate such large values?
>>>
>>> ERROR
>>> -
>>> Driver stacktrace:
>>> at org.apache.spark.scheduler.DAGScheduler.org
>>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
>>> at
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
>>> at scala.Option.foreach(Option.scala:257)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
>>> at
>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
>>> at
>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
>>> at
>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
>>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
>>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
>>> at
>>> org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:194)
>>>
>>>
>>> Caused by: org.apache.spark.SparkException: Task failed while writing
>>> rows.
>>> at
>>> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
>>>
>>> --
>>> REGARDS
>>> BALAKUMAR SEETHARAMAN
>>>
>>>
>
> --
> REGARDS
> BALAKUMAR SEETHARAMAN
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: Attempting to avoid a shuffle on join

2019-07-06 Thread Chris Teoh
Dataframes have a repartition(column) function for this (the DataFrameWriter's
partitionBy only applies when writing out).

You can avoid a shuffle if one of your datasets is small enough to
broadcast.
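
For example, a minimal broadcast-join sketch (the dataset names are
placeholders, and the small side has to fit comfortably in memory):

```
import org.apache.spark.sql.functions.broadcast

// largeDS and smallDS stand in for your two datasets
val joined = largeDS.join(broadcast(smallDS), "joinColumn")
```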

On Thu., 4 Jul. 2019, 7:34 am Mkal,  wrote:

> Please keep in mind I'm fairly new to Spark.
> I have some Spark code where I load two text files as datasets and, after
> some map and filter operations to bring the columns into a specific shape,
> I join the datasets.
>
> The join takes place on a common column (of type string).
> Is there any way to avoid the exchange/shuffle before the join?
>
> As I understand it, the idea is that if I initially hash partition the
> datasets based on the join column, then the join would only have to look
> within the same partitions to complete the join, thus avoiding a shuffle.
>
> In the RDD API, you can create a hash partitioner and use partitionBy when
> creating the RDDs. (Though I'm not sure if this is a sure way to avoid the
> shuffle on the join.) Is there any similar method for the Dataframe/Dataset
> API?
>
> I also would like to avoid repartition,repartitionByRange and bucketing
> techniques since i only intend to do one join and these also require
> shuffling beforehand.
>
>
>
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Learning Spark

2019-07-05 Thread Chris Teoh
Scala is better suited to data engineering work. It also has better
integration with other components like HBase, Kafka, etc.

Python is great for data scientists as there are more data science
libraries available in Python.

On Fri., 5 Jul. 2019, 7:40 pm Vikas Garg,  wrote:

> Is there any disadvantage of using Python? I have gone through multiple
> articles which say that Python has advantages over Scala.
>
> Scala is super fast in comparison but Python has more pre-built libraries
> and options for analytics.
>
> Still should I go with Scala?
>
> On Fri, 5 Jul 2019 at 13:07, Kurt Fehlhauer  wrote:
>
>> Since you are a data engineer I would start by learning Scala. The parts
>> of Scala you would need to learn are pretty basic. Start with the examples
>> on the Spark website, which gives examples in multiple languages. Think of
>> Scala as a typed version of Python. You will find that the error messages
>> tend to be much more meaningful in Scala because that is the native
>> language of Spark. If you don’t want to install the JVM and Scala, I
>> highly recommend Databricks community edition as a place to start.
>>
>> On Thu, Jul 4, 2019 at 11:22 PM Vikas Garg  wrote:
>>
>>> I am currently working as a data engineer on Power BI and
>>> SSIS (an ETL tool). For learning purposes, I have set up PySpark and am
>>> also able to run queries through Spark on a multi-node cluster DB (I am
>>> using Vertica DB and will later move to HDFS or SQL Server).
>>>
>>> I have good knowledge of Python also.
>>>
>>> On Fri, 5 Jul 2019 at 10:32, Kurt Fehlhauer  wrote:
>>>
 Are you a data scientist or data engineer?


 On Thu, Jul 4, 2019 at 10:34 PM Vikas Garg  wrote:

> Hi,
>
> I am a new Spark learner. Can someone guide me on a strategy for
> gaining expertise in PySpark?
>
> Thanks!!!
>



Re: Map side join without broadcast

2019-07-01 Thread Chris Teoh
Hey there,

I think explicitly computing and assigning partition ids overcomplicates
things, since hash partitioning is already the default behaviour of the
partitioner in Spark. You could simply do a partitionBy and it would
use the hash partitioner by default.

Let me know if I've misinterpreted the code. Note also that using map after
partitioning will cause Spark to lose the partitioner.
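
For example, a sketch along these lines (reusing the names from your snippet;
IndexIterator would need to accept the (id, word) pairs) keeps the default
hash partitioning and preserves the partitioner for the join:

```
import org.apache.spark.HashPartitioner

// key by the word id and let the default HashPartitioner place it (no manual partition ids)
val wordsOnOnePartition = input
  .map(word => word.id -> word)
  .partitionBy(new HashPartitioner(numPartitions))

// preservesPartitioning keeps the partitioner metadata; a plain map would drop it
val indices = wordsOnOnePartition
  .mapPartitions(it => new IndexIterator(it, m), preservesPartitioning = true)
  .cache()
```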

On Sun, 30 Jun 2019 at 20:56, jelmer  wrote:

> Does something like the code below make any sense or would there be a more
> efficient way to do it ?
>
> val wordsOnOnePartition = input
>>   .map { word => Math.abs(word.id.hashCode) % numPartitions -> word }
>>   .partitionBy(new PartitionIdPassthrough(numPartitions))
>> val indices = wordsOnOnePartition
>> .mapPartitions(it => new IndexIterator(it, m))
>> .cache()
>> val wordsOnEachPartition = input
>>   .flatMap(word => 0 until numPartitions map { partition => partition
>> -> word } )
>>   .partitionBy(new PartitionIdPassthrough(numPartitions))
>> val nearest = indices.join(wordsOnEachPartition)
>>   .flatMap { case (_, (index, Word(word, vector))) =>
>> index.findNearest(vector, k + 1).collect {
>>   case SearchResult(Word(relatedWord, _), score) if relatedWord
>> != word =>
>> RelatedItem(word, relatedWord, score)
>> }
>> .take(k)
>>   }
>> val result = nearest.groupBy(_.word).map { case (word, relatedItems)
>> =>
>> word +: relatedItems.toSeq
>> .sortBy(_.similarity)(Ordering[Double].reverse)
>> .map(_.relatedWord)
>> .take(k)
>> .mkString("\t")
>> }
>>
>
> I manually assign a partition to each word of a list of words, and
> repartition the rdd by this partition key
>
> There I use mapPartitions to construct a partial index, so I end up with
> one index in each partition.
>
> Then I read the words again, but this time assign every partition to each
> word and join it on the indices RDD by partition key. So effectively every
> index will be queried.
>
> Finally I merge the results from each index into a single list, keeping
> only the most relevant items by doing a groupBy.
>
>
>
> On Sun, 30 Jun 2019 at 01:45, Chris Teoh  wrote:
>
>> The closest thing I can think of here is if you have both dataframes
>> written out using buckets. Hive uses this technique for join optimisation
>> such that both datasets of the same bucket are read by the same mapper to
>> achieve map side joins.
>>
>> On Sat., 29 Jun. 2019, 9:10 pm jelmer,  wrote:
>>
>>> I have 2 dataframes,
>>>
>>> Dataframe A which contains 1 element per partition that is gigabytes big
>>> (an index)
>>>
>>> Dataframe B which is made up out of millions of small rows.
>>>
>>> I want to join B on A but i want all the work to be done on the
>>> executors holding the partitions of dataframe A
>>>
>>> Is there a way to accomplish this without putting dataframe B in a
>>> broadcast variable or doing a broadcast join ?
>>>
>>>

-- 
Chris


Re: Implementing Upsert logic Through Streaming

2019-07-01 Thread Chris Teoh
Use a windowing function to get the "latest" version of the records from
your incoming dataset and then update Oracle with the values, presumably
via a JDBC connector.
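
For example, a rough sketch using the EmpId/salary/timestamp data from the
thread (the JDBC settings are placeholders, and a true upsert into Oracle
still needs a MERGE or a staging table on the database side):

```
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// df: DataFrame of the incoming records with columns EmpId, salary, timestamp
val w = Window.partitionBy("EmpId").orderBy(col("timestamp").desc)

// keep only the newest record per EmpId from the incoming batch
val latest = df.withColumn("rn", row_number().over(w))
  .filter(col("rn") === 1)
  .drop("rn")

latest.write
  .format("jdbc")
  .option("url", "jdbc:oracle:thin:@//dbhost:1521/service")  // placeholder
  .option("dbtable", "EMP_STAGING")                          // placeholder
  .option("user", "user")
  .option("password", "password")
  .mode("append")
  .save()
```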

I hope that helps.

On Mon, 1 Jul 2019 at 14:04, Sachit Murarka  wrote:

> Hi Chris,
>
> I have to make sure my DB has updated value for any record at a given
> point of time.
> Say following is data. I have to take 4th row for EmpId 2.
> Also if any Emp details are already there in Oracle.  I have to update it
> with latest value in the stream.
>
> EmpId,  salary,  timestamp
> 1, 1000 , 1234
> 2, 2000, 2234
> 3, 2000,3234
> 2, 2100,4234
>
> Thanks
> Sachit
>
> On Mon, 1 Jul 2019, 01:46 Chris Teoh,  wrote:
>
>> Just thinking on this, if your needs can be addressed using batch instead
>> of streaming, I think this is a viable solution. Using a lambda
>> architecture approach seems like a possible solution.
>>
>> On Sun., 30 Jun. 2019, 9:54 am Chris Teoh,  wrote:
>>
>>> Not sure what your needs are here.
>>>
>>> If you can afford to wait, increase your micro batch windows to a long
>>> period of time, aggregate your data by key every micro batch and then apply
>>> those changes to the Oracle database.
>>>
>>> Since you're using text file to stream, there's no way to pre partition
>>> your stream. If you're using Kafka, you could partition by record key and
>>> do the summarisation that way before applying the changes to Oracle.
>>>
>>> I hope that helps.
>>>
>>> On Tue., 25 Jun. 2019, 9:43 pm Sachit Murarka, 
>>> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I will get records continuously in text file form (streaming). It will
>>>> also have a timestamp field.
>>>>
>>>> Target is Oracle Database.
>>>>
>>>> My goal is to maintain the latest record for a key in Oracle. Could you
>>>> please suggest how this can be implemented efficiently?
>>>>
>>>> Kind Regards,
>>>> Sachit Murarka
>>>>
>>>

-- 
Chris


Re: Implementing Upsert logic Through Streaming

2019-06-30 Thread Chris Teoh
Just thinking on this, if your needs can be addressed using batch instead
of streaming, I think this is a viable solution. Using a lambda
architecture approach seems like a possible solution.

On Sun., 30 Jun. 2019, 9:54 am Chris Teoh,  wrote:

> Not sure what your needs are here.
>
> If you can afford to wait, increase your micro batch windows to a long
> period of time, aggregate your data by key every micro batch and then apply
> those changes to the Oracle database.
>
> Since you're using text file to stream, there's no way to pre partition
> your stream. If you're using Kafka, you could partition by record key and
> do the summarisation that way before applying the changes to Oracle.
>
> I hope that helps.
>
> On Tue., 25 Jun. 2019, 9:43 pm Sachit Murarka, 
> wrote:
>
>> Hi All,
>>
>> I will get records continuously in text file form (streaming). It will also
>> have a timestamp field.
>>
>> Target is Oracle Database.
>>
>> My goal is to maintain the latest record for a key in Oracle. Could you
>> please suggest how this can be implemented efficiently?
>>
>> Kind Regards,
>> Sachit Murarka
>>
>


Re: Implementing Upsert logic Through Streaming

2019-06-29 Thread Chris Teoh
Not sure what your needs are here.

If you can afford to wait, increase your micro batch windows to a long
period of time, aggregate your data by key every micro batch and then apply
those changes to the Oracle database.

Since you're using text file to stream, there's no way to pre partition
your stream. If you're using Kafka, you could partition by record key and
do the summarisation that way before applying the changes to Oracle.
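
As a rough illustration with a DStream of "EmpId,salary,timestamp" lines
(names are placeholders, and the actual Oracle MERGE is left as a comment):

```
// lines: DStream[String], e.g. from ssc.textFileStream(...)
val latestPerKey = lines
  .map(_.split(","))
  .map(f => (f(0), (f(1), f(2).toLong)))
  .reduceByKey((a, b) => if (a._2 >= b._2) a else b)  // keep newest timestamp per key

latestPerKey.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // open one JDBC connection per partition and MERGE each record into Oracle
    records.foreach { case (empId, (salary, ts)) => /* upsert placeholder */ }
  }
}
```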

I hope that helps.

On Tue., 25 Jun. 2019, 9:43 pm Sachit Murarka, 
wrote:

> Hi All,
>
> I will get records continuously in text file form (streaming). It will also
> have a timestamp field.
>
> Target is Oracle Database.
>
> My goal is to maintain the latest record for a key in Oracle. Could you please
> suggest how this can be implemented efficiently?
>
> Kind Regards,
> Sachit Murarka
>


Re: Map side join without broadcast

2019-06-29 Thread Chris Teoh
The closest thing I can think of here is if you have both dataframes
written out using buckets. Hive uses this technique for join optimisation
such that both datasets of the same bucket are read by the same mapper to
achieve map side joins.
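
In Spark that roughly corresponds to bucketing both sides on the join key
when writing (a sketch only; the table names are made up, both sides need
the same bucket count, and bucketBy requires saveAsTable):

```
dfA.write.bucketBy(200, "joinKey").sortBy("joinKey").saveAsTable("bucketed_a")
dfB.write.bucketBy(200, "joinKey").sortBy("joinKey").saveAsTable("bucketed_b")

// with matching buckets the join can avoid shuffling both sides
val joined = spark.table("bucketed_a").join(spark.table("bucketed_b"), "joinKey")
```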

On Sat., 29 Jun. 2019, 9:10 pm jelmer,  wrote:

> I have 2 dataframes,
>
> Dataframe A which contains 1 element per partition that is gigabytes big
> (an index)
>
> Dataframe B which is made up out of millions of small rows.
>
> I want to join B on A but i want all the work to be done on the executors
> holding the partitions of dataframe A
>
> Is there a way to accomplish this without putting dataframe B in a
> broadcast variable or doing a broadcast join ?
>
>


Re: Spark2 DataFrameWriter.saveAsTable defaults to external table if path is provided

2019-02-13 Thread Chris Teoh
Thanks Peter.

I'm not sure if that is possible yet. The closest I can think of to
achieving what you want is to try something like:-
df.registerTempTable("mytable")
sql("create table mymanagedtable as select * from mytable")

I haven't used CTAS in Spark SQL before, but I have heard it works: it should
infer the schema for you, and as far as I know CTAS creates managed tables.
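
In Spark 2 syntax that would look roughly like the sketch below
(createOrReplaceTempView is the replacement for registerTempTable, and this
assumes Hive support is enabled; I haven't verified whether the managed table
can also be given a custom path this way):

```
numbersDF.createOrReplaceTempView("mytable")
spark.sql("CREATE TABLE numbers_table3 STORED AS ORC AS SELECT * FROM mytable")

// check the table type
spark.sql("DESCRIBE FORMATTED numbers_table3")
  .filter(_.get(0).toString == "Type").show
```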

Let me know if this works for you.

Kind Regards
Chris

On Thu, 14 Feb 2019 at 03:08 Horváth Péter Gergely <
horvath.peter.gerg...@gmail.com> wrote:

> Hi Chris,
>
> Thank you for the input, I know I can always write the table DDL manually.
>
> But here I would like to rely on Spark generating the schema. What I don't
> understand is the change in the behaviour of Spark: having the storage path
> specified does not necessarily mean it should be an external table.
>
> Is there any way to control/override this?
>
> Thanks,
> Peter
>
>
> On Wed, Feb 13, 2019, 13:09 Chris Teoh 
>> Hey there,
>>
>> Could you not just create a managed table using the DDL in Spark SQL and
>> then write the data frame to the underlying folder or use Spark SQL to do
>> an insert?
>>
>> Alternatively try create table as select. Iirc hive creates managed
>> tables this way.
>>
>> I've not confirmed this works but I think that might be worth trying.
>>
>> I hope that helps.
>>
>> Kind regards
>> Chris
>>
>> On Wed., 13 Feb. 2019, 10:44 pm Horváth Péter Gergely, <
>> horvath.peter.gerg...@gmail.com> wrote:
>>
>>> Dear All,
>>>
>>> I am facing a strange issue with Spark 2.3, where I would like to create
>>> a MANAGED table out of the content of a DataFrame with the storage path
>>> overridden.
>>>
>>> Apparently, when one tries to create a Hive table via
>>> DataFrameWriter.saveAsTable, supplying a "path" option causes Spark to
>>> automatically create an external table.
>>>
>>> This demonstrates the behaviour:
>>>
>>> scala> val numbersDF = sc.parallelize((1 to 100).toList).toDF("numbers")
>>> numbersDF: org.apache.spark.sql.DataFrame = [numbers: int]
>>>
>>> scala> numbersDF.write.format("orc").saveAsTable("numbers_table1")
>>>
>>> scala> spark.sql("describe formatted
>>> numbers_table1").filter(_.get(0).toString == "Type").show
>>> +--------+---------+-------+
>>> |col_name|data_type|comment|
>>> +--------+---------+-------+
>>> |    Type|  MANAGED|       |
>>> +--------+---------+-------+
>>>
>>>
>>> scala> numbersDF.write.format("orc").option("path",
>>> "/user/foobar/numbers_table_data").saveAsTable("numbers_table2")
>>>
>>> scala> spark.sql("describe formatted
>>> numbers_table2").filter(_.get(0).toString == "Type").show
>>> +--------+---------+-------+
>>> |col_name|data_type|comment|
>>> +--------+---------+-------+
>>> |    Type| EXTERNAL|       |
>>> +--------+---------+-------+
>>>
>>>
>>>
>>> I am wondering if there is any way to force creation of a managed table
>>> with a custom path (which as far as I know, should be possible via standard
>>> Hive commands).
>>>
>>> I often seem to have the problem that I cannot find the appropriate
>>> documentation for the option configuration of Spark APIs. Could someone
>>> please point me to the right direction and tell me where these things are
>>> documented?
>>>
>>> Thanks,
>>> Peter
>>>
>>>


Re: Spark2 DataFrameWriter.saveAsTable defaults to external table if path is provided

2019-02-13 Thread Chris Teoh
Hey there,

Could you not just create a managed table using the DDL in Spark SQL and
then write the data frame to the underlying folder or use Spark SQL to do
an insert?

Alternatively try create table as select. Iirc hive creates managed tables
this way.

I've not confirmed this works but I think that might be worth trying.

I hope that helps.

Kind regards
Chris

On Wed., 13 Feb. 2019, 10:44 pm Horváth Péter Gergely, <
horvath.peter.gerg...@gmail.com> wrote:

> Dear All,
>
> I am facing a strange issue with Spark 2.3, where I would like to create a
> MANAGED table out of the content of a DataFrame with the storage path
> overridden.
>
> Apparently, when one tries to create a Hive table via
> DataFrameWriter.saveAsTable, supplying a "path" option causes Spark to
> automatically create an external table.
>
> This demonstrates the behaviour:
>
> scala> val numbersDF = sc.parallelize((1 to 100).toList).toDF("numbers")
> numbersDF: org.apache.spark.sql.DataFrame = [numbers: int]
>
> scala> numbersDF.write.format("orc").saveAsTable("numbers_table1")
>
> scala> spark.sql("describe formatted
> numbers_table1").filter(_.get(0).toString == "Type").show
> +--------+---------+-------+
> |col_name|data_type|comment|
> +--------+---------+-------+
> |    Type|  MANAGED|       |
> +--------+---------+-------+
>
>
> scala> numbersDF.write.format("orc").option("path",
> "/user/foobar/numbers_table_data").saveAsTable("numbers_table2")
>
> scala> spark.sql("describe formatted
> numbers_table2").filter(_.get(0).toString == "Type").show
> +--------+---------+-------+
> |col_name|data_type|comment|
> +--------+---------+-------+
> |    Type| EXTERNAL|       |
> +--------+---------+-------+
>
>
>
> I am wondering if there is any way to force creation of a managed table
> with a custom path (which as far as I know, should be possible via standard
> Hive commands).
>
> I often seem to have the problem that I cannot find the appropriate
> documentation for the option configuration of Spark APIs. Could someone
> please point me to the right direction and tell me where these things are
> documented?
>
> Thanks,
> Peter
>
>


Re: Convert RDD[Iterrable[MyCaseClass]] to RDD[MyCaseClass]

2018-12-01 Thread Chris Teoh
Hi James,

Try flatMap (_.toList). See below example:-

scala> case class MyClass(i:Int)
defined class MyClass

scala> val r = 1 to 100
r: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5, 6, 7,
8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26,
27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45,
46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64,
65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83,
84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)

scala> val r2 = 101 to 200
r2: scala.collection.immutable.Range.Inclusive = Range(101, 102, 103, 104,
105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119,
120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134,
135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149,
150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164,
165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179,
180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194,
195, 196, 197, 198, 199, 200)

scala> val c1 = r.map(MyClass(_)).toIterable
c1: Iterable[MyClass] = Vector(MyClass(1), MyClass(2), MyClass(3),
MyClass(4), MyClass(5), MyClass(6), MyClass(7), MyClass(8), MyClass(9),
MyClass(10), MyClass(11), MyClass(12), MyClass(13), MyClass(14),
MyClass(15), MyClass(16), MyClass(17), MyClass(18), MyClass(19),
MyClass(20), MyClass(21), MyClass(22), MyClass(23), MyClass(24),
MyClass(25), MyClass(26), MyClass(27), MyClass(28), MyClass(29),
MyClass(30), MyClass(31), MyClass(32), MyClass(33), MyClass(34),
MyClass(35), MyClass(36), MyClass(37), MyClass(38), MyClass(39),
MyClass(40), MyClass(41), MyClass(42), MyClass(43), MyClass(44),
MyClass(45), MyClass(46), MyClass(47), MyClass(48), MyClass(49),
MyClass(50), MyClass(51), MyClass(52), MyClass(53), MyClass(54),
MyClass(55), MyClass(56), MyClass(57), MyClass(58), MyClass(59), MyClass(...

scala> val c2 = r2.map(MyClass(_)).toIterable
c2: Iterable[MyClass] = Vector(MyClass(101), MyClass(102), MyClass(103),
MyClass(104), MyClass(105), MyClass(106), MyClass(107), MyClass(108),
MyClass(109), MyClass(110), MyClass(111), MyClass(112), MyClass(113),
MyClass(114), MyClass(115), MyClass(116), MyClass(117), MyClass(118),
MyClass(119), MyClass(120), MyClass(121), MyClass(122), MyClass(123),
MyClass(124), MyClass(125), MyClass(126), MyClass(127), MyClass(128),
MyClass(129), MyClass(130), MyClass(131), MyClass(132), MyClass(133),
MyClass(134), MyClass(135), MyClass(136), MyClass(137), MyClass(138),
MyClass(139), MyClass(140), MyClass(141), MyClass(142), MyClass(143),
MyClass(144), MyClass(145), MyClass(146), MyClass(147), MyClass(148),
MyClass(149), MyClass(150), MyClass(151), MyClass(152), MyClass(153),
MyClass(154), MyClass(15...
scala> val rddIt = sc.parallelize(Seq(c1,c2))
rddIt: org.apache.spark.rdd.RDD[Iterable[MyClass]] =
ParallelCollectionRDD[2] at parallelize at <console>:28

scala> rddIt.flatMap(_.toList)
res4: org.apache.spark.rdd.RDD[MyClass] = MapPartitionsRDD[3] at flatMap at
<console>:26

res4 is what you're looking for.


On Sat, 1 Dec 2018 at 21:09, Chris Teoh  wrote:

> Do you have the full code example?
>
> I think this would be similar to the mapPartitions code flow, something
> like flatMap(_.toList)
>
> I haven't yet tested this out but this is how I'd first try.
>
> On Sat, 1 Dec 2018 at 01:02, James Starks 
> wrote:
>
>> When processing data, I create an instance of RDD[Iterable[MyCaseClass]]
>> and I want to convert it to RDD[MyCaseClass] so that it can be further
>> converted to dataset or dataframe with toDS() function. But I encounter a
>> problem that SparkContext can not be instantiated within SparkSession.map
>> function because it already exists, even with allowMultipleContexts set to
>> true.
>>
>> val sc = new SparkConf()
>> sc.set("spark.driver.allowMultipleContexts", "true")
>> new SparkContext(sc).parallelize(seq)
>>
>> How can I fix this?
>>
>> Thanks.
>>
>
>
> --
> Chris
>


-- 
Chris


Re: Convert RDD[Iterrable[MyCaseClass]] to RDD[MyCaseClass]

2018-12-01 Thread Chris Teoh
Do you have the full code example?

I think this would be similar to the mapPartitions code flow, something
like flatMap(_.toList)

I haven't yet tested this out but this is how I'd first try.

On Sat, 1 Dec 2018 at 01:02, James Starks 
wrote:

> When processing data, I create an instance of RDD[Iterable[MyCaseClass]]
> and I want to convert it to RDD[MyCaseClass] so that it can be further
> converted to dataset or dataframe with toDS() function. But I encounter a
> problem that SparkContext can not be instantiated within SparkSession.map
> function because it already exists, even with allowMultipleContexts set to
> true.
>
> val sc = new SparkConf()
> sc.set("spark.driver.allowMultipleContexts", "true")
> new SparkContext(sc).parallelize(seq)
>
> How can I fix this?
>
> Thanks.
>


-- 
Chris


Re: No auto decompress in Spark Java textFile function?

2015-09-09 Thread Chris Teoh
Thanks. What I noticed was that the decompression works if the file is in
HDFS but not when it is a local file in a development environment.

Does anyone else have the same problem?
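
For the .tar.bz2 case specifically, textFile won't untar the archive for you
anywhere; one workaround sketch (shown in Scala, but JavaSparkContext has
binaryFiles too; assumes commons-compress is on the classpath and the path is
a placeholder):

```
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream
import org.apache.commons.compress.utils.IOUtils

val archives = sc.binaryFiles("/path/to/*.tar.bz2")  // placeholder path

val fileContents = archives.flatMap { case (_, portableStream) =>
  val tar = new TarArchiveInputStream(
    new BZip2CompressorInputStream(portableStream.open()))
  Iterator.continually(tar.getNextTarEntry)
    .takeWhile(_ != null)
    .filter(_.isFile)
    .map(entry => entry.getName -> new String(IOUtils.toByteArray(tar), "UTF-8"))
    .toList  // materialise per archive before the closure returns
}
```
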
On Wed, 9 Sep 2015 at 4:40 pm Akhil Das <ak...@sigmoidanalytics.com> wrote:

> textFile used to work with .gz files, i haven't tested it on bz2 files. If
> it isn't decompressing by default then what you have to do is to use the
> sc.wholeTextFiles and then decompress each record (that being file) with
> the corresponding codec.
>
> Thanks
> Best Regards
>
> On Tue, Sep 8, 2015 at 6:49 PM, Chris Teoh <chris.t...@gmail.com> wrote:
>
>> Hi Folks,
>>
>> I tried using Spark v1.2 on bz2 files in Java but the behaviour is
>> different to the same textFile API call in Python and Scala.
>>
>> That being said, how do I go about reading .tar.bz2 files in Spark's Java
>> API?
>>
>> Thanks in advance
>> Chris
>>
>
>


No auto decompress in Spark Java textFile function?

2015-09-08 Thread Chris Teoh
Hi Folks,

I tried using Spark v1.2 on bz2 files in Java but the behaviour is
different to the same textFile API call in Python and Scala.

That being said, how do I go about reading .tar.bz2 files in Spark's Java
API?

Thanks in advance
Chris