Union of RDDs Hung

2017-12-12 Thread Vikash Pareek
Hi All,

I am unioning 2 RDDs (each of them having 2 records), but the union is hanging.
I found a workaround, which is caching both RDDs before performing the union,
but I could not figure out the root cause of the job hanging.
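
For reference, here is a minimal sketch of the workaround (placeholder data and
app name, not my actual job):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("union-hang-check").setMaster("local[*]")
val sc = new SparkContext(conf)

val rdd1 = sc.parallelize(Seq(("k1", 1), ("k2", 2)))
val rdd2 = sc.parallelize(Seq(("k3", 3), ("k4", 4)))

// The workaround: materialize both inputs before the union.
rdd1.cache().count()
rdd2.cache().count()

val unioned = rdd1.union(rdd2)
println(unioned.collect().mkString(", "))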

Does anybody know why this happens with union?

The Spark version I am using is 1.6.1.


Best Regards,
Vikash Pareek






Re: Joining streaming data with static table data.

2017-12-11 Thread Vikash Pareek
Hi Satyajit,

For the query/join part, there are a couple of approaches:
1. Create a DataFrame from each incoming streaming batch (which is actually an
RDD) and join it with your reference data (coming from the existing table); a
rough sketch of this is shown below.
2. Use Structured Streaming, which basically carries the schema in every batch
(you can think of it as a stream of DataFrames).

While joining with reference data: if it is static, load it once and persist
it; if it is dynamic, keep refreshing it at a regular interval.
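
A rough, untested sketch of approach 1 on the Spark 1.x API (the reference
path, socket source, and column names are placeholders, not your pipeline):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("stream-static-join-sketch").setMaster("local[2]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))
val sqlContext = SQLContext.getOrCreate(sc)
import sqlContext.implicits._

// Static reference data: load once and cache it (placeholder path and schema).
val refDf = sqlContext.read.parquet("hdfs:///data/reference").cache()

// Incoming stream of (key, value) pairs; replace the socket source with your Kafka source.
val eventStream = ssc.socketTextStream("localhost", 9999).map(line => (line.split(",")(0), line))

eventStream.foreachRDD { rdd =>
  val batchDf = rdd.toDF("key", "value")        // DataFrame built from the micro-batch RDD
  batchDf.join(refDf, Seq("key")).show()        // join against the cached reference data
}

ssc.start()
ssc.awaitTermination()

The key point is that the reference DataFrame is read and cached once, while
the join itself runs once per micro-batch.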


Best Regards,
Vikash Pareek










How to avoid creating meta files (.crc files)

2017-10-09 Thread Vikash Pareek
Hi Users,

Is there any way to avoid the creation of .crc files when writing an RDD with
the saveAsTextFile method?

My use case: I have mounted S3 on the local file system using S3FS and am
saving an RDD to the mount point. Looking at S3, I found one .crc file for each
part file and even for the _SUCCESS file.

Thanks in advance.
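
One approach I am considering (an untested sketch from a spark-shell, where sc
is predefined and the mount path is a placeholder) is to disable checksum
writing on the target FileSystem before saving:

import java.net.URI
import org.apache.hadoop.fs.FileSystem

val outputPath = "file:///mnt/s3fs/output"                       // placeholder mount path
val fs = FileSystem.get(new URI(outputPath), sc.hadoopConfiguration)
fs.setWriteChecksum(false)                                       // stop .crc side files from the checksummed local FS

val rdd = sc.parallelize(Seq("one", "two", "three"))
rdd.saveAsTextFile(outputPath)

Since saveAsTextFile writes from the executors, this setting may also need to
be applied on the executor side rather than only on the driver.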
 






Re: Spark ignores --master local[*]

2017-09-12 Thread Vikash Pareek
Your VM might not have more than one core available to run the Spark job.
Check with the *nproc* command to see how many cores are available on the VM,
and with the *top* command to see how many cores are free.
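
A couple of quick checks from inside the spark-shell (illustrative only; sc is
predefined there):

println(Runtime.getRuntime.availableProcessors())  // cores the JVM can actually see
println(sc.master)                                 // the master the running context got
println(sc.defaultParallelism)                     // parallelism Spark derived from it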






Re: How does spark work?

2017-09-12 Thread Vikash Pareek
Obviously, you can't store 900 GB of data in 80 GB of memory.
There is a concept in Spark called disk spill: when your data grows and can't
fit into memory, it is spilled out to disk.

Also, Spark doesn't use the whole memory for storing data; some fraction of the
memory is used for processing, shuffling, and internal data structures as well.
For more detail, you can have a look at
https://0x0fff.com/spark-memory-management/
  

Hope this will help you.
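
As an illustration only (spark-shell, sc predefined, placeholder input path),
this is how you would let a cached dataset spill to disk when it does not fit
in memory:

import org.apache.spark.storage.StorageLevel

val bigRdd = sc.textFile("hdfs:///data/big-input")     // placeholder input
bigRdd.persist(StorageLevel.MEMORY_AND_DISK)           // keep what fits in memory, spill the rest to disk
println(bigRdd.count())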









Unable to save an RDd on S3 with SSE-KMS encryption

2017-09-12 Thread Vikash Pareek
I am trying to save an RDD to S3 with server-side encryption using a KMS key
(SSE-KMS), but I am getting the following exception:

Exception in thread "main"
com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS
Service: Amazon S3, AWS Request ID: 695E32175EBA568A, AWS Error Code:
InvalidArgument, AWS Error Message: The encryption method specified is not
supported, S3 Extended Request ID:
Pi+HFLg0WsAWtkdI2S/xViOcRPMCi7zdHiaO5n1f7tiwpJe2z0lPY1C2Cr53PnnUCj3358Gx3AQ=

Following is my test code that writes an RDD to S3 using SSE-KMS encryption:
val sparkConf = new SparkConf().
  setMaster("local[*]").
  setAppName("aws-encryption")
val sc = new SparkContext(sparkConf)

sc.hadoopConfiguration.set("fs.s3a.access.key", AWS_ACCESS_KEY)
sc.hadoopConfiguration.set("fs.s3a.secret.key", AWS_SECRET_KEY)
sc.hadoopConfiguration.setBoolean("fs.s3a.sse.enabled", true)
sc.hadoopConfiguration.set("fs.s3a.server-side-encryption-algorithm", "SSE-KMS")
sc.hadoopConfiguration.set("fs.s3a.sse.kms.keyId", KMS_ID)

val s3a = new org.apache.hadoop.fs.s3a.S3AFileSystem
val s3aName = s3a.getClass.getName
sc.hadoopConfiguration.set("fs.s3a.impl", s3aName)

val rdd = sc.parallelize(Seq("one", "two", "three", "four"))
println("rdd is: " + rdd.collect())
rdd.saveAsTextFile(s"s3a://$bucket/$objKey")

I am able to write the RDD to S3 with AES256 encryption, but it fails with
SSE-KMS.
Does Spark/Hadoop expect a different value for KMS-key encryption instead of
"SSE-KMS", or does it simply not support SSE-KMS encryption on AWS S3?

I found in the official Hadoop documentation that it only supports AES256 as of
now:
<property>
  <name>fs.s3n.server-side-encryption-algorithm</name>
  <description>
    Specify a server-side encryption algorithm for S3.
    The default is NULL, and the only other currently allowable value is AES256.
  </description>
</property>

Can anyone please suggest what I am missing here or doing wrong?

My environment details are as follows:
spark: 1.6.1
hadoop: 2.6.0
aws-java-sdk: 1.7.4

Thank you in advance.







Re: How does HashPartitioner distribute data in Spark?

2017-06-24 Thread Vikash Pareek
Hi Vadim,

Thank you for your response.

I would like to know how the partitioner chooses the key. If we look at my
example, the following questions arise:
1. In the case of rdd1, hash partitioning should calculate the hashcode of the
key (i.e. *"aa"* in this case), so shouldn't *all records go to a single
partition* instead of being uniformly distributed?
2. In the case of rdd2, there is no key-value pair, so how is hash partitioning
going to work, i.e. *what is the key* used to calculate the hashcode?
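
To make question 1 concrete, this is what a plain hashCode-based placement
would give for my key (an illustrative snippet, not Spark's actual code path):

val numPartitions = 8
val key = "aa"
val partition = math.abs(key.hashCode % numPartitions)
println(s"key '$key' -> partition $partition")   // prints: key 'aa' -> partition 0, for every ("aa", _) record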



Best Regards,


[image: InfoObjects Inc.] <http://www.infoobjects.com/>
Vikash Pareek
Team Lead  *InfoObjects Inc.*
Big Data Analytics

m: +91 8800206898 a: E5, Jhalana Institutionall Area, Jaipur, Rajasthan
302004
w: www.linkedin.com/in/pvikash e: vikash.par...@infoobjects.com




On Fri, Jun 23, 2017 at 10:38 PM, Vadim Semenov <vadim.seme...@datadoghq.com
> wrote:

> This is the code that chooses the partition for a key: https://github.com/
> apache/spark/blob/master/core/src/main/scala/org/apache/
> spark/Partitioner.scala#L85-L88
>
> it's basically `math.abs(key.hashCode % numberOfPartitions)`
>
> On Fri, Jun 23, 2017 at 3:42 AM, Vikash Pareek <
> vikash.par...@infoobjects.com> wrote:
>
>> I am trying to understand how spark partitoing works.
>>
>> To understand this I have following piece of code on spark 1.6
>>
>> def countByPartition1(rdd: RDD[(String, Int)]) = {
>> rdd.mapPartitions(iter => Iterator(iter.length))
>> }
>> def countByPartition2(rdd: RDD[String]) = {
>> rdd.mapPartitions(iter => Iterator(iter.length))
>> }
>>
>> //RDDs Creation
>> val rdd1 = sc.parallelize(Array(("aa", 1), ("aa", 1), ("aa", 1),
>> ("aa",
>> 1)), 8)
>> countByPartition(rdd1).collect()
>> >> Array[Int] = Array(0, 1, 0, 1, 0, 1, 0, 1)
>>
>> val rdd2 = sc.parallelize(Array("aa", "aa", "aa", "aa"), 8)
>> countByPartition(rdd2).collect()
>> >> Array[Int] = Array(0, 1, 0, 1, 0, 1, 0, 1)
>>
>> In both the cases data is distributed uniformaly.
>> I do have following questions on the basis of above observation:
>>
>>  1. In case of rdd1, hash partitioning should calculate hashcode of key
>> (i.e. "aa" in this case), so all records should go to single partition
>> instead of uniform distribution?
>>  2. In case of rdd2, there is no key value pair so how hash partitoning
>> going to work i.e. what is the key to calculate hashcode?
>>
>> I have followed @zero323 answer but not getting answer of these.
>> https://stackoverflow.com/questions/31424396/how-does-hashpa
>> rtitioner-work
>>
>>
>>
>>
>> -
>>
>> __Vikash Pareek
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/How-does-HashPartitioner-distribute-
>> data-in-Spark-tp28785.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Re: Number Of Partitions in RDD

2017-06-23 Thread Vikash Pareek
Local mode






How does HashPartitioner distribute data in Spark?

2017-06-23 Thread Vikash Pareek
I am trying to understand how Spark partitioning works.

To understand this, I have the following piece of code on Spark 1.6:

def countByPartition1(rdd: RDD[(String, Int)]) = {
  rdd.mapPartitions(iter => Iterator(iter.length))
}
def countByPartition2(rdd: RDD[String]) = {
  rdd.mapPartitions(iter => Iterator(iter.length))
}

// RDD creation
val rdd1 = sc.parallelize(Array(("aa", 1), ("aa", 1), ("aa", 1), ("aa", 1)), 8)
countByPartition1(rdd1).collect()
>> Array[Int] = Array(0, 1, 0, 1, 0, 1, 0, 1)

val rdd2 = sc.parallelize(Array("aa", "aa", "aa", "aa"), 8)
countByPartition2(rdd2).collect()
>> Array[Int] = Array(0, 1, 0, 1, 0, 1, 0, 1)
 
In both cases the data is distributed uniformly.
I have the following questions based on the above observation:

 1. In the case of rdd1, hash partitioning should calculate the hashcode of the
key (i.e. "aa" in this case), so shouldn't all records go to a single partition
instead of being uniformly distributed?
 2. In the case of rdd2, there is no key-value pair, so how is hash partitioning
going to work, i.e. what is the key used to calculate the hashcode?

I have followed @zero323's answer but could not get the answers to these questions:
https://stackoverflow.com/questions/31424396/how-does-hashpartitioner-work







Re: Number Of Partitions in RDD

2017-06-02 Thread Vikash Pareek
Spark 1.6.1






Number Of Partitions in RDD

2017-06-01 Thread Vikash Pareek
Hi,

I am creating an RDD from a text file by specifying the number of partitions,
but it gives me a different number of partitions than the one specified.

scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 0)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[72] at textFile
at <console>:27

scala> people.getNumPartitions
res47: Int = 1

scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 1)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[50] at textFile
at <console>:27

scala> people.getNumPartitions
res36: Int = 1

scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 2)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[52] at textFile
at <console>:27

scala> people.getNumPartitions
res37: Int = 2

scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 3)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[54] at textFile
at <console>:27

scala> people.getNumPartitions
res38: Int = 3

scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 4)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[56] at textFile
at <console>:27

scala> people.getNumPartitions
res39: Int = 4

scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 5)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[58] at textFile
at <console>:27

scala> people.getNumPartitions
res40: Int = 6

scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 6)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[60] at textFile
at <console>:27

scala> people.getNumPartitions
res41: Int = 7

scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 7)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[62] at textFile
at <console>:27

scala> people.getNumPartitions
res42: Int = 8

scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 8)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[64] at textFile
at <console>:27

scala> people.getNumPartitions
res43: Int = 9

scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 9)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[66] at textFile
at <console>:27

scala> people.getNumPartitions
res44: Int = 11

scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 10)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[68] at textFile
at <console>:27

scala> people.getNumPartitions
res45: Int = 11

scala> val people = sc.textFile("file:///home/pvikash/data/test.txt", 11)
people: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[70] at textFile
at <console>:27

scala> people.getNumPartitions
res46: Int = 13

The contents of the file /home/pvikash/data/test.txt are:
"
This is a test file.
Will be used for rdd partition
"

I am trying to understand why the number of partitions changes here, and why
Spark creates empty partitions when the data is small (and could fit into a
single partition).

Any explanation would be appreciated.
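
One thing I noticed in the API (for reference; the observed value comes from
the runs above): the second argument of textFile is named minPartitions, so it
seems to be only a lower bound, with the actual split count coming from the
Hadoop InputFormat:

// minPartitions = 10, but the InputFormat's split calculation may produce more
val people = sc.textFile("file:///home/pvikash/data/test.txt", 10)
println(people.getNumPartitions)   // 11 in the run above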

--Vikash






Re: Message getting lost in Kafka + Spark Streaming

2017-06-01 Thread Vikash Pareek
Thanks Sidney for your response,

To check whether all the messages are processed, I used an accumulator and also
added a print statement for debugging.


val accum = ssc.sparkContext.accumulator(0, "Debug Accumulator")
...
val mappedDataStream = dataStream.map(_._2)
mappedDataStream.foreachRDD { rdd =>
  ...
  partition.foreach { row =>
    if (debug) println(row.mkString)
    val keyedMessage = new KeyedMessage[String,
      String](props.getProperty("outTopicUnharmonized"), null, row.toString())
    producer.send(keyedMessage)
    println("Messages sent to Kafka: " + keyedMessage.message)
    accum += 1
  }
  // hack, should be done with a flush
  Thread.sleep(1000)
  producer.close()
  print("Accumulator's value is: " + accum)

And I am seeing every message received by the stream in the output of
println("Messages sent to Kafka: " + keyedMessage.message), and the accumulator
value is also the same as the number of incoming messages.
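
As a next check (a sketch against the Kafka 0.9 "new" producer API, with a
placeholder broker list and topic, not our production code), I am thinking of
attaching a callback to every send and flushing before close, to see whether
any sends fail silently:

import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

val producerProps = new Properties()
producerProps.put("bootstrap.servers", "broker1:9092,broker2:9092")   // placeholder brokers
producerProps.put("acks", "all")
producerProps.put("retries", "5")
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](producerProps)
val record = new ProducerRecord[String, String]("my-output-topic", "test-message")   // placeholder topic
producer.send(record, new Callback {
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
    if (exception != null) println("Send failed: " + exception.getMessage)   // surfaces silent broker-side failures
  }
})
producer.flush()   // block until buffered records are actually sent
producer.close()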



Best Regards,


[image: InfoObjects Inc.] <http://www.infoobjects.com/>
Vikash Pareek
Team Lead  *InfoObjects Inc.*
Big Data Analytics

m: +91 8800206898 a: E5, Jhalana Institutionall Area, Jaipur, Rajasthan
302004
w: www.linkedin.com/in/pvikash e: vikash.par...@infoobjects.com




On Thu, Jun 1, 2017 at 11:24 AM, Sidney Feiner <sidney.fei...@startapp.com>
wrote:

> Are you sure that every message gets processed? It could be that some
> messages failed passing the decoder.
> And during the processing, are you maybe putting the events into a map?
> That way, events with the same key could override each other and that way
> you'll have less final events.
>
> -Original Message-
> From: Vikash Pareek [mailto:vikash.par...@infoobjects.com]
> Sent: Tuesday, May 30, 2017 4:00 PM
> To: user@spark.apache.org
> Subject: Message getting lost in Kafka + Spark Streaming
>
> I am facing an issue related to spark streaming with kafka, my use case is
> as
> follow:
> 1. Spark streaming(DirectStream) application reading data/messages from
> kafka topic and process it 2. On the basis of proccessed message, app will
> write proccessed message to different kafka topics for e.g. if messgese is
> harmonized then write to harmonized topic else unharmonized topic
>
> the problem is that during the streaming somehow we are lossing some
> messaged i.e all the incoming messages are not written to harmonized or
> unharmonized topics.
> for e.g. if app received 30 messages in one batch then sometime it write
> all the messges to output topics(this is expected behaviour) but sometimes
> it writes only 27 (3 messages are lost, this number can change).
>
> Versions as follow:
> Spark 1.6.0
> Kafka 0.9
>
> Kafka topics confguration is as follow:
> # of brokers: 3
> # replicxation factor: 3
> # of paritions: 3
>
> Following are the properties we are using for kafka:
> *  val props = new Properties()
>   props.put("metadata.broker.list",
> properties.getProperty("metadataBrokerList"))
>   props.put("auto.offset.reset",
> properties.getProperty("autoOffsetReset"))
>   props.put("group.id", properties.getProperty("group.id"))
>   props.put("serializer.class", "kafka.serializer.StringEncoder")
>   props.put("outTopicHarmonized",
> properties.getProperty("outletKafkaTopicHarmonized"))
>   props.put("outTopicUnharmonized",
> properties.getProperty("outletKafkaTopicUnharmonized"))
>   props.put("acks", "all");
>   props.put("retries", "5");
>   props.put("request.required.acks", "-1")
> *
> Following is the piece of code where we are writing proccessed messges to
> kafka:
> *  val schemaRdd2 = finalHarmonizedDF.toJSON
>
>   schemaRdd2.foreachPartition { partition =>
> val producerConfig = new ProducerConfig(props)
> val producer = new Producer[String, String](producerConfig)
>
> partition.foreach { row =>
>   if (debug) println(row.mkString)
>   val keyedMessage = new KeyedMessage[String,
> String](props.getProperty("outTopicHarmonized"),
> null, row.toString())
>   producer.send(keyedMessage)
>
> }
> //hack, should be done with the flush
> Thread.sleep(1000)
> producer.close()
>   }
> *
> We explicitely added sleep(1000) for testing purpose.
> But this is also not solving the problem :(
>
> Any suggestion would be appreciated.
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Message-getting-lost-in-Kafka-Spark-
> Streaming-tp28719.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Message getting lost in Kafka + Spark Streaming

2017-05-30 Thread Vikash Pareek
I am facing an issue related to Spark Streaming with Kafka. My use case is as
follows:
1. A Spark Streaming (DirectStream) application reads data/messages from a
Kafka topic and processes them.
2. On the basis of the processed message, the app writes the processed message
to different Kafka topics, e.g. if the message is harmonized it writes to the
harmonized topic, else to the unharmonized topic.

The problem is that during streaming we are somehow losing some messages, i.e.
not all of the incoming messages are written to the harmonized or unharmonized
topics.
For example, if the app receives 30 messages in one batch, sometimes it writes
all of them to the output topics (the expected behaviour), but sometimes it
writes only 27 (3 messages are lost; this number can change).
 
Versions are as follows:
Spark 1.6.0
Kafka 0.9

Kafka topic configuration is as follows:
# of brokers: 3
# replication factor: 3
# of partitions: 3
 
Following are the properties we are using for Kafka:

val props = new Properties()
props.put("metadata.broker.list", properties.getProperty("metadataBrokerList"))
props.put("auto.offset.reset", properties.getProperty("autoOffsetReset"))
props.put("group.id", properties.getProperty("group.id"))
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("outTopicHarmonized", properties.getProperty("outletKafkaTopicHarmonized"))
props.put("outTopicUnharmonized", properties.getProperty("outletKafkaTopicUnharmonized"))
props.put("acks", "all")
props.put("retries", "5")
props.put("request.required.acks", "-1")
Following is the piece of code where we write the processed messages to Kafka:

val schemaRdd2 = finalHarmonizedDF.toJSON

schemaRdd2.foreachPartition { partition =>
  val producerConfig = new ProducerConfig(props)
  val producer = new Producer[String, String](producerConfig)

  partition.foreach { row =>
    if (debug) println(row.mkString)
    val keyedMessage = new KeyedMessage[String,
      String](props.getProperty("outTopicHarmonized"), null, row.toString())
    producer.send(keyedMessage)
  }
  // hack, should be done with a flush
  Thread.sleep(1000)
  producer.close()
}

We explicitly added the sleep(1000) for testing purposes, but it does not solve
the problem either. :(
 
Any suggestion would be appreciated.






Re: Hive on Spark is not populating correct records

2017-05-04 Thread Vikash Pareek
After lots of experiments, I have figured out that it is likely a bug in
Cloudera's Hive on Spark: Hive on Spark does not produce consistent output for
aggregate functions.

Hopefully, it will be fixed in the next release.






Re: Setting spark.yarn.stagingDir in 1.6

2017-03-15 Thread Vikash Pareek
++ Sudhir

On Wed, 15 Mar 2017 at 4:06 PM, Saurav Sinha <sauravsinh...@gmail.com>
wrote:

> Hi Users,
>
>
> I am running spark job in yarn.
>
> I want to set staging directory to some other location which is by default
> hdfs://host:port/home/$User/
>
> In spark 2.0.0, it can be done by setting spark.yarn.stagingDir.
>
> But in production, we have spark 1.6. Can anyone please suggest how it can
> be done in spark 1.6.
>
> --
> Thanks and Regards,
>
> Saurav Sinha
>
> Contact: 9742879062
>
-- 

Best Regards,


[image: InfoObjects Inc.] <http://www.infoobjects.com/>
Vikash Pareek
Team Lead  *InfoObjects Inc.*
Big Data Analytics

m: +91 8800206898 a: E5, Jhalana Institutionall Area, Jaipur, Rajasthan
302004
w: www.linkedin.com/in/pvikash e: vikash.par...@infoobjects.com


Hive on Spark is not populating correct records

2016-11-24 Thread Vikash Pareek
Hi,

Not sure whether this is the right place to discuss this issue.

I am running the following Hive query multiple times, with the execution engine
set to Hive on Spark and then to Hive on MapReduce.

With Hive on Spark: the result (count) was different on every execution.
With Hive on MapReduce: the result (count) was the same on every execution.

It seems like Hive on Spark behaves differently on each execution and does not
produce the correct result.

The volume of data is as follows:
my_table1 (left): 30 million records
my_table2 (right): 85 million records

-- Thanks
Vikash







Re: When queried through hiveContext, does hive executes these queries using its execution engine (default is map-reduce), or spark just reads the data and performs those queries itself?

2016-06-08 Thread Vikash Pareek
Himanshu,

Spark doesn't use the Hive execution engine (MapReduce) to execute the query.
Spark only reads the metadata from the Hive metastore DB and executes the query
within the Spark execution engine. This metadata is used by Spark's own SQL
execution engine (which includes components such as Catalyst and Tungsten to
optimize queries) to execute the query and generate results faster than Hive
(MapReduce).

Using HiveContext means connecting to the Hive metastore DB. Thus, HiveContext
can access the Hive metadata, which includes the location of the data,
serialization and deserialization, compression codecs, columns, datatypes, etc.
Spark therefore has enough information about the Hive tables and their data to
understand the target data and execute the query over its own execution engine.

Overall, Spark replaces the MapReduce model completely with its in-memory (RDD)
computation engine.
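
As a minimal sketch of what this looks like in practice (Spark 1.x API,
assuming a Spark build with Hive support; the table name is hypothetical):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// The SQL below is planned and executed by Spark SQL (Catalyst/Tungsten);
// Hive is only consulted for the table metadata in the metastore.
val sc = new SparkContext(new SparkConf().setAppName("hive-metastore-read").setMaster("local[*]"))
val hiveContext = new HiveContext(sc)

val df = hiveContext.sql("SELECT dept, COUNT(*) AS cnt FROM my_hive_table GROUP BY dept")
df.show()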

- Vikash Pareek






Re: Number of executors change during job running

2016-05-02 Thread Vikash Pareek
Hi Bill,

You can try DirectStream and increase the number of partitions on the Kafka
topic; the input DStream will then have as many partitions as the Kafka topic,
without any re-partitioning.
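
A rough sketch of the direct approach (Spark 1.x with the spark-streaming-kafka
connector; brokers, topic, and intervals are placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("direct-stream-sketch").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(10))

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("my-topic")

// With the direct approach, each Kafka partition maps 1:1 to an RDD partition of the DStream.
val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

directStream.foreachRDD(rdd => println("partitions in this batch: " + rdd.partitions.length))
ssc.start()
ssc.awaitTermination()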

Can you please share your event timeline chart from the Spark UI? You need to
tune your configuration according to the computation, and the Spark UI will
give a deeper understanding of the problem.

Thanks!






Re: Number of executors in spark-1.6 and spark-1.5

2016-04-10 Thread Vikash Pareek
Hi Talebzadeh,

Thanks for your quick response.

>> In 1.6, how many executors do you see for each node?
I have 1 executor per node with SPARK_WORKER_INSTANCES=1.

>> In standalone mode how are you increasing the number of worker instances.
Are you starting another slave on each node?
No, I am not starting another slave on each node; I just changed *spark-env.sh*
on each slave node, i.e. set SPARK_WORKER_INSTANCES=2.





Best Regards,


Vikash Pareek
Software Developer, *InfoObjects Inc.*
m: +918800206898 a: E5, Jhalana Institutional Area, Jaipur
s: vikaspareek1991 e: vikash.par...@infoobjects.com



On Sun, Apr 10, 2016 at 3:00 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Hi,
>
> in 1.6, how many executors do you see for each node?
> in standalone mode how are you increasing the number of worker instances.
> Are you starting another slave on each node?
>
> HTH
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 10 April 2016 at 08:26, Vikash Pareek <vikash.par...@infoobjects.com>
> wrote:
>
>> Hi,
>>
>> I have upgraded 5 node spark cluster from spark-1.5 to spark-1.6 (to use
>> mapWithState function).
>> After using spark-1.6, I am getting a strange behaviour of spark, jobs are
>> not using multiple executors of different nodes at a time means there is
>> no
>> parallel processing if each node having single worker and executor.
>> I am running jobs in spark standalone mode.
>>
>> I observed following points related to this issue.
>> 1. If I run same job with spark-1.5 then this will use multiple executors
>> across different nodes at a time.
>> 2. In Spark-1.6, If I increase no of cores(spark.cores.max) then jobs are
>> running in parallel thread but within same executor.
>> 3. In Spark-1.6, If I increase no of worker instances on each node then
>> jobs
>> are running in parallel as no of workers but within same executor.
>>
>> Can anyone suggest, why spark 1.6 can not use multiple executors across
>> different node at a time for parallel processing.
>> Your suggestion will be highly appreciated.
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Number-of-executors-in-spark-1-6-and-spark-1-5-tp26733.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Number of executors in spark-1.6 and spark-1.5

2016-04-10 Thread Vikash Pareek
Hi,

I have upgraded a 5-node Spark cluster from Spark 1.5 to Spark 1.6 (to use the
mapWithState function).
Since moving to Spark 1.6, I am seeing strange behaviour: jobs are not using
executors on different nodes at the same time, which means there is no parallel
processing when each node has a single worker and executor.
I am running jobs in Spark standalone mode.

I observed the following points related to this issue:
1. If I run the same job with Spark 1.5, it uses multiple executors across
different nodes at a time.
2. In Spark 1.6, if I increase the number of cores (spark.cores.max), jobs run
in parallel threads but within the same executor.
3. In Spark 1.6, if I increase the number of worker instances on each node,
jobs run in parallel (as many as the number of workers) but within the same
executor.

Can anyone suggest why Spark 1.6 cannot use multiple executors across different
nodes at a time for parallel processing?
Your suggestion will be highly appreciated.







StackOverflow in updateStateByKey

2016-03-28 Thread Vikash Pareek
Hi,

In my use case I need to maintain historical data per key. For this I am using
updateStateByKey, in which the state is maintained as a mutable Scala
collection (ArrayBuffer). Each element in the ArrayBuffer is an incoming record.
The Spark version is 1.6.

As the number of elements (records) in the ArrayBuffer for a key increases, I
get a StackOverflowError:
16/03/28 07:31:55 ERROR scheduler.JobScheduler: Error running job streaming
job 1459150304000 ms.2
java.lang.StackOverflowError
at
scala.collection.immutable.StringOps.stripSuffix(StringOps.scala:31)
at org.apache.spark.Logging$class.logName(Logging.scala:44)
at org.apache.spark.rdd.RDD.logName(RDD.scala:74)
at org.apache.spark.Logging$class.log(Logging.scala:51)
at org.apache.spark.rdd.RDD.log(RDD.scala:74)
at org.apache.spark.Logging$class.logDebug(Logging.scala:62)
at org.apache.spark.rdd.RDD.logDebug(RDD.scala:74)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$getDependencies$1.apply(CoGroupedRDD.scala:104)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$getDependencies$1.apply(CoGroupedRDD.scala:99)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at
org.apache.spark.rdd.CoGroupedRDD.getDependencies(CoGroupedRDD.scala:99)
at
org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:226)
at
org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:224)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.dependencies(RDD.scala:224)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$getPartitions$1$$anonfun$apply$mcVI$sp$1.apply(CoGroupedRDD.scala:117)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$getPartitions$1$$anonfun$apply$mcVI$sp$1.apply(CoGroupedRDD.scala:115)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$getPartitions$1.apply$mcVI$sp(CoGroupedRDD.scala:115)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at
org.apache.spark.rdd.CoGroupedRDD.getPartitions(CoGroupedRDD.scala:113)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$getPartitions$1$$anonfun$apply$mcVI$sp$1.apply(CoGroupedRDD.scala:121)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$getPartitions$1$$anonfun$apply$mcVI$sp$1.apply(CoGroupedRDD.scala:115)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)

Following is the code snippet:

def updateState(rows: Seq[ArrayBuffer[Row]], state: Option[ArrayBuffer[Row]]) = {
  val prevState = state.getOrElse[ArrayBuffer[Row]](ArrayBuffer[Row]())

  val newState = ArrayBuffer.empty[Row]
  newState ++= prevState
  for (r <- rows) {
    newState += r(0)
  }
  Some(newState)
}

val pairedFaultStream = getPairedStream(faultStream, sqlContext)
val workingStream =
  pairedFaultStream.updateStateByKey[ArrayBuffer[Row]](updateState _).map(_._2)

I have tried the following approaches:
1. Truncating the lineage by caching and checkpointing the RDDs of
*workingStream* (see the checkpointing sketch below).
2. Using Kryo serialization.

Any suggestion will be appreciated.
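
For completeness, here is a minimal, self-contained version of the
checkpointing setup referred to in point 1 (socket source, path, and intervals
are placeholders, not my actual job):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// updateStateByKey needs a checkpoint directory on the StreamingContext so the
// state DStream is checkpointed periodically, which truncates the growing RDD
// lineage that shows up in the stack trace above.
val conf = new SparkConf().setAppName("state-checkpoint-sketch").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("hdfs:///tmp/streaming-checkpoints")

val words = ssc.socketTextStream("localhost", 9999).map(w => (w, 1))
val counts = words.updateStateByKey[Int] { (newValues: Seq[Int], state: Option[Int]) =>
  Some(state.getOrElse(0) + newValues.sum)
}
counts.checkpoint(Seconds(50))   // checkpoint interval: a multiple of the batch interval
counts.print()

ssc.start()
ssc.awaitTermination()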

- Thanks
Vikash 



