Re: Submitting insert query from beeline failing on executor server with java 11

2021-03-16 Thread Jungtaek Lim
Hmm... I read the page again, and it looks like we are in a gray area.

The Hadoop community supports JDK 11 starting from Hadoop 3.3, while we haven't
yet added Hadoop 3.3 as a dependency. It may not cause a real issue at runtime
with Hadoop 3.x, since Spark only uses part of Hadoop (the client layer), but
it's worth knowing that this combination is not officially supported by the
Hadoop community.

On Wed, Mar 17, 2021 at 6:54 AM Jungtaek Lim 
wrote:

> Hadoop 2.x doesn't support JDK 11. See Hadoop Java version compatibility
> with JDK:
>
> https://cwiki.apache.org/confluence/display/HADOOP/Hadoop+Java+Versions
>
> That said, you'll need to use Spark 3.x with Hadoop 3.1 profile to make
> Spark work with JDK 11.
>
> On Tue, Mar 16, 2021 at 10:06 PM Sean Owen  wrote:
>
>> That looks like you didn't compile with Java 11 actually. How did you try
>> to do so?
>>
>> On Tue, Mar 16, 2021, 7:50 AM kaki mahesh raja <
>> kaki.mahesh_r...@nokia.com> wrote:
>>
>>> HI All,
>>>
>>> We have compiled spark with java 11 ("11.0.9.1") and when testing the
>>> thrift
>>> server we are seeing that insert query from operator using beeline
>>> failing
>>> with the below error.
>>>
>>> {"type":"log", "level":"ERROR", "time":"2021-03-15T05:06:09.559Z",
>>> "timezone":"UTC", "log":"Uncaught exception in thread
>>> blk_1077144750_3404529@[DatanodeInfoWithStorage[10.75.47.159:1044
>>> ,DS-1678921c-3fe6-4015-9849-bd1223c23369,DISK],
>>> DatanodeInfoWithStorage[10.75.47.158:1044
>>> ,DS-0b440eb7-fa7e-4ad8-bb5a-cdc50f3e7660,DISK]]"}
>>> java.lang.NoSuchMethodError: 'sun.misc.Cleaner
>>> sun.nio.ch.DirectBuffer.cleaner()'
>>> at
>>>
>>> org.apache.hadoop.crypto.CryptoStreamUtils.freeDB(CryptoStreamUtils.java:40)
>>> ~[hadoop-common-2.10.1.jar:?]
>>> at
>>>
>>> org.apache.hadoop.crypto.CryptoInputStream.freeBuffers(CryptoInputStream.java:780)
>>> ~[hadoop-common-2.10.1.jar:?]
>>> at
>>>
>>> org.apache.hadoop.crypto.CryptoInputStream.close(CryptoInputStream.java:322)
>>> ~[hadoop-common-2.10.1.jar:?]
>>> at java.io.FilterInputStream.close(FilterInputStream.java:180)
>>> ~[?:?]
>>> at
>>> org.apache.hadoop.hdfs.DataStreamer.closeStream(DataStreamer.java:1003)
>>> ~[hadoop-hdfs-client-2.10.1.jar:?]
>>> at
>>> org.apache.hadoop.hdfs.DataStreamer.closeInternal(DataStreamer.java:845)
>>> ~[hadoop-hdfs-client-2.10.1.jar:?]
>>> at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:840)
>>> ~[hadoop-hdfs-client-2.10.1.jar:?]
>>> {"type":"log", "level":"DEBUG", "time":"2021-03-15T05:06:09.570Z",
>>> "timezone":"UTC", "log":"unwrapping token of length:54"}
>>> {"type":"log", "level":"DEBUG", "time":"2021-03-15T05:06:09.599Z",
>>> "timezone":"UTC", "log":"IPC Client (1437736861) connection to
>>> vm-10-75-47-157/10.75.47.157:8020 from cspk got value #4"}
>>>
>>> Any inputs on how to fix this issue would be helpful for us.
>>>
>>> Thanks and Regards,
>>> kaki mahesh raja
>>>
>>>
>>>
>>> --
>>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>


Re: Submitting insert query from beeline failing on executor server with java 11

2021-03-16 Thread Jungtaek Lim
Hadoop 2.x doesn't support JDK 11. See Hadoop Java version compatibility
with JDK:

https://cwiki.apache.org/confluence/display/HADOOP/Hadoop+Java+Versions

That said, you'll need to use Spark 3.x with Hadoop 3.1 profile to make
Spark work with JDK 11.
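
For reference, the NoSuchMethodError on sun.nio.ch.DirectBuffer.cleaner() is the
usual symptom of that mismatch: on JDK 9+ the method no longer returns
sun.misc.Cleaner, so Hadoop 2.x bytecode compiled against the JDK 8 signature
fails to link at runtime. A quick diagnostic sketch (not a fix) that can be
pasted into spark-shell to see which Cleaner type the running JVM exposes:

// Print the declared return type of DirectBuffer.cleaner() on this JVM.
val cleanerType = Class.forName("sun.nio.ch.DirectBuffer")
  .getMethod("cleaner")
  .getReturnType
  .getName
// JDK 8 prints "sun.misc.Cleaner"; JDK 9+ prints "jdk.internal.ref.Cleaner",
// which is why the Hadoop 2.10 call in the stack trace above fails.
println(cleanerType)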

On Tue, Mar 16, 2021 at 10:06 PM Sean Owen  wrote:

> That looks like you didn't compile with Java 11 actually. How did you try
> to do so?
>
> On Tue, Mar 16, 2021, 7:50 AM kaki mahesh raja 
> wrote:
>
>> HI All,
>>
>> We have compiled spark with java 11 ("11.0.9.1") and when testing the
>> thrift
>> server we are seeing that insert query from operator using beeline
>> failing
>> with the below error.
>>
>> {"type":"log", "level":"ERROR", "time":"2021-03-15T05:06:09.559Z",
>> "timezone":"UTC", "log":"Uncaught exception in thread
>> blk_1077144750_3404529@[DatanodeInfoWithStorage[10.75.47.159:1044
>> ,DS-1678921c-3fe6-4015-9849-bd1223c23369,DISK],
>> DatanodeInfoWithStorage[10.75.47.158:1044
>> ,DS-0b440eb7-fa7e-4ad8-bb5a-cdc50f3e7660,DISK]]"}
>> java.lang.NoSuchMethodError: 'sun.misc.Cleaner
>> sun.nio.ch.DirectBuffer.cleaner()'
>> at
>>
>> org.apache.hadoop.crypto.CryptoStreamUtils.freeDB(CryptoStreamUtils.java:40)
>> ~[hadoop-common-2.10.1.jar:?]
>> at
>>
>> org.apache.hadoop.crypto.CryptoInputStream.freeBuffers(CryptoInputStream.java:780)
>> ~[hadoop-common-2.10.1.jar:?]
>> at
>>
>> org.apache.hadoop.crypto.CryptoInputStream.close(CryptoInputStream.java:322)
>> ~[hadoop-common-2.10.1.jar:?]
>> at java.io.FilterInputStream.close(FilterInputStream.java:180)
>> ~[?:?]
>> at
>> org.apache.hadoop.hdfs.DataStreamer.closeStream(DataStreamer.java:1003)
>> ~[hadoop-hdfs-client-2.10.1.jar:?]
>> at
>> org.apache.hadoop.hdfs.DataStreamer.closeInternal(DataStreamer.java:845)
>> ~[hadoop-hdfs-client-2.10.1.jar:?]
>> at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:840)
>> ~[hadoop-hdfs-client-2.10.1.jar:?]
>> {"type":"log", "level":"DEBUG", "time":"2021-03-15T05:06:09.570Z",
>> "timezone":"UTC", "log":"unwrapping token of length:54"}
>> {"type":"log", "level":"DEBUG", "time":"2021-03-15T05:06:09.599Z",
>> "timezone":"UTC", "log":"IPC Client (1437736861) connection to
>> vm-10-75-47-157/10.75.47.157:8020 from cspk got value #4"}
>>
>> Any inputs on how to fix this issue would be helpful for us.
>>
>> Thanks and Regards,
>> kaki mahesh raja
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: How to make bucket listing faster while using S3 with wholeTextFile

2021-03-16 Thread Ben Kaylor
I'm currently just writing this in Python on a small batch machine, using
boto3's S3 client with s3.get_paginator('list_objects_v2') and
NextContinuationToken.

It is pretty fast on a single bucket, listing everything with a million keys,
but I have not extended it to the other buckets yet.

Example code

import boto3
import pandas as pd
from pandas.io.json import json_normalize

s3 = boto3.client('s3')
paginator = s3.get_paginator('list_objects_v2')
# Bucket, prefix and continuation token were elided in the original post.
pages = paginator.paginate(Bucket=..., Prefix=..., ContinuationToken=...)

# Flatten each page of the listing into a DataFrame and stack them.
df_arr = []
for page_num, page_val in enumerate(pages):
    temp_df = json_normalize(page_val)
    df_arr.append(temp_df)

final_df = pd.concat(df_arr)
ContToken = final_df.NextContinuationToken.values[-1]

I was going to run this per bucket so I can then run multiple paginations on
different buckets. Or else I'm now looking at AWS S3 Inventory as a way to
generate the keys.

It has metadata and head-object info as well, like storage class.
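
The same per-prefix idea can also be pushed onto the Spark executors so the
listing itself runs in parallel. A rough sketch, assuming the AWS Java SDK (v1)
is on the executor classpath; the bucket name and prefixes below are
hypothetical:

import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.model.ListObjectsV2Request
import scala.collection.JavaConverters._

// Hypothetical prefixes; pick them so each one lists a manageable number of keys.
val prefixes = Seq("logs/2021/01/", "logs/2021/02/", "logs/2021/03/")

val keys = sc.parallelize(prefixes, prefixes.size).flatMap { prefix =>
  val s3 = AmazonS3ClientBuilder.defaultClient()   // one client per task
  val req = new ListObjectsV2Request().withBucketName("your-bucket").withPrefix(prefix)
  var result = s3.listObjectsV2(req)
  val found = scala.collection.mutable.ArrayBuffer[String]()
  found ++= result.getObjectSummaries.asScala.map(_.getKey)
  // Follow continuation tokens until the prefix is fully listed.
  while (result.isTruncated) {
    req.setContinuationToken(result.getNextContinuationToken)
    result = s3.listObjectsV2(req)
    found ++= result.getObjectSummaries.asScala.map(_.getKey)
  }
  found
}
keys.count()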



On Tue, Mar 16, 2021, 2:40 PM brandonge...@gmail.com 
wrote:

> One other possibility that might help is using the S3 SDK to generate the
> list you want and loading groups into dfs and doing unions as the end of
> the loading/filtering.
>
>
>
> Something like
>
>
> import com.amazonaws.services.s3.AmazonS3Client
>
> import com.amazonaws.services.s3.model.ListObjectsV2Request
>
> import scala.collection.JavaConverters._
>
>
>
> val s3Client = new AmazonS3Client()
>
> val commonPrefixesToDate = s3Client.listObjectsV2(new
> ListObjectsV2Request().withBucketName("your-bucket").withPrefix("prefix/to/dates").withDelimiter("/"))
>
> # Maybe get more prefixes depending on structure
>
> 
>
> val dfs =
> commonPrefixesToDate.seq.grouped(100).toList.par.map(groupedParts =>
> spark.read.parquet(groupedParts: _*))
>
> val finalDF = dfs.seq.grouped(100).toList.par.map(dfgroup =>
> dfgroup.reduce(_ union _)).reduce(_ union _).coalesce(2000)
>
>
>
> *From: *Ben Kaylor 
> *Date: *Tuesday, March 16, 2021 at 3:23 PM
> *To: *Boris Litvak 
> *Cc: *Alchemist , User <
> user@spark.apache.org>
> *Subject: *Re: How to make bucket listing faster while using S3 with
> wholeTextFile
>
> This is very helpful Boris.
>
> I will need to re-architect a piece of my code to work with this service
> but see it as more maintainable/stable long term.
>
> I will be developing it out over the course of a few weeks so will let you
> know how it goes.
>
>
>
> On Tue, Mar 16, 2021, 2:05 AM Boris Litvak  wrote:
>
> P.S.: 3. If fast updates are required, one way would be capturing S3
> events & putting the paths/modifications dates/etc of the paths into
> DynamoDB/your DB of choice.
>
>
>
> *From:* Boris Litvak
> *Sent:* Tuesday, 16 March 2021 9:03
> *To:* Ben Kaylor ; Alchemist <
> alchemistsrivast...@gmail.com>
> *Cc:* User 
> *Subject:* RE: How to make bucket listing faster while using S3 with
> wholeTextFile
>
>
>
> Ben, I’d explore these approaches:
>
>1. To address your problem, I’d setup an inventory for the S3 bucket:
>
> https://docs.aws.amazon.com/AmazonS3/latest/userguide/storage-inventory.html.
>Then you can list the files from the inventory. Have not tried this myself.
>Note that the inventory update is done once per day, at most, and it’s
>eventually consistent.
>2. If possible, would try & make bigger files. One can’t do many
>things, such as streaming from scratch, when you have millions of files.
>
>
>
> Please tell us if it helps & how it goes.
>
>
>
> Boris
>
>
>
> *From:* Ben Kaylor 
> *Sent:* Monday, 15 March 2021 21:10
> *To:* Alchemist 
> *Cc:* User 
> *Subject:* Re: How to make bucket listing faster while using S3 with
> wholeTextFile
>
>
>
> Not sure on answer on this, but am solving similar issues. So looking for
> additional feedback on how to do this.
>
>
>
> My thoughts if unable to do via spark and S3 boto commands,  then have
> apps self report those changes. Where instead of having just mappers
> discovering the keys, you have services self reporting that a new key has
> been created or modified to a metadata service for incremental and more
> realtime updates.
>
>
>
> Would like to hear more ideas on this, thanks
>
> David
>
>
>
>
>
>
>
> On Mon, Mar 15, 2021, 11:31 AM Alchemist 
> wrote:
>
> *How to optimize s3 list S3 file using wholeTextFile()*: We are using
> wholeTextFile to read data from S3.  As per my understanding wholeTextFile
> first list files of given path.  Since we are using S3 as input source,
> then listing files in a bucket is single-threaded, the S3 API for listing
> the keys in a bucket only returns keys by chunks of 1000 per call.   Since
> we have at millions of files, we are making thousands API calls.  This
> listing make our processing very slow. How can we make listing of S3 faster?
>
>
>
> Thanks,
>
>
>
> Rachana
>
>


Re: How to make bucket listing faster while using S3 with wholeTextFile

2021-03-16 Thread brandonge...@gmail.com
One other possibility that might help is using the S3 SDK to generate the list
you want, loading groups into DataFrames, and doing unions at the end of the
loading/filtering.

Something like

import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.ListObjectsV2Request
import scala.collection.JavaConverters._

val s3Client = new AmazonS3Client()
val commonPrefixesToDate = s3Client.listObjectsV2(new ListObjectsV2Request().withBucketName("your-bucket").withPrefix("prefix/to/dates").withDelimiter("/"))
// Maybe get more prefixes depending on structure
// Common prefixes come back as bare key prefixes, so prepend the scheme and bucket.
val prefixes = commonPrefixesToDate.getCommonPrefixes.asScala.map(p => s"s3a://your-bucket/$p").toList

val dfs = prefixes.grouped(100).toList.par.map(groupedParts => spark.read.parquet(groupedParts: _*))
val finalDF = dfs.seq.grouped(100).toList.par.map(dfgroup => dfgroup.reduce(_ union _)).reduce(_ union _).coalesce(2000)

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How to make bucket listing faster while using S3 with wholeTextFile

2021-03-16 Thread Ben Kaylor
This is very helpful Boris.
I will need to re-architect a piece of my code to work with this service
but see it as more maintainable/stable long term.
I will be developing it out over the course of a few weeks so will let you
know how it goes.


On Tue, Mar 16, 2021, 2:05 AM Boris Litvak  wrote:

> P.S.: 3. If fast updates are required, one way would be capturing S3
> events & putting the paths/modifications dates/etc of the paths into
> DynamoDB/your DB of choice.
>
>
>
> *From:* Boris Litvak
> *Sent:* Tuesday, 16 March 2021 9:03
> *To:* Ben Kaylor ; Alchemist <
> alchemistsrivast...@gmail.com>
> *Cc:* User 
> *Subject:* RE: How to make bucket listing faster while using S3 with
> wholeTextFile
>
>
>
> Ben, I’d explore these approaches:
>
>1. To address your problem, I’d setup an inventory for the S3 bucket:
>
> https://docs.aws.amazon.com/AmazonS3/latest/userguide/storage-inventory.html.
>Then you can list the files from the inventory. Have not tried this myself.
>Note that the inventory update is done once per day, at most, and it’s
>eventually consistent.
>2. If possible, would try & make bigger files. One can’t do many
>things, such as streaming from scratch, when you have millions of files.
>
>
>
> Please tell us if it helps & how it goes.
>
>
>
> Boris
>
>
>
> *From:* Ben Kaylor 
> *Sent:* Monday, 15 March 2021 21:10
> *To:* Alchemist 
> *Cc:* User 
> *Subject:* Re: How to make bucket listing faster while using S3 with
> wholeTextFile
>
>
>
> Not sure on answer on this, but am solving similar issues. So looking for
> additional feedback on how to do this.
>
>
>
> My thoughts if unable to do via spark and S3 boto commands,  then have
> apps self report those changes. Where instead of having just mappers
> discovering the keys, you have services self reporting that a new key has
> been created or modified to a metadata service for incremental and more
> realtime updates.
>
>
>
> Would like to hear more ideas on this, thanks
>
> David
>
>
>
>
>
>
>
> On Mon, Mar 15, 2021, 11:31 AM Alchemist 
> wrote:
>
> *How to optimize s3 list S3 file using wholeTextFile()*: We are using
> wholeTextFile to read data from S3.  As per my understanding wholeTextFile
> first list files of given path.  Since we are using S3 as input source,
> then listing files in a bucket is single-threaded, the S3 API for listing
> the keys in a bucket only returns keys by chunks of 1000 per call.   Since
> we have at millions of files, we are making thousands API calls.  This
> listing make our processing very slow. How can we make listing of S3 faster?
>
>
>
> Thanks,
>
>
>
> Rachana
>
>


Re: How default partitioning in spark is deployed

2021-03-16 Thread Attila Zsolt Piros
Oh, sure that was the reason. You can keep using the `foreachPartition` and
get the partition ID from the `TaskContext`:

scala> import org.apache.spark.TaskContext
import org.apache.spark.TaskContext

scala> myRDD.foreachPartition( e => {  println(TaskContext.getPartitionId +
":" + e.mkString(",")) } )
0:
1:
2:Animal(1,Lion)
3:
4:Animal(2,Elephant)
5:
6:
7:Animal(3,Jaguar)
8:
9:Animal(4,Tiger)
10:
11:Animal(5,Chetah)

scala>




On Tue, Mar 16, 2021 at 2:38 PM German Schiavon 
wrote:

> Hi all,
>
> I guess you could do something like this too:
>
> [image: Captura de pantalla 2021-03-16 a las 14.35.46.png]
>
> On Tue, 16 Mar 2021 at 13:31, Renganathan Mutthiah <
> renganatha...@gmail.com> wrote:
>
>> Hi Attila,
>>
>> Thanks for looking into this!
>>
>> I actually found the issue and it turned out to be that the print
>> statements misled me. The records are indeed stored in different partitions.
>> What happened is since the foreachpartition method is run parallelly by
>> different threads, they all printed the first line almost at the same time
>> and followed by data which is also printed at almost the same time. This
>> has given an appearance that all the data is stored in a single partition.
>> When I run the below code, I can see that the objects are stored in
>> different partitions of course!
>>
>> *myRDD.mapPartitionsWithIndex( (index,itr) => { itr.foreach( e =>
>> println("Index : " +index +" " + e)); itr}, true).collect()*
>>
>> Prints the below... (index: ?  the ? is actually the partition number)
>> *Index : 9 Animal(4,Tiger) Index : 4 Animal(2,Elephant) Index : 11
>> Animal(5,Chetah) Index : 2 Animal(1,Lion) Index : 7 Animal(3,Jaguar) *
>>
>> Thanks!
>>
>> On Tue, Mar 16, 2021 at 3:06 PM Attila Zsolt Piros <
>> piros.attila.zs...@gmail.com> wrote:
>>
>>> Hi!
>>>
>>> This is weird. The code of foreachPartition
>>> 
>>>  leads
>>> to ParallelCollectionRDD
>>> 
>>>  which
>>> ends in slice
>>> ,
>>> where the most important part is the *positions* method:
>>>
>>>  def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
>>>  (0 until numSlices).iterator.map { i =>
>>> val start = ((i * length) / numSlices).toInt
>>> val end = (((i + 1) * length) / numSlices).toInt
>>> (start, end)
>>>  }
>>>  }
>>>
>>> Because of the extra ' (' you used in "*parallelize( (Array*" I thought
>>> some scala implicit might generate a Seq with one Array in it.
>>> But in that case your output would contain an Array. So this must be not
>>> the case.
>>>
>>> 1) What Spark/Scala version you are using? on what OS?
>>>
>>> 2)  Can you reproduce this issue in the spark-shell?
>>>
>>> scala> case class Animal(id:Int, name:String)
>>> defined class Animal
>>>
>>> scala> val myRDD = sc.parallelize( (Array( Animal(1, "Lion"),
>>> Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"
>>> Tiger"), Animal(5, "Chetah") ) ), 12)
>>> myRDD: org.apache.spark.rdd.RDD[Animal] = ParallelCollectionRDD[2] at
>>> parallelize at :27
>>>
>>> scala> myRDD.foreachPartition( e => { println("--");
>>> e.foreach(println) } )
>>> --
>>> --
>>> --
>>> Animal(1,Lion)
>>> --
>>> --
>>> Animal(2,Elephant)
>>> --
>>> --
>>> --
>>> Animal(3,Jaguar)
>>> --
>>> --
>>> Animal(4,Tiger)
>>> --
>>> --
>>> Animal(5,Chetah)
>>>
>>> scala> Console println myRDD.getNumPartitions
>>> 12
>>>
>>> 3) Can you please check spark-shell what happens when you paste the
>>> above method and call it like:
>>>
>>> scala> def positions(length: Long, numSlices: Int): Iterator[(Int, Int)]
>>> = {
>>>  |   (0 until numSlices).iterator.map { i =>
>>>  | val start = ((i * length) / numSlices).toInt
>>>  |   val end = (((i + 1) * length) / numSlices).toInt
>>>  |   (start, end)
>>>  |   }
>>>  | }
>>> positions: (length: Long, numSlices: Int)Iterator[(Int, Int)]
>>>
>>> scala> positions(5, 12).foreach(println)
>>> (0,0)
>>> (0,0)
>>> (0,1)
>>> (1,1)
>>> (1,2)
>>> (2,2)
>>> (2,2)
>>> (2,3)
>>> (3,3)
>>> (3,4)
>>> (4,4)
>>> (4,5)
>>>
>>> As you can see in my case the `positions` result consistent with the 
>>> `foreachPartition`
>>> and this should be deterministic.
>>>
>>> Best regards,
>>> Attila
>>>
>>>
>>> On Tue, Mar 16, 2021 at 5:34 AM Renganathan Mutthiah <
>>> renganatha...@gmail.com> wrote:
>>>
 Hi,

 I have a question with respect to default partitioning in RDD.




 *case class Animal(id:Int, name:String)   val myRDD =
 session.sparkContext.parallelize( (Array( Animal(1, 

Re: How default partitioning in spark is deployed

2021-03-16 Thread Renganathan Mutthiah
That's a very good idea, thanks for sharing German!

On Tue, Mar 16, 2021 at 7:08 PM German Schiavon 
wrote:

> Hi all,
>
> I guess you could do something like this too:
>
> [image: Captura de pantalla 2021-03-16 a las 14.35.46.png]
>
> On Tue, 16 Mar 2021 at 13:31, Renganathan Mutthiah <
> renganatha...@gmail.com> wrote:
>
>> Hi Attila,
>>
>> Thanks for looking into this!
>>
>> I actually found the issue and it turned out to be that the print
>> statements misled me. The records are indeed stored in different partitions.
>> What happened is since the foreachpartition method is run parallelly by
>> different threads, they all printed the first line almost at the same time
>> and followed by data which is also printed at almost the same time. This
>> has given an appearance that all the data is stored in a single partition.
>> When I run the below code, I can see that the objects are stored in
>> different partitions of course!
>>
>> *myRDD.mapPartitionsWithIndex( (index,itr) => { itr.foreach( e =>
>> println("Index : " +index +" " + e)); itr}, true).collect()*
>>
>> Prints the below... (index: ?  the ? is actually the partition number)
>> *Index : 9 Animal(4,Tiger) Index : 4 Animal(2,Elephant) Index : 11
>> Animal(5,Chetah) Index : 2 Animal(1,Lion) Index : 7 Animal(3,Jaguar) *
>>
>> Thanks!
>>
>> On Tue, Mar 16, 2021 at 3:06 PM Attila Zsolt Piros <
>> piros.attila.zs...@gmail.com> wrote:
>>
>>> Hi!
>>>
>>> This is weird. The code of foreachPartition
>>> 
>>>  leads
>>> to ParallelCollectionRDD
>>> 
>>>  which
>>> ends in slice
>>> ,
>>> where the most important part is the *positions* method:
>>>
>>>  def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
>>>  (0 until numSlices).iterator.map { i =>
>>> val start = ((i * length) / numSlices).toInt
>>> val end = (((i + 1) * length) / numSlices).toInt
>>> (start, end)
>>>  }
>>>  }
>>>
>>> Because of the extra ' (' you used in "*parallelize( (Array*" I thought
>>> some scala implicit might generate a Seq with one Array in it.
>>> But in that case your output would contain an Array. So this must be not
>>> the case.
>>>
>>> 1) What Spark/Scala version you are using? on what OS?
>>>
>>> 2)  Can you reproduce this issue in the spark-shell?
>>>
>>> scala> case class Animal(id:Int, name:String)
>>> defined class Animal
>>>
>>> scala> val myRDD = sc.parallelize( (Array( Animal(1, "Lion"),
>>> Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"
>>> Tiger"), Animal(5, "Chetah") ) ), 12)
>>> myRDD: org.apache.spark.rdd.RDD[Animal] = ParallelCollectionRDD[2] at
>>> parallelize at :27
>>>
>>> scala> myRDD.foreachPartition( e => { println("--");
>>> e.foreach(println) } )
>>> --
>>> --
>>> --
>>> Animal(1,Lion)
>>> --
>>> --
>>> Animal(2,Elephant)
>>> --
>>> --
>>> --
>>> Animal(3,Jaguar)
>>> --
>>> --
>>> Animal(4,Tiger)
>>> --
>>> --
>>> Animal(5,Chetah)
>>>
>>> scala> Console println myRDD.getNumPartitions
>>> 12
>>>
>>> 3) Can you please check spark-shell what happens when you paste the
>>> above method and call it like:
>>>
>>> scala> def positions(length: Long, numSlices: Int): Iterator[(Int, Int)]
>>> = {
>>>  |   (0 until numSlices).iterator.map { i =>
>>>  | val start = ((i * length) / numSlices).toInt
>>>  |   val end = (((i + 1) * length) / numSlices).toInt
>>>  |   (start, end)
>>>  |   }
>>>  | }
>>> positions: (length: Long, numSlices: Int)Iterator[(Int, Int)]
>>>
>>> scala> positions(5, 12).foreach(println)
>>> (0,0)
>>> (0,0)
>>> (0,1)
>>> (1,1)
>>> (1,2)
>>> (2,2)
>>> (2,2)
>>> (2,3)
>>> (3,3)
>>> (3,4)
>>> (4,4)
>>> (4,5)
>>>
>>> As you can see in my case the `positions` result consistent with the 
>>> `foreachPartition`
>>> and this should be deterministic.
>>>
>>> Best regards,
>>> Attila
>>>
>>>
>>> On Tue, Mar 16, 2021 at 5:34 AM Renganathan Mutthiah <
>>> renganatha...@gmail.com> wrote:
>>>
 Hi,

 I have a question with respect to default partitioning in RDD.




 *case class Animal(id:Int, name:String)   val myRDD =
 session.sparkContext.parallelize( (Array( Animal(1, "Lion"),
 Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"Tiger"), Animal(5,
 "Chetah") ) ))Console println myRDD.getNumPartitions  *

 I am running the above piece of code in my laptop which has 12 logical
 cores.
 Hence I see that there are 12 partitions created.

 My understanding is that hash partitioning is used to determine which
 

Re: How default partitioning in spark is deployed

2021-03-16 Thread German Schiavon
Hi all,

I guess you could do something like this too:

[image: Captura de pantalla 2021-03-16 a las 14.35.46.png]

On Tue, 16 Mar 2021 at 13:31, Renganathan Mutthiah 
wrote:

> Hi Attila,
>
> Thanks for looking into this!
>
> I actually found the issue and it turned out to be that the print
> statements misled me. The records are indeed stored in different partitions.
> What happened is since the foreachpartition method is run parallelly by
> different threads, they all printed the first line almost at the same time
> and followed by data which is also printed at almost the same time. This
> has given an appearance that all the data is stored in a single partition.
> When I run the below code, I can see that the objects are stored in
> different partitions of course!
>
> *myRDD.mapPartitionsWithIndex( (index,itr) => { itr.foreach( e =>
> println("Index : " +index +" " + e)); itr}, true).collect()*
>
> Prints the below... (index: ?  the ? is actually the partition number)
> *Index : 9 Animal(4,Tiger) Index : 4 Animal(2,Elephant) Index : 11
> Animal(5,Chetah) Index : 2 Animal(1,Lion) Index : 7 Animal(3,Jaguar) *
>
> Thanks!
>
> On Tue, Mar 16, 2021 at 3:06 PM Attila Zsolt Piros <
> piros.attila.zs...@gmail.com> wrote:
>
>> Hi!
>>
>> This is weird. The code of foreachPartition
>> 
>>  leads
>> to ParallelCollectionRDD
>> 
>>  which
>> ends in slice
>> ,
>> where the most important part is the *positions* method:
>>
>>  def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
>>  (0 until numSlices).iterator.map { i =>
>> val start = ((i * length) / numSlices).toInt
>> val end = (((i + 1) * length) / numSlices).toInt
>> (start, end)
>>  }
>>  }
>>
>> Because of the extra ' (' you used in "*parallelize( (Array*" I thought
>> some scala implicit might generate a Seq with one Array in it.
>> But in that case your output would contain an Array. So this must be not
>> the case.
>>
>> 1) What Spark/Scala version you are using? on what OS?
>>
>> 2)  Can you reproduce this issue in the spark-shell?
>>
>> scala> case class Animal(id:Int, name:String)
>> defined class Animal
>>
>> scala> val myRDD = sc.parallelize( (Array( Animal(1, "Lion"),
>> Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"
>> Tiger"), Animal(5, "Chetah") ) ), 12)
>> myRDD: org.apache.spark.rdd.RDD[Animal] = ParallelCollectionRDD[2] at
>> parallelize at :27
>>
>> scala> myRDD.foreachPartition( e => { println("--");
>> e.foreach(println) } )
>> --
>> --
>> --
>> Animal(1,Lion)
>> --
>> --
>> Animal(2,Elephant)
>> --
>> --
>> --
>> Animal(3,Jaguar)
>> --
>> --
>> Animal(4,Tiger)
>> --
>> --
>> Animal(5,Chetah)
>>
>> scala> Console println myRDD.getNumPartitions
>> 12
>>
>> 3) Can you please check spark-shell what happens when you paste the above
>> method and call it like:
>>
>> scala> def positions(length: Long, numSlices: Int): Iterator[(Int, Int)]
>> = {
>>  |   (0 until numSlices).iterator.map { i =>
>>  | val start = ((i * length) / numSlices).toInt
>>  |   val end = (((i + 1) * length) / numSlices).toInt
>>  |   (start, end)
>>  |   }
>>  | }
>> positions: (length: Long, numSlices: Int)Iterator[(Int, Int)]
>>
>> scala> positions(5, 12).foreach(println)
>> (0,0)
>> (0,0)
>> (0,1)
>> (1,1)
>> (1,2)
>> (2,2)
>> (2,2)
>> (2,3)
>> (3,3)
>> (3,4)
>> (4,4)
>> (4,5)
>>
>> As you can see in my case the `positions` result consistent with the 
>> `foreachPartition`
>> and this should be deterministic.
>>
>> Best regards,
>> Attila
>>
>>
>> On Tue, Mar 16, 2021 at 5:34 AM Renganathan Mutthiah <
>> renganatha...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have a question with respect to default partitioning in RDD.
>>>
>>>
>>>
>>>
>>> *case class Animal(id:Int, name:String)   val myRDD =
>>> session.sparkContext.parallelize( (Array( Animal(1, "Lion"),
>>> Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"Tiger"), Animal(5,
>>> "Chetah") ) ))Console println myRDD.getNumPartitions  *
>>>
>>> I am running the above piece of code in my laptop which has 12 logical
>>> cores.
>>> Hence I see that there are 12 partitions created.
>>>
>>> My understanding is that hash partitioning is used to determine which
>>> object needs to go to which partition. So in this case, the formula would
>>> be: hashCode() % 12
>>> But when I further examine, I see all the RDDs are put in the last
>>> partition.
>>>
>>> *myRDD.foreachPartition( e => { println("--");
>>> e.foreach(println) } )*
>>>
>>> Above code 

Re: Submitting insert query from beeline failing on executor server with java 11

2021-03-16 Thread Sean Owen
That looks like you didn't compile with Java 11 actually. How did you try
to do so?

On Tue, Mar 16, 2021, 7:50 AM kaki mahesh raja 
wrote:

> HI All,
>
> We have compiled spark with java 11 ("11.0.9.1") and when testing the
> thrift
> server we are seeing that insert query from operator using beeline failing
> with the below error.
>
> {"type":"log", "level":"ERROR", "time":"2021-03-15T05:06:09.559Z",
> "timezone":"UTC", "log":"Uncaught exception in thread
> blk_1077144750_3404529@[DatanodeInfoWithStorage[10.75.47.159:1044
> ,DS-1678921c-3fe6-4015-9849-bd1223c23369,DISK],
> DatanodeInfoWithStorage[10.75.47.158:1044
> ,DS-0b440eb7-fa7e-4ad8-bb5a-cdc50f3e7660,DISK]]"}
> java.lang.NoSuchMethodError: 'sun.misc.Cleaner
> sun.nio.ch.DirectBuffer.cleaner()'
> at
>
> org.apache.hadoop.crypto.CryptoStreamUtils.freeDB(CryptoStreamUtils.java:40)
> ~[hadoop-common-2.10.1.jar:?]
> at
>
> org.apache.hadoop.crypto.CryptoInputStream.freeBuffers(CryptoInputStream.java:780)
> ~[hadoop-common-2.10.1.jar:?]
> at
>
> org.apache.hadoop.crypto.CryptoInputStream.close(CryptoInputStream.java:322)
> ~[hadoop-common-2.10.1.jar:?]
> at java.io.FilterInputStream.close(FilterInputStream.java:180)
> ~[?:?]
> at
> org.apache.hadoop.hdfs.DataStreamer.closeStream(DataStreamer.java:1003)
> ~[hadoop-hdfs-client-2.10.1.jar:?]
> at
> org.apache.hadoop.hdfs.DataStreamer.closeInternal(DataStreamer.java:845)
> ~[hadoop-hdfs-client-2.10.1.jar:?]
> at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:840)
> ~[hadoop-hdfs-client-2.10.1.jar:?]
> {"type":"log", "level":"DEBUG", "time":"2021-03-15T05:06:09.570Z",
> "timezone":"UTC", "log":"unwrapping token of length:54"}
> {"type":"log", "level":"DEBUG", "time":"2021-03-15T05:06:09.599Z",
> "timezone":"UTC", "log":"IPC Client (1437736861) connection to
> vm-10-75-47-157/10.75.47.157:8020 from cspk got value #4"}
>
> Any inputs on how to fix this issue would be helpful for us.
>
> Thanks and Regards,
> kaki mahesh raja
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Submitting insert query from beeline failing on executor server with java 11

2021-03-16 Thread kaki mahesh raja
HI All,

We have compiled Spark with Java 11 ("11.0.9.1"), and when testing the Thrift
server we are seeing that an insert query submitted by the operator through
Beeline fails with the error below.

{"type":"log", "level":"ERROR", "time":"2021-03-15T05:06:09.559Z",
"timezone":"UTC", "log":"Uncaught exception in thread
blk_1077144750_3404529@[DatanodeInfoWithStorage[10.75.47.159:1044,DS-1678921c-3fe6-4015-9849-bd1223c23369,DISK],
DatanodeInfoWithStorage[10.75.47.158:1044,DS-0b440eb7-fa7e-4ad8-bb5a-cdc50f3e7660,DISK]]"}
java.lang.NoSuchMethodError: 'sun.misc.Cleaner
sun.nio.ch.DirectBuffer.cleaner()'
at
org.apache.hadoop.crypto.CryptoStreamUtils.freeDB(CryptoStreamUtils.java:40)
~[hadoop-common-2.10.1.jar:?]
at
org.apache.hadoop.crypto.CryptoInputStream.freeBuffers(CryptoInputStream.java:780)
~[hadoop-common-2.10.1.jar:?]
at
org.apache.hadoop.crypto.CryptoInputStream.close(CryptoInputStream.java:322)
~[hadoop-common-2.10.1.jar:?]
at java.io.FilterInputStream.close(FilterInputStream.java:180)
~[?:?]
at
org.apache.hadoop.hdfs.DataStreamer.closeStream(DataStreamer.java:1003)
~[hadoop-hdfs-client-2.10.1.jar:?]
at
org.apache.hadoop.hdfs.DataStreamer.closeInternal(DataStreamer.java:845)
~[hadoop-hdfs-client-2.10.1.jar:?]
at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:840)
~[hadoop-hdfs-client-2.10.1.jar:?]
{"type":"log", "level":"DEBUG", "time":"2021-03-15T05:06:09.570Z",
"timezone":"UTC", "log":"unwrapping token of length:54"}
{"type":"log", "level":"DEBUG", "time":"2021-03-15T05:06:09.599Z",
"timezone":"UTC", "log":"IPC Client (1437736861) connection to
vm-10-75-47-157/10.75.47.157:8020 from cspk got value #4"}

Any inputs on how to fix this issue would be helpful for us.

Thanks and Regards,
kaki mahesh raja



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How default partitioning in spark is deployed

2021-03-16 Thread Renganathan Mutthiah
Hi Attila,

Thanks for looking into this!

I actually found the issue, and it turned out that the print statements had
misled me. The records are indeed stored in different partitions.
Since the foreachPartition method is run in parallel by different threads, they
all printed the separator line at almost the same time, followed by the data,
which was also printed at almost the same time. This gave the appearance that
all the data was stored in a single partition.
When I run the code below, I can see that the objects are of course stored in
different partitions!

*myRDD.mapPartitionsWithIndex( (index,itr) => { itr.foreach( e =>
println("Index : " +index +" " + e)); itr}, true).collect()*

This prints the following (the number after "Index :" is the partition number):
*Index : 9 Animal(4,Tiger) Index : 4 Animal(2,Elephant) Index : 11
Animal(5,Chetah) Index : 2 Animal(1,Lion) Index : 7 Animal(3,Jaguar) *

Thanks!
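
Another option that avoids relying on println ordering altogether is glom(),
which turns each partition into an array that can be inspected on the driver.
A small sketch against the same myRDD:

// Collect each partition as an array together with its index and print the
// non-empty ones from the driver, so the output order is deterministic.
myRDD.glom().zipWithIndex().collect().foreach { case (part, idx) =>
  if (part.nonEmpty) println(s"Partition $idx: ${part.mkString(", ")}")
}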

On Tue, Mar 16, 2021 at 3:06 PM Attila Zsolt Piros <
piros.attila.zs...@gmail.com> wrote:

> Hi!
>
> This is weird. The code of foreachPartition
> 
>  leads
> to ParallelCollectionRDD
> 
>  which
> ends in slice
> ,
> where the most important part is the *positions* method:
>
>  def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
>  (0 until numSlices).iterator.map { i =>
> val start = ((i * length) / numSlices).toInt
> val end = (((i + 1) * length) / numSlices).toInt
> (start, end)
>  }
>  }
>
> Because of the extra ' (' you used in "*parallelize( (Array*" I thought
> some scala implicit might generate a Seq with one Array in it.
> But in that case your output would contain an Array. So this must be not
> the case.
>
> 1) What Spark/Scala version you are using? on what OS?
>
> 2)  Can you reproduce this issue in the spark-shell?
>
> scala> case class Animal(id:Int, name:String)
> defined class Animal
>
> scala> val myRDD = sc.parallelize( (Array( Animal(1, "Lion"),
> Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"
> Tiger"), Animal(5, "Chetah") ) ), 12)
> myRDD: org.apache.spark.rdd.RDD[Animal] = ParallelCollectionRDD[2] at
> parallelize at :27
>
> scala> myRDD.foreachPartition( e => { println("--");
> e.foreach(println) } )
> --
> --
> --
> Animal(1,Lion)
> --
> --
> Animal(2,Elephant)
> --
> --
> --
> Animal(3,Jaguar)
> --
> --
> Animal(4,Tiger)
> --
> --
> Animal(5,Chetah)
>
> scala> Console println myRDD.getNumPartitions
> 12
>
> 3) Can you please check spark-shell what happens when you paste the above
> method and call it like:
>
> scala> def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] =
> {
>  |   (0 until numSlices).iterator.map { i =>
>  | val start = ((i * length) / numSlices).toInt
>  |   val end = (((i + 1) * length) / numSlices).toInt
>  |   (start, end)
>  |   }
>  | }
> positions: (length: Long, numSlices: Int)Iterator[(Int, Int)]
>
> scala> positions(5, 12).foreach(println)
> (0,0)
> (0,0)
> (0,1)
> (1,1)
> (1,2)
> (2,2)
> (2,2)
> (2,3)
> (3,3)
> (3,4)
> (4,4)
> (4,5)
>
> As you can see in my case the `positions` result consistent with the 
> `foreachPartition`
> and this should be deterministic.
>
> Best regards,
> Attila
>
>
> On Tue, Mar 16, 2021 at 5:34 AM Renganathan Mutthiah <
> renganatha...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a question with respect to default partitioning in RDD.
>>
>>
>>
>>
>> *case class Animal(id:Int, name:String)   val myRDD =
>> session.sparkContext.parallelize( (Array( Animal(1, "Lion"),
>> Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"Tiger"), Animal(5,
>> "Chetah") ) ))Console println myRDD.getNumPartitions  *
>>
>> I am running the above piece of code in my laptop which has 12 logical
>> cores.
>> Hence I see that there are 12 partitions created.
>>
>> My understanding is that hash partitioning is used to determine which
>> object needs to go to which partition. So in this case, the formula would
>> be: hashCode() % 12
>> But when I further examine, I see all the RDDs are put in the last
>> partition.
>>
>> *myRDD.foreachPartition( e => { println("--"); e.foreach(println)
>> } )*
>>
>> Above code prints the below(first eleven partitions are empty and the
>> last one has all the objects. The line is separate the partition contents):
>> --
>> --
>> --
>> --
>> --
>> --
>> --
>> --
>> --
>> --
>> --
>> --
>> Animal(2,Elephant)
>> 

Re: Spark streaming giving error for version 2.4

2021-03-16 Thread Attila Zsolt Piros
Hi!

I am just guessing here (as Gabor said before we need more information /
logs):
But is it possible Renu that you just upgraded one single jar?

Best Regards,
Attila
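
If that is the case, pinning every Spark artifact to the same version usually
resolves the AbstractMethodError in ListenerBus. A build.sbt sketch (assuming an
sbt build; the equivalent applies to Maven):

// Keep all Spark modules on one version so traits like ListenerBus match at runtime.
val sparkVersion = "2.4.0"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming"            % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion
)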

On Tue, Mar 16, 2021 at 11:31 AM Gabor Somogyi 
wrote:

> Well, this is not much. Please provide driver and executor logs...
>
> G
>
>
> On Tue, Mar 16, 2021 at 6:03 AM Renu Yadav  wrote:
>
>> Hi Team,
>>
>>
>> I have upgraded my spark streaming from 2.2 to 2.4 but getting below
>> error:
>>
>>
>> spark-streaming-kafka_0-10.2.11_2.4.0
>>
>>
>> scala 2.11
>>
>>
>> Any Idea?
>>
>>
>>
>> main" java.lang.AbstractMethodError
>>
>> at
>> org.apache.spark.util.ListenerBus$class.$init$(ListenerBus.scala:34)
>>
>> at
>> org.apache.spark.streaming.scheduler.StreamingListenerBus.(StreamingListenerBus.scala:30)
>>
>> at
>> org.apache.spark.streaming.scheduler.JobScheduler.(JobScheduler.scala:57)
>>
>> at
>> org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:184)
>>
>> at
>> org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:85)
>>
>>
>> Thanks & Regards,
>>
>> Renu Yadav
>>
>>


Re: Spark streaming giving error for version 2.4

2021-03-16 Thread Gabor Somogyi
Well, this is not much. Please provide driver and executor logs...

G


On Tue, Mar 16, 2021 at 6:03 AM Renu Yadav  wrote:

> Hi Team,
>
>
> I have upgraded my spark streaming from 2.2 to 2.4 but getting below error:
>
>
> spark-streaming-kafka_0-10.2.11_2.4.0
>
>
> scala 2.11
>
>
> Any Idea?
>
>
>
> main" java.lang.AbstractMethodError
>
> at
> org.apache.spark.util.ListenerBus$class.$init$(ListenerBus.scala:34)
>
> at
> org.apache.spark.streaming.scheduler.StreamingListenerBus.(StreamingListenerBus.scala:30)
>
> at
> org.apache.spark.streaming.scheduler.JobScheduler.(JobScheduler.scala:57)
>
> at
> org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:184)
>
> at
> org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:85)
>
>
> Thanks & Regards,
>
> Renu Yadav
>
>


Re: How default partitioning in spark is deployed

2021-03-16 Thread Attila Zsolt Piros
Hi!

This is weird. The code of foreachPartition leads to ParallelCollectionRDD,
which ends in slice, where the most important part is the *positions* method:

 def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
 (0 until numSlices).iterator.map { i =>
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
(start, end)
 }
 }

Because of the extra ' (' you used in "*parallelize( (Array*", I thought some
Scala implicit might generate a Seq with one Array in it. But in that case your
output would contain an Array, so this must not be the case.

1) What Spark/Scala version you are using? on what OS?

2)  Can you reproduce this issue in the spark-shell?

scala> case class Animal(id:Int, name:String)
defined class Animal

scala> val myRDD = sc.parallelize( (Array( Animal(1, "Lion"),
Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"
Tiger"), Animal(5, "Chetah") ) ), 12)
myRDD: org.apache.spark.rdd.RDD[Animal] = ParallelCollectionRDD[2] at
parallelize at :27

scala> myRDD.foreachPartition( e => { println("--");
e.foreach(println) } )
--
--
--
Animal(1,Lion)
--
--
Animal(2,Elephant)
--
--
--
Animal(3,Jaguar)
--
--
Animal(4,Tiger)
--
--
Animal(5,Chetah)

scala> Console println myRDD.getNumPartitions
12

3) Can you please check spark-shell what happens when you paste the above
method and call it like:

scala> def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
 |   (0 until numSlices).iterator.map { i =>
 | val start = ((i * length) / numSlices).toInt
 |   val end = (((i + 1) * length) / numSlices).toInt
 |   (start, end)
 |   }
 | }
positions: (length: Long, numSlices: Int)Iterator[(Int, Int)]

scala> positions(5, 12).foreach(println)
(0,0)
(0,0)
(0,1)
(1,1)
(1,2)
(2,2)
(2,2)
(2,3)
(3,3)
(3,4)
(4,4)
(4,5)

As you can see, in my case the `positions` result is consistent with the
`foreachPartition` output, and this should be deterministic.

Best regards,
Attila
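
To make the mapping from those (start, end) ranges to the five elements
explicit, the ranges can be zipped with their slice index (a small sketch
reusing the positions method and the data from above):

// Pair each slice's (start, end) range with the elements it receives.
val data = Array(Animal(1, "Lion"), Animal(2, "Elephant"), Animal(3, "Jaguar"),
                 Animal(4, "Tiger"), Animal(5, "Chetah"))
positions(data.length, 12).zipWithIndex.foreach { case ((start, end), part) =>
  println(s"partition $part -> ${data.slice(start, end).mkString(",")}")
}
// Only partitions 2, 4, 7, 9 and 11 receive a non-empty slice, matching the
// foreachPartition output above.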


On Tue, Mar 16, 2021 at 5:34 AM Renganathan Mutthiah <
renganatha...@gmail.com> wrote:

> Hi,
>
> I have a question with respect to default partitioning in RDD.
>
>
>
>
> *case class Animal(id:Int, name:String)   val myRDD =
> session.sparkContext.parallelize( (Array( Animal(1, "Lion"),
> Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"Tiger"), Animal(5,
> "Chetah") ) ))Console println myRDD.getNumPartitions  *
>
> I am running the above piece of code in my laptop which has 12 logical
> cores.
> Hence I see that there are 12 partitions created.
>
> My understanding is that hash partitioning is used to determine which
> object needs to go to which partition. So in this case, the formula would
> be: hashCode() % 12
> But when I further examine, I see all the RDDs are put in the last
> partition.
>
> *myRDD.foreachPartition( e => { println("--"); e.foreach(println)
> } )*
>
> Above code prints the below(first eleven partitions are empty and the last
> one has all the objects. The line is separate the partition contents):
> --
> --
> --
> --
> --
> --
> --
> --
> --
> --
> --
> --
> Animal(2,Elephant)
> Animal(4,Tiger)
> Animal(3,Jaguar)
> Animal(5,Chetah)
> Animal(1,Lion)
>
> I don't know why this happens. Can you please help.
>
> Thanks!
>


Re: How default partitioning in spark is deployed

2021-03-16 Thread Renganathan Mutthiah
Hi Mich,

Thanks for your precious time looking into my query. Yes, when we increase
the number of objects, all partitions start having the data. I actually
tried to understand what happens in my particular case.

Thanks!

On Tue, Mar 16, 2021 at 2:10 PM Mich Talebzadeh 
wrote:

> Hi,
>
> Well as it appears you have 5 entries in your data and 12 cores. The
> theory is that you run multiple tasks in parallel across multiple cores
> on a desktop which applies to your case. The statistics is not there to
> give a meaningful interpretation why Spark decided to put all data in one
> partition. If an RDD has too many partitions, then task scheduling may
> take more time than the actual execution time. In summary you just do not
> have enough statistics to draw a meaningful conclusion.
>
> Try to generate 100,000 rows and run your query and look at the pattern.
>
> HTH
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 16 Mar 2021 at 04:35, Renganathan Mutthiah <
> renganatha...@gmail.com> wrote:
>
>> Hi,
>>
>> I have a question with respect to default partitioning in RDD.
>>
>>
>>
>>
>> *case class Animal(id:Int, name:String)   val myRDD =
>> session.sparkContext.parallelize( (Array( Animal(1, "Lion"),
>> Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"Tiger"), Animal(5,
>> "Chetah") ) ))Console println myRDD.getNumPartitions  *
>>
>> I am running the above piece of code in my laptop which has 12 logical
>> cores.
>> Hence I see that there are 12 partitions created.
>>
>> My understanding is that hash partitioning is used to determine which
>> object needs to go to which partition. So in this case, the formula would
>> be: hashCode() % 12
>> But when I further examine, I see all the RDDs are put in the last
>> partition.
>>
>> *myRDD.foreachPartition( e => { println("--"); e.foreach(println)
>> } )*
>>
>> Above code prints the below(first eleven partitions are empty and the
>> last one has all the objects. The line is separate the partition contents):
>> --
>> --
>> --
>> --
>> --
>> --
>> --
>> --
>> --
>> --
>> --
>> --
>> Animal(2,Elephant)
>> Animal(4,Tiger)
>> Animal(3,Jaguar)
>> Animal(5,Chetah)
>> Animal(1,Lion)
>>
>> I don't know why this happens. Can you please help.
>>
>> Thanks!
>>
>


Re: How default partitioning in spark is deployed

2021-03-16 Thread Mich Talebzadeh
Hi,

Well, as it appears, you have 5 entries in your data and 12 cores. The idea is
that you run multiple tasks in parallel across multiple cores on a desktop,
which applies to your case. The statistics are not there to give a meaningful
interpretation of why Spark decided to put all the data in one partition. If an
RDD has too many partitions, then task scheduling may take more time than the
actual execution time. In summary, you just do not have enough data to draw a
meaningful conclusion.

Try to generate 100,000 rows and run your query and look at the pattern.

HTH
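
One way to do that experiment in spark-shell (a sketch; the counts per
partition will depend on the number of slices used):

// Generate 100,000 rows and count how many land in each default partition.
val bigRDD = sc.parallelize(1 to 100000)
bigRDD.mapPartitionsWithIndex((idx, it) => Iterator((idx, it.size)))
  .collect()
  .foreach { case (idx, n) => println(s"partition $idx: $n rows") }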



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw





*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 16 Mar 2021 at 04:35, Renganathan Mutthiah 
wrote:

> Hi,
>
> I have a question with respect to default partitioning in RDD.
>
>
>
>
> *case class Animal(id:Int, name:String)   val myRDD =
> session.sparkContext.parallelize( (Array( Animal(1, "Lion"),
> Animal(2,"Elephant"), Animal(3,"Jaguar"), Animal(4,"Tiger"), Animal(5,
> "Chetah") ) ))Console println myRDD.getNumPartitions  *
>
> I am running the above piece of code in my laptop which has 12 logical
> cores.
> Hence I see that there are 12 partitions created.
>
> My understanding is that hash partitioning is used to determine which
> object needs to go to which partition. So in this case, the formula would
> be: hashCode() % 12
> But when I further examine, I see all the RDDs are put in the last
> partition.
>
> *myRDD.foreachPartition( e => { println("--"); e.foreach(println)
> } )*
>
> Above code prints the below(first eleven partitions are empty and the last
> one has all the objects. The line is separate the partition contents):
> --
> --
> --
> --
> --
> --
> --
> --
> --
> --
> --
> --
> Animal(2,Elephant)
> Animal(4,Tiger)
> Animal(3,Jaguar)
> Animal(5,Chetah)
> Animal(1,Lion)
>
> I don't know why this happens. Can you please help.
>
> Thanks!
>


RE: How to make bucket listing faster while using S3 with wholeTextFile

2021-03-16 Thread Boris Litvak
P.S.: 3. If fast updates are required, one way would be capturing S3 events and
putting the paths, modification dates, etc. into DynamoDB or your DB of choice.

From: Boris Litvak
Sent: Tuesday, 16 March 2021 9:03
To: Ben Kaylor ; Alchemist 
Cc: User 
Subject: RE: How to make bucket listing faster while using S3 with wholeTextFile

Ben, I’d explore these approaches:

  1.  To address your problem, I’d setup an inventory for the S3 bucket: 
https://docs.aws.amazon.com/AmazonS3/latest/userguide/storage-inventory.html. 
Then you can list the files from the inventory. Have not tried this myself. 
Note that the inventory update is done once per day, at most, and it’s 
eventually consistent.
  2.  If possible, would try & make bigger files. One can’t do many things, 
such as streaming from scratch, when you have millions of files.

Please tell us if it helps & how it goes.

Boris

From: Ben Kaylor <kaylor...@gmail.com>
Sent: Monday, 15 March 2021 21:10
To: Alchemist <alchemistsrivast...@gmail.com>
Cc: User <user@spark.apache.org>
Subject: Re: How to make bucket listing faster while using S3 with wholeTextFile

Not sure on answer on this, but am solving similar issues. So looking for 
additional feedback on how to do this.

My thoughts if unable to do via spark and S3 boto commands,  then have apps 
self report those changes. Where instead of having just mappers discovering the 
keys, you have services self reporting that a new key has been created or 
modified to a metadata service for incremental and more realtime updates.

Would like to hear more ideas on this, thanks
David



On Mon, Mar 15, 2021, 11:31 AM Alchemist <alchemistsrivast...@gmail.com> wrote:
How to optimize s3 list S3 file using wholeTextFile(): We are using 
wholeTextFile to read data from S3.  As per my understanding wholeTextFile 
first list files of given path.  Since we are using S3 as input source, then 
listing files in a bucket is single-threaded, the S3 API for listing the keys 
in a bucket only returns keys by chunks of 1000 per call.   Since we have at 
millions of files, we are making thousands API calls.  This listing make our 
processing very slow. How can we make listing of S3 faster?

Thanks,

Rachana


RE: How to make bucket listing faster while using S3 with wholeTextFile

2021-03-16 Thread Boris Litvak
Ben, I’d explore these approaches:

  1.  To address your problem, I’d setup an inventory for the S3 bucket: 
https://docs.aws.amazon.com/AmazonS3/latest/userguide/storage-inventory.html. 
Then you can list the files from the inventory. Have not tried this myself. 
Note that the inventory update is done once per day, at most, and it’s 
eventually consistent.
  2.  If possible, would try & make bigger files. One can’t do many things, 
such as streaming from scratch, when you have millions of files.

Please tell us if it helps & how it goes.

Boris
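
A minimal sketch of approach 1, assuming the inventory is configured with
Parquet output; the destination path and the selected columns below are
assumptions and depend on how the inventory is set up:

// Read the S3 Inventory report with Spark instead of listing the bucket.
val inventory = spark.read.parquet(
  "s3a://inventory-destination-bucket/source-bucket/daily-inventory/data/")
inventory
  .select("key", "size", "last_modified_date")
  .where("last_modified_date >= '2021-03-01'")
  .show(10, truncate = false)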

From: Ben Kaylor 
Sent: Monday, 15 March 2021 21:10
To: Alchemist 
Cc: User 
Subject: Re: How to make bucket listing faster while using S3 with wholeTextFile

Not sure on answer on this, but am solving similar issues. So looking for 
additional feedback on how to do this.

My thoughts if unable to do via spark and S3 boto commands,  then have apps 
self report those changes. Where instead of having just mappers discovering the 
keys, you have services self reporting that a new key has been created or 
modified to a metadata service for incremental and more realtime updates.

Would like to hear more ideas on this, thanks
David



On Mon, Mar 15, 2021, 11:31 AM Alchemist <alchemistsrivast...@gmail.com> wrote:
How to optimize s3 list S3 file using wholeTextFile(): We are using 
wholeTextFile to read data from S3.  As per my understanding wholeTextFile 
first list files of given path.  Since we are using S3 as input source, then 
listing files in a bucket is single-threaded, the S3 API for listing the keys 
in a bucket only returns keys by chunks of 1000 per call.   Since we have at 
millions of files, we are making thousands API calls.  This listing make our 
processing very slow. How can we make listing of S3 faster?

Thanks,

Rachana