Spark Streaming with Confluent

2017-12-13 Thread Arkadiusz Bicz
Hi,

I am trying to test Spark Streaming 2.2.0 with Confluent 3.3.0.

I get a lot of errors during compilation. This is my sbt build:

lazy val sparkstreaming = (project in file("."))
  .settings(
    name := "sparkstreaming",
    organization := "org.arek",
    version := "0.1-SNAPSHOT",
    scalaVersion := "2.11.8",
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-streaming" % "2.2.0",
      "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.2.0",
      "io.confluent" % "kafka-avro-serializer" % "3.3.0"
    )
  )


import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils
import io.confluent.kafka.serializers.KafkaAvroDecoder

object Transformation extends Serializable {

  def main(args: Array[String]) = {
    val conf = new SparkConf().setAppName("StreamingTransformation").setMaster("local[*]")
    val streamingContext = new StreamingContext(conf, Seconds(1))

    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "local:2181",
      "schema.registry.url" -> "http://local:8081",
      "auto.offset.reset" -> "smallest")

    val topicSet = Set("GEXPPROD_ROUTE")
    val messages = KafkaUtils.createDirectStream[Object, Object, KafkaAvroDecoder,
      KafkaAvroDecoder](streamingContext, kafkaParams, topicSet).map(_._2)

    // Print every decoded Avro record from each micro-batch
    messages.foreachRDD(rdd => {
      rdd.foreach(avroRecord => println(avroRecord))
    })

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}


[warn] Found version conflict(s) in library dependencies; some are
suspected to be binary incompatible:
[warn]  * io.netty:netty:3.9.9.Final is selected over {3.6.2.Final,
3.7.0.Final}
[warn]  +- org.apache.spark:spark-core_2.11:2.2.0 (depends
on 3.7.0.Final)
[warn]  +- org.apache.zookeeper:zookeeper:3.4.8   (depends
on 3.7.0.Final)
[warn]  +- org.apache.zookeeper:zookeeper:3.4.6   (depends
on 3.6.2.Final)
[warn]  +- org.apache.hadoop:hadoop-hdfs:2.6.5(depends
on 3.6.2.Final)
[warn]  * commons-net:commons-net:2.2 is selected over 3.1
[warn]  +- org.apache.spark:spark-core_2.11:2.2.0 (depends
on 3.1)
[warn]  +- org.apache.hadoop:hadoop-common:2.6.5  (depends
on 3.1)
[warn]  * com.google.guava:guava:11.0.2 is selected over {12.0.1, 16.0.1}
[warn]  +- org.apache.hadoop:hadoop-yarn-client:2.6.5 (depends
on 11.0.2)
[warn]  +- org.apache.hadoop:hadoop-yarn-api:2.6.5(depends
on 11.0.2)
[warn]  +- org.apache.hadoop:hadoop-yarn-common:2.6.5 (depends
on 11.0.2)
[warn]  +- org.apache.hadoop:hadoop-yarn-server-nodemanager:2.6.5
(depends on 11.0.2)
[warn]  +- org.apache.hadoop:hadoop-common:2.6.5  (depends
on 11.0.2)
[warn]  +- org.apache.hadoop:hadoop-yarn-server-common:2.6.5  (depends
on 11.0.2)
[warn]  +- org.apache.hadoop:hadoop-hdfs:2.6.5(depends
on 11.0.2)
[warn]  +- org.apache.curator:curator-framework:2.6.0 (depends
on 16.0.1)
[warn]  +- org.apache.curator:curator-client:2.6.0(depends
on 16.0.1)
[warn]  +- org.apache.curator:curator-recipes:2.6.0   (depends
on 16.0.1)
[warn]  +- org.htrace:htrace-core:3.0.4   (depends
on 12.0.1)
[warn] Run 'evicted' to see detailed eviction warnings
[info] Compiling 1 Scala source to
/home/adminuser/data-streaming-platform/sparkstreaming/target/scala-2.11/classes
...
[error]
/home/adminuser/data-streaming-platform/sparkstreaming/src/main/scala/com/arek/streaming/spark/Transformation.scala:6:11:
object confluent is not a member of package org.apache.spark.io
[error] import io.confluent.kafka.serializers.KafkaAvroDecoder
[error]   ^
[error]
/home/adminuser/data-streaming-platform/sparkstreaming/src/main/scala/com/arek/streaming/spark/Transformation.scala:19:66:
not found: type KafkaAvroDecoder
[error] val messages = KafkaUtils.createDirectStream[Object, Object,
KafkaAvroDecoder, KafkaAvroDecoder](streamingContext, kafkaParams,
topicSet).map(_._2)
[error]  ^
[error]
/home/adminuser/data-streaming-platform/sparkstreaming/src/main/scala/com/arek/streaming/spark/Transformation.scala:19:84:
not found: type KafkaAvroDecoder
[error] val messages = KafkaUtils.createDirectStream[Object, Object,
KafkaAvroDecoder, KafkaAvroDecoder](streamingContext, kafkaParams,
topicSet).map(_._2)
[error]


When changing the library to "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.2.0":


[warn] Found version conflict(s) in library dependencies; some are
suspected to be binary incompatible:
[warn]  * com.101tec:zkclient:0.10 is selected over 0.8
[warn]  +- io.confluent:common-utils:3.3.0(depends
on 0.10)
[warn]  +- org.apache.kafka:kafka_2.11:0.10.0.1   (depends
on 0.8)
[warn]  * io.netty:netty:3.9.9.Final is selected over {3.6.2.Final,
3.7.0.Final}
[warn]  +- org.apache.spark:spark-core_2.11:2.2.0 
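
One likely cause of the first compile error above (an assumption, not confirmed here) is that the wildcard import org.apache.spark._ makes the bare io prefix resolve to org.apache.spark.io. A minimal sketch of the workaround is to anchor the Confluent import at the root package:

import org.apache.spark._
import org.apache.spark.streaming._
// _root_ avoids the clash with org.apache.spark.io introduced by the wildcard import above
import _root_.io.confluent.kafka.serializers.KafkaAvroDecoder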

Re: is it ok to have multiple sparksession's in one spark structured streaming app?

2017-09-08 Thread Arkadiusz Bicz
You don't need multiple Spark sessions to have more than one stream
running, but from a maintenance and reliability perspective it is not a
good idea.
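
A minimal sketch of the single-session layout (broker address, topic, paths and checkpoint location are assumptions):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("single-session-streams").getOrCreate()

// Source: read from Kafka with the one session
val kafkaDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "input_topic")
  .load()

// Sink: persist the derived rows with the same session
val query = kafkaDf.selectExpr("CAST(value AS STRING) AS value")
  .writeStream
  .format("parquet")
  .option("path", "/data/mutations")
  .option("checkpointLocation", "/data/checkpoints/mutations")
  .start()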

On Thu, Sep 7, 2017 at 2:40 AM, kant kodali  wrote:

> Hi All,
>
> I am wondering if it is ok to have multiple sparksession's in one spark
> structured streaming app? Basically, I want to create 1) Spark session for
> reading from Kafka and 2) Another Spark session for storing the mutations
> of a dataframe/dataset to a persistent table as I get the mutations from
> #1?
>
> Finally, is this a common practice?
>
> Thanks,
> kant
>


Re: Cloudera 5.8.0 and spark 2.1.1

2017-05-17 Thread Arkadiusz Bicz
It works fine, but it is not supported by Cloudera.



On May 17, 2017 1:30 PM, "issues solution" 
wrote:

> Hi ,
>  it s possible to use prebuilt version of spark 2.1  inside cloudera 5.8
> where scala 2.1.0 not scala 2.1.1 and java 1.7 not java 1.8
>
> Why ?
>
> i am in corporate area and i want to test last version  of spark.
> but my probleme i dont Know if the version 2.1.1 of spark can or not work
> with this version of cloudera . i mean prebuilt version not source i don't
> have admin rights
>
> actual version of myspark it 1.6  and scala 2.1.0 and Hadoop
> 2.6.0-cdh5.8.0 and Hive 1.1.0-cdh5.8.0
>
> thx a lot
>
>
>


Support for decimal separator (comma or period) in spark 2.1

2017-02-23 Thread Arkadiusz Bicz
Hi Team,

I would like to know whether it is possible to specify the decimal locale for
the DataFrameReader for CSV.

I have CSV files from a locale where the decimal separator is a comma, e.g.
0,32 instead of the US-style 0.32.

Is there a way in the current version of Spark to specify the locale, for example:

spark.read.option("sep",";").option("header", "true").option("inferSchema",
"true").format("csv").load("nonuslocalized.csv")

If not, should I create a JIRA ticket for this? I can work on a solution if it
is not available.
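
In the meantime, a workaround sketch, assuming no locale option is available: read the affected column as a string and convert the comma to a period before casting (the column name "value" is an assumption):

import org.apache.spark.sql.functions.regexp_replace

val raw = spark.read
  .option("sep", ";")
  .option("header", "true")
  .format("csv")
  .load("nonuslocalized.csv")

// Replace the decimal comma and cast to a numeric type
val fixed = raw.withColumn("value",
  regexp_replace(raw("value"), ",", ".").cast("double"))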

Best Regards,

Arkadiusz Bicz


Re: Processing millions of messages in milliseconds -- Architecture guide required

2016-04-19 Thread Arkadiusz Bicz
Sorry, I've found one error:

If you do NOT need any relational processing of your messages (based on
historical data, or joins with other messages) and message
processing is largely independent, Kafka plus Spark Streaming could be
overkill.

On Tue, Apr 19, 2016 at 1:54 PM, Arkadiusz Bicz
<arkadiusz.b...@gmail.com> wrote:
> Requirements looks like my previous project for smart metering. We
> finally did custom solution without Spark, Hadoop and Kafka but it was
> 4 years ago when I did not have experience with this technologies (
> some not existed or were not mature).
>
> If you do need any relational processing of your messages ( basing on
> historical data, or joining with other messages) and message
> processing is quite independent Kafka plus Spark Streaming could be
> overkill.
>
> The best to check if your data has natural index like timestamp in
> metering data which come in the same frequency (every second) and
> basing on it do access to your cache and disc. For cache for me  most
> promising looks  Alluxio.
>
> BR,
> Arkadiusz Bicz
>
> On Tue, Apr 19, 2016 at 6:01 AM, Deepak Sharma <deepakmc...@gmail.com> wrote:
>> Hi all,
>> I am looking for an architecture to ingest 10 mils of messages in the micro
>> batches of seconds.
>> If anyone has worked on similar kind of architecture  , can you please point
>> me to any documentation around the same like what should be the architecture
>> , which all components/big data ecosystem tools should i consider etc.
>> The messages has to be in xml/json format , a preprocessor engine or message
>> enhancer and then finally a processor.
>> I thought about using data cache as well for serving the data
>> The data cache should have the capability to serve the historical  data in
>> milliseconds (may be upto 30 days of data)
>> --
>> Thanks
>> Deepak
>> www.bigdatabig.com
>>




Re: Processing millions of messages in milliseconds -- Architecture guide required

2016-04-19 Thread Arkadiusz Bicz
The requirements look like my previous project for smart metering. We
finally built a custom solution without Spark, Hadoop and Kafka, but that was
4 years ago, when I did not have experience with these technologies
(some did not exist or were not mature).

If you do need any relational processing of your messages (based on
historical data, or joins with other messages) and message
processing is quite independent, Kafka plus Spark Streaming could be
overkill.

The best approach is to check whether your data has a natural index, like the
timestamp in metering data that arrives at a fixed frequency (every second),
and drive access to your cache and disk from it. For caching, Alluxio looks
most promising to me.

BR,
Arkadiusz Bicz

On Tue, Apr 19, 2016 at 6:01 AM, Deepak Sharma <deepakmc...@gmail.com> wrote:
> Hi all,
> I am looking for an architecture to ingest 10 mils of messages in the micro
> batches of seconds.
> If anyone has worked on similar kind of architecture  , can you please point
> me to any documentation around the same like what should be the architecture
> , which all components/big data ecosystem tools should i consider etc.
> The messages has to be in xml/json format , a preprocessor engine or message
> enhancer and then finally a processor.
> I thought about using data cache as well for serving the data
> The data cache should have the capability to serve the historical  data in
> milliseconds (may be upto 30 days of data)
> --
> Thanks
> Deepak
> www.bigdatabig.com
>




YARN vs Standalone Spark Usage in production

2016-04-14 Thread Arkadiusz Bicz
Hello,

Are there any statistics regarding YARN vs. standalone Spark usage in
production?

I would like to choose the most supported and most widely used technology in
production for our project.


BR,

Arkadiusz Bicz




Re: How to update data saved as parquet in hdfs using Dataframes

2016-02-17 Thread Arkadiusz Bicz
Hi,

HDFS is append-only, so you need to apply modifications by reading the data
and writing the result to another location.
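
A sketch of that read-modify-write pattern (paths, column names and the key are assumptions):

import org.apache.spark.sql.functions.{lit, when}

val existing = sqlContext.read.parquet("/data/current")

// Apply the "update" while the data is in memory
val updated = existing.withColumn("status",
  when(existing("id") === "id42", lit("closed")).otherwise(existing("status")))

// Write to a new location, then point readers (or swap directories) to it
updated.write.parquet("/data/current_v2")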

On Wed, Feb 17, 2016 at 2:45 AM, SRK  wrote:
> Hi,
>
> How do I update data saved as Parquet in hdfs using dataframes? If I use
> SaveMode.Append, it just seems to append the data but does not seem to
> update if the record is already existing. Do I have to just modify it using
> Dataframes api or sql using sqlContext?
>
>
> Thanks,
> Swetha
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-update-data-saved-as-parquet-in-hdfs-using-Dataframes-tp26245.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>




Re: Memory problems and missing heartbeats

2016-02-16 Thread Arkadiusz Bicz
I had a problem similar to #2 when I used a lot of caching and then did
shuffling. It looks like when I cached too much there was not enough
space left for other Spark tasks and the job just hung.

You can try to cache less and see if it improves. Executor logs also
help a lot (watch for log entries about spill), and you can monitor the
job JVMs through Spark monitoring
(http://spark.apache.org/docs/latest/monitoring.html) together with Graphite
and Grafana.
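
A sketch of the knobs I would look at first (the values are assumptions, for Spark 1.6 unified memory management): shrink the storage share of the unified pool and cache with a level that can spill to disk.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf().setAppName("cache-tuning-sketch")
  .set("spark.memory.fraction", "0.6")        // size of the unified execution + storage pool
  .set("spark.memory.storageFraction", "0.3") // share of the pool protected for cached blocks
val sc = new SparkContext(conf)

// A storage level that evicts cached blocks to disk instead of keeping everything on-heap
val cached = sc.parallelize(1 to 1000000).persist(StorageLevel.MEMORY_AND_DISK)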

On Tue, Feb 16, 2016 at 2:14 PM, Iulian Dragoș
 wrote:
> Regarding your 2nd problem, my best guess is that you’re seeing GC pauses.
> It’s not unusual, given you’re using 40GB heaps. See for instance this blog
> post
>
> From conducting numerous tests, we have concluded that unless you are
> utilizing some off-heap technology (e.g. GridGain OffHeap), no Garbage
> Collector provided with JDK will render any kind of stable GC performance
> with heap sizes larger that 16GB. For example, on 50GB heaps we can often
> encounter up to 5 minute GC pauses, with average pauses of 2 to 4 seconds.
>
> Not sure if Yarn can do this, but I would try to run with a smaller executor
> heap, and more executors per node.
>
> iulian
>
>




Re: best practices? spark streaming writing output detecting disk full error

2016-02-12 Thread Arkadiusz Bicz
Hi,

You need good monitoring tools that send you alarms about disk, network
or application errors, but I think this is general DevOps work, not
very specific to Spark or Hadoop.

BR,

Arkadiusz Bicz
https://www.linkedin.com/in/arkadiuszbicz

On Thu, Feb 11, 2016 at 7:09 PM, Andy Davidson
<a...@santacruzintegration.com> wrote:
> We recently started a Spark/Spark Streaming POC. We wrote a simple streaming
> app in java to collect tweets. We choose twitter because we new we get a lot
> of data and probably lots of burst. Good for stress testing
>
> We spun up  a couple of small clusters using the spark-ec2 script. In one
> cluster we wrote all the tweets to HDFS in a second cluster we write all the
> tweets to S3
>
> We were surprised that our HDFS file system reached 100 % of capacity in a
> few days. This resulted with “all data nodes dead”. We where surprised
> because the actually stream app continued to run. We had no idea we had a
> problem until a day or two after the disk became full when we noticed we
> where missing a lot of data.
>
> We ran into a similar problem with our s3 cluster. We had a permission
> problem and where un able to write any data yet our stream app continued to
> run
>
>
> Spark generated mountains of logs,We are using the stand alone cluster
> manager. All the log levels wind up in the “error” log. Making it hard to
> find real errors and warnings using the web UI. Our app is written in Java
> so my guess is the write errors must be unable. I.E. We did not know in
> advance that they could occur . They are basically undocumented.
>
>
>
> We are a small shop. Running something like splunk would add a lot of
> expense and complexity for us at this stage of our growth.
>
> What are best practices
>
> Kind Regards
>
> Andy




Re: How to parallel read files in a directory

2016-02-12 Thread Arkadiusz Bicz
Hi Junjie,

From my experience, HDFS is slow at reading a large number of small files, as
every file involves a round of metadata traffic with the namenode and data nodes.
When the file size is below the HDFS default block size (usually 64 MB or 128 MB)
you cannot fully use Hadoop's optimizations for reading large amounts of
data in a streamed way.

Also, when using DataFrames there is a huge overhead from caching file
information, as described in
https://issues.apache.org/jira/browse/SPARK-11441
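
One mitigation is to compact the small files once into fewer, block-sized files; a sketch with assumed paths and an assumed partition count:

// Read the many small files once, then rewrite them as fewer large files
val small = sqlContext.read.parquet("/data/many-small-files")
small.coalesce(32).write.parquet("/data/compacted")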

BR,
Arkadiusz Bicz
https://www.linkedin.com/in/arkadiuszbicz

On Thu, Feb 11, 2016 at 7:24 PM, Jakob Odersky <ja...@odersky.com> wrote:
> Hi Junjie,
>
> How do you access the files currently? Have you considered using hdfs? It's
> designed to be distributed across a cluster and Spark has built-in support.
>
> Best,
> --Jakob
>
> On Feb 11, 2016 9:33 AM, "Junjie Qian" <qian.jun...@outlook.com> wrote:
>>
>> Hi all,
>>
>> I am working with Spark 1.6, scala and have a big dataset divided into
>> several small files.
>>
>> My question is: right now the read operation takes really long time and
>> often has RDD warnings. Is there a way I can read the files in parallel,
>> that all nodes or workers read the file at the same time?
>>
>> Many thanks
>> Junjie




Re: best practices? spark streaming writing output detecting disk full error

2016-02-12 Thread Arkadiusz Bicz
Hi Andy,

I suggest monitoring disk usage and, once it reaches 90% occupancy, sending
an alarm to your support team to resolve the problem; you should not allow
your production system to go down.

Regarding tools, you can try a stack of collectd and Spark ->
Graphite -> Grafana -> https://github.com/pabloa/grafana-alerts. I
have not used grafana-alerts, but it looks promising.
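
The Spark side of that pipeline is just the Graphite sink in conf/metrics.properties; a minimal sketch (host and port are assumptions):

*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=graphite.example.com
*.sink.graphite.port=2003
*.sink.graphite.period=10
*.sink.graphite.unit=seconds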

BR,

Arkadiusz Bicz


On Fri, Feb 12, 2016 at 4:38 PM, Andy Davidson
<a...@santacruzintegration.com> wrote:
> Hi Arkadiusz
>
> Do you have any suggestions?
>
> As an engineer I think when I get disk full errors I want the application to
> terminate. Its a lot easier for ops to really there is a problem.
>
>
> Andy
>
>
> From: Arkadiusz Bicz <arkadiusz.b...@gmail.com>
> Date: Friday, February 12, 2016 at 1:57 AM
> To: Andrew Davidson <a...@santacruzintegration.com>
> Cc: "user @spark" <user@spark.apache.org>
> Subject: Re: best practices? spark streaming writing output detecting disk
> full error
>
> Hi,
>
> You need good monitoring tools to send you alarms about disk, network
> or  applications errors, but I think it is general dev ops work not
> very specific to spark or hadoop.
>
> BR,
>
> Arkadiusz Bicz
> https://www.linkedin.com/in/arkadiuszbicz
>
> On Thu, Feb 11, 2016 at 7:09 PM, Andy Davidson
> <a...@santacruzintegration.com> wrote:
>
> We recently started a Spark/Spark Streaming POC. We wrote a simple streaming
> app in java to collect tweets. We choose twitter because we new we get a lot
> of data and probably lots of burst. Good for stress testing
>
> We spun up  a couple of small clusters using the spark-ec2 script. In one
> cluster we wrote all the tweets to HDFS in a second cluster we write all the
> tweets to S3
>
> We were surprised that our HDFS file system reached 100 % of capacity in a
> few days. This resulted with “all data nodes dead”. We where surprised
> because the actually stream app continued to run. We had no idea we had a
> problem until a day or two after the disk became full when we noticed we
> where missing a lot of data.
>
> We ran into a similar problem with our s3 cluster. We had a permission
> problem and where un able to write any data yet our stream app continued to
> run
>
>
> Spark generated mountains of logs,We are using the stand alone cluster
> manager. All the log levels wind up in the “error” log. Making it hard to
> find real errors and warnings using the web UI. Our app is written in Java
> so my guess is the write errors must be unable. I.E. We did not know in
> advance that they could occur . They are basically undocumented.
>
>
>
> We are a small shop. Running something like splunk would add a lot of
> expense and complexity for us at this stage of our growth.
>
> What are best practices
>
> Kind Regards
>
> Andy
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>




Re: Generic Dataset Aggregator

2016-01-26 Thread Arkadiusz Bicz
Hi Deenar,

You just need to encapsulate the Array in a case class (you cannot define the
case class inside the spark-shell, as it cannot be an inner class):

import com.hsbc.rsl.spark.aggregation.MinVectorAggFunction
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.TypedColumn

case class ResultSmallA(tradeId: String, tradeVersion: String, values:
Array[Double])

class AggregateResults extends Aggregator[ResultSmallA, ResultSmallA,
ResultSmallA] with Serializable {


  // The initial value.
  def zero: ResultSmallA = ResultSmallA("", "", Array[Double](0))

  // Combine an input row with the running aggregate.
  def reduce(b: ResultSmallA, a: ResultSmallA) = {
    val min = new MinVectorAggFunction()
    ResultSmallA(b.tradeId, b.tradeVersion, min.mergeArrays(a.values, b.values))
  }

  // Merge two intermediate aggregates element-wise.
  def merge(b: ResultSmallA, a: ResultSmallA) =
    ResultSmallA(b.tradeId, b.tradeVersion,
      (a.values, b.values).zipped.map { case (x, y) => x + y })

  // Return the final result.
  def finish(b: ResultSmallA) = b
}

def sumRes : TypedColumn[ResultSmallA, ResultSmallA] = new
AggregateResults().toColumn

import sqlContext.implicits._
val dsResults = Seq(ResultSmallA("1", "1", Array[Double](1.0,2.0)),
ResultSmallA("1", "1", Array[Double](1.0,2.0)) ).toDS()
dsResults.groupBy(_.tradeId).agg(sumRes)

Best Regards,

Arkadiusz Bicz
https://uk.linkedin.com/in/arkadiuszbicz

On Mon, Jan 25, 2016 at 10:36 PM, Deenar Toraskar
<deenar.toras...@gmail.com> wrote:
> Hi All
>
> https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Dataset%20Aggregator.html
>
> I have been converting my UDAFs to Dataset (Dataset's are cool BTW)
> Aggregators. I have an ArraySum aggregator that does an element wise sum or
> arrays. I have got the simple version working, but the Generic version fails
> with the following error, not sure what I am doing wrong.
>
> scala> import sqlContext.implicits._
>
> scala> def arraySum[I, N : Numeric : Encoder](f: I => N): TypedColumn[I, N]
> = new GenericArraySumAggregator(f).toColumn
>
> :34: error: Unable to find encoder for type stored in a Dataset.
> Primitive types (Int, String, etc) and Product types (case classes) are
> supported by importing sqlContext.implicits._  Support for serializing other
> types will be added in future releases.
>
>  def arraySum[I, N : Numeric : Encoder](f: I => N): TypedColumn[I,
> N] = new GenericArraySumAggregator(f).toColumn
>
>
> ^
>
> object ArraySumAggregator extends  Aggregator[Seq[Float], Seq[Float],
> Seq[Float]] with Serializable {
>   def zero: Seq[Float] = Nil
>   // The initial value.
>   def reduce(currentSum: Seq[Float], currentRow: Seq[Float]) =
> sumArray(currentSum, currentRow)
>   def merge(sum: Seq[Float], row: Seq[Float]) = sumArray(sum, row)
>   def finish(b: Seq[Float]) = b // Return the final result.
>   def sumArray(a: Seq[Float], b: Seq[Float]): Seq[Float] = {
> (a, b) match {
>   case (Nil, Nil) => Nil
>   case (Nil, row) => row
>   case (sum, Nil) => sum
>   case (sum, row) => (a, b).zipped.map { case (a, b) => a + b }
> }
>   }
> }
>
> class GenericArraySumAggregator[I, N : Numeric](f: I => N) extends
> Aggregator[Seq[I], Seq[N], Seq[N]] with Serializable {
>   val numeric = implicitly[Numeric[N]]
>   override def zero: Seq[N] = Nil
>   override def reduce(b: Seq[N], a: Seq[I]): Seq[N] = sumArray(b, a.map( x
> => f(x))) //numeric.plus(b, f(a))
>   override def merge(b1: Seq[N],b2: Seq[N]): Seq[N] = sumArray(b1, b2)
>   override def finish(reduction: Seq[N]): Seq[N] = reduction
>   def sumArray(a: Seq[N], b: Seq[N]): Seq[N] = {
> (a, b) match {
>   case (Nil, Nil) => Nil
>   case (Nil, row) => row
>   case (sum, Nil) => sum
>   case (sum, row) => (a, b).zipped.map { case (a, b) => numeric.plus(a,
> b) }
> }
>   }
> }
>
> Regards
>
> Deenar
>




Re: best practice : how to manage your Spark cluster ?

2016-01-21 Thread Arkadiusz Bicz
Hi Charles,

We are using Ambari for Hadoop/Spark service management, versioning
and monitoring in the cluster.

For real-time monitoring of Spark jobs and of cluster hosts (disks, memory,
CPU, network) we use Graphite + Grafana + collectd + Spark metrics:

http://www.hammerlab.org/2015/02/27/monitoring-spark-with-graphite-and-grafana/

BR,

Arkadiusz Bicz

On Thu, Jan 21, 2016 at 5:33 AM, charles li <charles.up...@gmail.com> wrote:
> I've put a thread before:  pre-install 3-party Python package on spark
> cluster
>
> currently I use Fabric to manage my cluster , but it's not enough for me,
> and I believe there is a much better way to manage and monitor the cluster.
>
> I believe there really exists some open source manage tools which provides a
> web UI allowing me to [ what I need exactly ]:
>
> monitor the cluster machine's state in real-time, say memory, network, disk
> list all the services, packages on each machine
> install / uninstall / upgrade / downgrade package through a web UI
> start / stop / restart services on that machine
>
>
>
> great thanks
>
> --
> --
> a spark lover, a quant, a developer and a good man.
>
> http://github.com/litaotao




Re: DataFrame partitionBy to a single Parquet file (per partition)

2016-01-15 Thread Arkadiusz Bicz
Why does it need to be only one file? Spark does a good job of writing
many files.
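
If one file per output partition is really needed, a sketch of an alternative to coalesce(1) (input path and output location are assumptions): repartition on the partition columns first, so each output partition is written by its own task:

import org.apache.spark.sql.SaveMode

val df = sqlContext.read.parquet("/data/source")
val location = "/data/target"

df.repartition(df("entity"), df("year"), df("month"), df("day"), df("status"))
  .write
  .partitionBy("entity", "year", "month", "day", "status")
  .mode(SaveMode.Append)
  .parquet(location)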

On Fri, Jan 15, 2016 at 7:48 AM, Patrick McGloin
 wrote:
> Hi,
>
> I would like to reparation / coalesce my data so that it is saved into one
> Parquet file per partition. I would also like to use the Spark SQL
> partitionBy API. So I could do that like this:
>
> df.coalesce(1).write.partitionBy("entity", "year", "month", "day",
> "status").mode(SaveMode.Append).parquet(s"$location")
>
> I've tested this and it doesn't seem to perform well. This is because there
> is only one partition to work on in the dataset and all the partitioning,
> compression and saving of files has to be done by one CPU core.
>
> I could rewrite this to do the partitioning manually (using filter with the
> distinct partition values for example) before calling coalesce.
>
> But is there a better way to do this using the standard Spark SQL API?
>
> Best regards,
>
> Patrick
>
>




DataFrameWriter on partitionBy for parquet eat all RAM

2016-01-14 Thread Arkadiusz Bicz
Hi

What is the proper configuration for saving partitioned Parquet when the
partition column has a large number of repeated keys?

In the code below I load 500 million rows of data and partition them on a
column with not so many distinct values.

I am using spark-shell with 30 GB for the driver and each executor, and 3 executor cores:

sqlContext.read.load("hdfs://notpartitioneddata").write.partitionBy("columnname").parquet("partitioneddata")


The job failed because there was not enough memory in the executors:

WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Container killed by
YARN for exceeding memory limits. 43.5 GB of 43.5 GB physical memory
used. Consider boosting spark.yarn.executor.memoryOverhead.
16/01/14 17:32:38 ERROR YarnScheduler: Lost executor 11 on
datanode2.babar.poc: Container killed by YARN for exceeding memory
limits. 43.5 GB of 43.5 GB physical memory used. Consider boosting
spark.yarn.executor.memoryOverhead.
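
A sketch of a possible mitigation (not verified here), besides boosting spark.yarn.executor.memoryOverhead as the message suggests: cluster the rows on the partition column first, so each task keeps open Parquet writers for far fewer partition directories at a time.

import org.apache.spark.sql.functions.col

sqlContext.read.load("hdfs://notpartitioneddata")
  .repartition(col("columnname"))
  .write.partitionBy("columnname")
  .parquet("partitioneddata")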




Re: How to make Dataset api as fast as DataFrame

2016-01-13 Thread Arkadiusz Bicz
Hi,

Including the query plans.
DataFrame:

== Physical Plan ==
SortBasedAggregate(key=[agreement#23],
functions=[(MaxVectorAggFunction(values#3),mode=Final,isDistinct=false)],
output=[agreement#23,maxvalues#27])
+- ConvertToSafe
   +- Sort [agreement#23 ASC], false, 0
  +- TungstenExchange hashpartitioning(agreement#23,48), None
 +- ConvertToUnsafe
+- SortBasedAggregate(key=[agreement#23],
functions=[(MaxVectorAggFunction(values#3),mode=Partial,isDistinct=false)],
output=[agreement#23,values#26])
   +- ConvertToSafe
  +- Sort [agreement#23 ASC], false, 0
 +- Project [agreement#23,values#3]
+- BroadcastHashJoin
[tradeId#0,tradeVersion#1], [tradeId#4,tradeVersion#5], BuildRight
   :- Scan
ParquetRelation[hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_SUCCESS,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_common_metadata,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_metadata,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-0-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-1-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-2-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-3-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet]
PushedFilter: [] [tradeId#0,tradeVersion#1,values#3]
   +-
TungstenAggregate(key=[tradeId#4,tradeVersion#5,agreement#23],
functions=[], output=[tradeId#4,tradeVersion#5,agreement#23])
  +- TungstenExchange
hashpartitioning(tradeId#4,tradeVersion#5,agreement#23,48), None
 +-
TungstenAggregate(key=[tradeId#4,tradeVersion#5,agreement#23],
functions=[], output=[tradeId#4,tradeVersion#5,agreement#23])
+- Scan ParquetRelation[hdfs://



//1. MapGrouped
== Physical Plan ==
!MapGroups , class[tradeId[0]: string, tradeVersion[0]:
string, agreement[0]: string], class[_1[0]:
struct,
_2[0]: struct],
class[_1[0]: string, _2[0]: string, _3[0]: string, _4[0]:
array], [tradeId#79,tradeVersion#80,agreement#81],
[_1#88,_2#89,_3#90,_4#91]
+- ConvertToSafe
   +- Sort [tradeId#79 ASC,tradeVersion#80 ASC,agreement#81 ASC], false, 0
  +- TungstenExchange
hashpartitioning(tradeId#79,tradeVersion#80,agreement#81,48), None
 +- ConvertToUnsafe
+- !AppendColumns , class[_1[0]:
struct,
_2[0]: struct],
class[tradeId[0]: string, tradeVersion[0]: string, agreement[0]:
string], [tradeId#79,tradeVersion#80,agreement#81]
   +- Project
[struct(tradeId#38,tradeVersion#39,values#40) AS
_1#73,struct(tradeId#67,tradeVersion#68,agreement#69) AS _2#74]
  +- BroadcastHashJoin [tradeId#38,tradeVersion#39],
[tradeId#67,tradeVersion#68], BuildRight
 :- ConvertToUnsafe
 :  +- !MapPartitions ,
class[tradeId[0]: string, tradeVersion[0]: string, resultType[0]: int,
values[0]: array], class[tradeId[0]: string, tradeVersion[0]:
string, values[0]: array],
[tradeId#38,tradeVersion#39,values#40]
 : +- ConvertToSafe
 :+- Scan
ParquetRelation[hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_SUCCESS,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_common_metadata,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/_metadata,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-0-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-1-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-2-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet,hdfs://namenode1.babar.poc:8020/tmp/arekresultspart1/part-r-3-1e0f161d-2082-4a5a-a8a0-7b76fd1a80bc.gz.parquet]
PushedFilter: [] [tradeId#0,tradeVersion#1,resultType#2,values#3]
 +-
TungstenAggregate(key=[tradeId#67,tradeVersion#68,agreement#69],
functions=[], output=[tradeId#67,tradeVersion#68,agreement#69])
+- TungstenExchange
hashpartitioning(tradeId#67,tradeVersion#68,agreement#69,48), None
   +-
TungstenAggregate(key=[tradeId#67,tradeVersion#68,agreement#69],
functions=[], output=[tradeId#67,tradeVersion#68,agreement#69])
  +- !MapPartitions ,
class[tradeId[0]: string, tradeVersion[0]: string, tradeType[0]:
string, notional[0]: decimal(38,18), currency[0]: string, asset[0]:
string, trader[0]: string, productCode[0]: string, counterParty[0]:
string, counterPartyAccronym[0]: string, 

How to make Dataset api as fast as DataFrame

2016-01-13 Thread Arkadiusz Bicz
Hi,

I have done some performance tests, repeating the execution with
different numbers of executors and different memory settings on YARN-clustered
Spark (version 1.6.0); the cluster contains 6 large nodes.

I found Dataset joinWith and cogroup to be 3 to 5 times slower than a
broadcast join on DataFrames. How can I make them at least comparably fast?

Examples of my code :

DataFrame :
// 500 milion rows
val r = results.select("tradeId", "tradeVersion", "values").as("r")
// 100 thousand rows
val t = trades.select("tradeId", "tradeVersion", "agreement").distinct.as("t")

val j = r.join(broadcast(t), r("tradeId") === t("tradeId") &&
r("tradeVersion") === t("tradeVersion"))
val s = j.select(r("tradeId"), t("tradeVersion"), t("agreement"), r("values"))
val g = s.groupBy(t("agreement"))

val maxvec = new MaxVectorAggFunction
val agg = g.agg(maxvec(r("values")).as("maxvalues"))
agg.write.parquet("hdfs:.../tmp/somelocation")

DataSet

case class ResultsA(tradeId: String, tradeVersion: String, resultType:
Int, values: Array[Double])

case class TradesA(tradeId: String, tradeVersion: String, tradeType:
String, notional: BigDecimal, currency: String,
  asset: String, trader: String, productCode:
String, counterParty: String, counterPartyAccronym: String,
  tradeStatus: String, portfolio: String,
internalPortfolio: String, ptsBook: String, validFrom: String,
  validTill: String, tradeDate: String, maturity:
String, buySellIndicator: String, agreement: String)

case class ResultSmallA(tradeId: String, tradeVersion: String, values:
Array[Double])
case class ResultAgreementA(tradeId: String, tradeVersion: String,
agreement: String, values: Array[Double])
case class TradeSmallA(tradeId: String, tradeVersion: String, agreement: String)

lazy val dsresults = results.as[ResultsA].map(r =>
ResultSmallA(r.tradeId, r.tradeVersion, r.values)).as("r")
lazy val dstrades = trades.as[TradesA].map(t => TradeSmallA(t.tradeId,
t.tradeVersion, t.agreement)).distinct.as("t")
lazy val j = dsresults.joinWith(dstrades, $"r.tradeId" ===
$"t.tradeId" && $"r.tradeVersion" === $"t.tradeVersion", "inner")

//1. MapGrouped

val group = j.groupBy { v => v match {
case (r: ResultSmallA, t: TradeSmallA) => t
  }
}

val reduced = group.mapGroups { case (t, iter) => (t.tradeId,
t.tradeVersion, t.agreement,
  iter.map { case (r, t) => r.values }.reduce((l, r) => {
val min = new MinVectorAggFunction(); min.mergeArrays(l, r)
  }))
}

//2. Reduce

val group2 = j.groupBy(_._2)

val reduced2 = group2.reduce((i1, i2) => {
  val r1 = i1._1
  val r2 = i2._1
  import r1._
  val min = new MinVectorAggFunction();
  (ResultSmallA(tradeId, tradeVersion, min.mergeArrays(values,
r2.values)), i1._2)
})

val reduced = reduced2.map { case (t, (r, _)) => (r.tradeId,
r.tradeVersion, t.agreement, r.values) }


//3. Cogroup

val cogrouped1 = dsresults.groupBy(r => (r.tradeId,
r.tradeVersion)).cogroup(dstrades.groupBy(t => (t.tradeId,
t.tradeVersion))) {
  case (key, data1, data2) =>
if (data2.isEmpty || data1.isEmpty) Iterator()
else {
  val t = data2.next()
  val min = new MinVectorAggFunction()
  Iterator((t.tradeId, t.tradeVersion, t.agreement,
data1.map(_.values).reduce(min.mergeArrays)))
}
}

// MinVectorAggFunction just merges two arrays of Double




Re: Job History Logs for spark jobs submitted on YARN

2016-01-12 Thread Arkadiusz Bicz
Hi,

You can check out http://spark.apache.org/docs/latest/monitoring.html;
you can monitor HDFS and memory usage per job, executor and driver. I
have connected it to Graphite for storage and Grafana for
visualization. I have also connected collectd, which provides all
server-node metrics such as disk, memory and CPU utilization.
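
The Spark-side analogue of the MapReduce .jhist files is the event log read by the Spark history server; a sketch of the spark-defaults.conf entries (the directory is an assumption):

spark.eventLog.enabled           true
spark.eventLog.dir               hdfs:///spark-history
spark.history.fs.logDirectory    hdfs:///spark-history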

On Tue, Jan 12, 2016 at 10:50 AM, laxmanvemula  wrote:
> I observe that YARN jobs history logs are created in /user/history/done
> (*.jhist files) for all the mapreduce jobs like hive, pig etc. But for spark
> jobs submitted in yarn-cluster mode, the logs are not being created.
>
> I would like to see resource utilization by spark jobs. Is there any other
> place where I can find the resource utilization by spark jobs (CPU, Memory
> etc). Or is there any configuration to be set so that the job history logs
> are created just like other mapreduce jobs.
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Job-History-Logs-for-spark-jobs-submitted-on-YARN-tp25946.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>




Re: Spark 1.6 udf/udaf alternatives in dataset?

2016-01-11 Thread Arkadiusz Bicz
Hi,

There is some documentation at

https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Dataset%20Aggregator.html

and you can also check out the DatasetSuite tests in the Spark sources.
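
For the column-name mapping part of the question, a small sketch (input path and column names are assumptions): rename and cast the columns in a select so they line up with the case class before calling as[U]:

import sqlContext.implicits._

// Note: for Datasets the case class cannot be defined as an inner class in the spark-shell
case class Person(name: String, age: Int)

val ds = sqlContext.read.parquet("/data/people")
  .select($"full_name".as("name"), $"years".cast("int").as("age"))
  .as[Person]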


BR,

Arkadiusz Bicz


On Mon, Jan 11, 2016 at 5:37 AM, Muthu Jayakumar <bablo...@gmail.com> wrote:
> Hello there,
>
> While looking at the features of Dataset, it seem to provide an alternative
> way towards udf and udaf. Any documentation or sample code snippet to write
> this would be helpful in rewriting existing UDFs into Dataset mapping step.
> Also, while extracting a value into Dataset using as[U] method, how could I
> specify a custom encoder/translation to case class (where I don't have the
> same column-name mapping or same data-type mapping)?
>
> Please advice,
> Muthu




How HiveContext can read subdirectories

2016-01-07 Thread Arkadiusz Bicz
Hi,

Can Spark read subdirectories through HiveContext external tables?

Example:

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql._

import sqlContext.implicits._

//prepare data and create subdirectories with parquet
val df = Seq("id1" -> 1, "id2" -> 4, "id3"-> 5).toDF("id", "value")
df.write.parquet("/tmp/df/1")
val df2 = Seq("id6"-> 6, "id7"-> 7, "id8"-> 8).toDF("id", "value")
df2.write.parquet("/tmp/df/2")
val dfall = sqlContext.read.load("/tmp/df/*/")
assert(dfall.count == 6)

//convert to HiveContext
val hc = new HiveContext(sqlContext.sparkContext)

hc.sql("SET hive.mapred.supports.subdirectories=true")
hc.sql("SET mapreduce.input.fileinputformat.input.dir.recursive=true")

hc.sql("create external table testsubdirectories (id string, value
string) STORED AS PARQUET location '/tmp/df'")

val hcall = hc.sql("select * from testsubdirectories")

assert(hcall.count() == 6) // should return 6, but it is 0 because the subdirectories are not read

Thanks,

Arkadiusz Bicz




Spark DataFrame limit question

2016-01-06 Thread Arkadiusz Bicz
Hi,

Does limit work for DataFrames, Spark SQL and HiveContext without a
full scan of the Parquet data in Spark 1.6?

I just used it to create a small Parquet file from a large number of
Parquet files and found out that it does a full scan of all the data
instead of just reading the limited number of rows.

All of the commands below do a full scan:

val results = sqlContext.read.load("/largenumberofparquetfiles/")

results.limit(1).write.parquet("/tmp/smallresults1")

results.registerTempTable("resultTemp")

val select = sqlContext.sql("select * from resultTemp limit 1")

select.write.parquet("/tmp/smallresults2")

The same happens when I create an external table in HiveContext over the results table:

hiveContext.sql("select * from results limit
1").write.parquet("/tmp/results/one3")


Thanks,

Arkadiusz Bicz




Re: Monitoring Spark HDFS Reads and Writes

2015-12-31 Thread Arkadiusz Bicz
Hello,

Spark collects HDFS read/write metrics per application/job; see the details at
http://spark.apache.org/docs/latest/monitoring.html.

I have connected the Spark metrics to Graphite and display nice graphs
in Grafana.

BR,

Arek

On Thu, Dec 31, 2015 at 2:00 PM, Steve Loughran  wrote:
>
>> On 30 Dec 2015, at 13:19, alvarobrandon  wrote:
>>
>> Hello:
>>
>> Is there anyway of monitoring the number of Bytes or blocks read and written
>> by an Spark application?. I'm running Spark with YARN and I want to measure
>> how I/O intensive a set of applications are. Closest thing I have seen is
>> the HDFS DataNode Logs in YARN but they don't seem to have Spark
>> applications specific reads and writes.
>>
>> 2015-12-21 18:29:15,347 INFO
>> org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace: src:
>> /127.0.0.1:53805, dest: /127.0.0.1:50010, bytes: 72159, op: HDFS_WRITE,
>> cliID: DFSClient_NONMAPREDUCE_-1850086307_1, offset: 0, srvID:
>> a9edc8ad-fb09-4621-b469-76de587560c0, blockid:
>> BP-189543387-138.100.13.81-1450715936956:blk_1073741837_1013, duration:
>> 2619119
>> hadoop-alvarobrandon-datanode-usuariop81.fi.upm.es.log:2015-12-21
>> 18:29:15,429 INFO org.apache.hadoop.hdfs.server.d
>>
>> Is there any trace about this kind of operations to be found in any log?
>
>
> 1. the HDFS namenode and datanodes all collect metrics of their use, with 
> org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics being the most 
> interesting on IO.
> 2. FileSystem.Statistics is a static structure collecting data on operations 
> and data for each thread in a client process.
> 3. The HDFS input streams also supports some read statistics (ReadStatistics 
> via getReadReadStatistics)
> 4. the recent versions of HDFS are also adding htrace support, to trace 
> end-to-end performance.
>
> I'd start with FileSystem.Statistics; if that's not being collected across 
> spark jobs, it should be possible
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
