Re: [Spark2.1] SparkStreaming to Cassandra performance problem

2018-06-02 Thread Timur Shenkao
Did you use RDDs or DataFrames?
What is the Spark version?

On Mon, May 28, 2018 at 10:32 PM, Saulo Sobreiro 
wrote:

> Hi,
> I ran a few more tests and found that even with a lot more operations on
> the Scala side, Python is outperformed...
>
> Dataset stream duration: ~3 minutes (CSV-formatted data messages read from
> Kafka)
> Scala process/store time: ~3 minutes (map with split + metrics
> calculations + store raw + store metrics)
> Python process/store time: ~7 minutes (map with split + store raw)
>
> This is the difference between being usable in production or not. I get
> that Python is likely to be slower because of the Python-Java object
> transformations, but I was not expecting such a huge difference.
>
> These results are very interesting, as I was comparing against the time
> that an "equivalent" application in Storm takes to process the exact same
> stream (~3 minutes as well) for the same results, and Spark was clearly
> losing the race.
>
> Thank you all for your feedback :)
>
> Regards,
> Saulo
>
> On 21/05/2018 14:09:40, Russell Spitzer  wrote:
> The answer is most likely that when you use cross Java-Python code you
> incur a penalty for every object that you transform from a Java object
> into a Python object (and then back again into a Java object) when data is
> being passed in and out of your functions. A way around this would probably
> be to use the DataFrame API if possible, which would have compiled the
> interactions in Java and skipped Python-Java serialization. Using Scala
> from the start, though, is a great idea. I would also probably remove the
> cache from your stream since that probably is only hurting (adding an
> additional serialization which is only used once).
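>
> For illustration, a sketch of the DataFrame write path with the Cassandra
> connector (keyspace/table names taken from the snippets below; the same
> format string works from PySpark, which is the point: the plan then
> executes inside the JVM):
>
> df.write
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("keyspace" -> "hdpkns", "table" -> "batch_measurement"))
>   .save()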
>
> On Mon, May 21, 2018 at 5:01 AM Alonso Isidoro Roman 
> wrote:
>
>> Spark is developed mainly in Scala, so all the new features come first to
>> Scala, then Java, and finally Python. I'm not surprised by the results;
>> we've seen this at Stratio since the first versions of Spark. At the
>> beginning of development some of our engineers prototype with Python, but
>> when it comes down to it, if it goes into production it has to be
>> rewritten in Scala or Java, usually Scala.
>>
>>
>>
>> On Mon, 21 May 2018 at 04:34, Saulo Sobreiro (<
>> saulo.sobre...@outlook.pt>) wrote:
>>
>>> Hi Javier,
>>>
>>> Thank you very much for the feedback.
>>> Indeed the CPU is a huge limitation. I had a lot of trouble trying to
>>> run this use case in yarn-client mode. I managed to run this in standalone
>>> (local master) mode only.
>>>
>>> I do not have the hardware available to run this setup in a cluster yet,
>>> so I decided to dig a little bit deeper into the implementation to see
>>> what I could improve. I just finished evaluating some results.
>>> If you find something wrong or odd please let me know.
>>>
>>> Following your suggestion to use "saveToCassandra" directly, I decided to
>>> try Scala. Everything was implemented in the most similar way possible,
>>> and I was surprised by the results: the Scala implementation is much
>>> faster.
>>>
>>> My current implementation is slightly different from the Python code
>>> shared some emails ago, but to compare the languages' influence as
>>> directly as possible I used the following snippets:
>>>
>>> # Scala implementation --
>>>
>>> // requires: import com.datastax.spark.connector.streaming._
>>> val kstream = KafkaUtils.createDirectStream[String, String](
>>>   ssc,
>>>   LocationStrategies.PreferConsistent,
>>>   ConsumerStrategies.Subscribe[String, String](topic, kafkaParams))
>>> kstream
>>>   .map(x => parse(x.value))
>>>   .saveToCassandra("hdpkns", "batch_measurement")
>>>
>>> # Python implementation 
>>> # Adapted from the previously shared code. However, instead of
>>> calculating the metrics, it just parses the messages.
>>>
>>> kafkaStream = KafkaUtils.createDirectStream(ssc, [topic],
>>>     {"metadata.broker.list": brokers})
>>>
>>> kafkaStream \
>>>     .transform(parse) \
>>>     .foreachRDD(casssave)
>>>
>>> For the same streaming input the Scala app took an average of ~1.5
>>> seconds to handle each event. The Python implementation took an average
>>> of ~80 seconds per event (after working around a lot of pickle
>>> concurrent-access issues).
>>>
>>> Note that I considered the time as the difference between the event
>>> generation (before being published to Kafka) and the moment just before the
>>> saveToCassandra.
>>>
>>> The problem in the python implementation seems to be due to the delay
>>> introduced by the foreachRDD(casssave) call, which only runs
>>> rdd.saveToCassandra("test_hdpkns", "measurement").
>>>
>>>
>>> Honestly, I was not expecting such a difference between these two
>>> implementations... Can you see why this is happening?
>>>
>>>
>>>
>>> Again, Thank you very much for your help,
>>>
>>> Best Regards
>>>
>>>
>>> Sharing my current S

Re: py4j.protocol.Py4JJavaError: An error occurred while calling o794.parquet

2018-01-10 Thread Timur Shenkao
Caused by: org.apache.spark.SparkException: Task not serializable

That's the answer :)

What are you trying to save? Is it empty or None / null?


On Wed, Jan 10, 2018 at 4:58 PM, Liana Napalkova <
liana.napalk...@eurecat.org> wrote:

> Hello,
>
>
> Has anybody faced the following problem in PySpark? (Python 2.7.12):
>
> df.show() # works fine and shows the first 5 rows of DataFrame
>
> df.write.parquet(outputPath + '/data.parquet', mode="overwrite")  #
> throws the error
>
> The last line throws the following error:
>
> py4j.protocol.Py4JJavaError: An error occurred while calling o794.parquet.
> : org.apache.spark.SparkException: Job aborted.
>   at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:215)
>   at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
>   at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
>   at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:173)
>
> Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult:
>   at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
>   at 
> org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:123)
>   at 
> org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:248)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:127)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:127)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
>
> Caused by: org.apache.spark.SparkException: Task not serializable
>   at 
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
>   at 
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>   at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:794)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>
> Caused by: java.lang.IllegalArgumentException
> at java.nio.Buffer.position(Buffer.java:244)
> at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:153)
> at java.nio.ByteBuffer.get(ByteBuffer.java:715)
>
> Caused by: java.nio.BufferUnderflowException
>
>   at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:151)
>   at java.nio.ByteBuffer.get(ByteBuffer.java:715)
>   at 
> org.apache.parquet.io.api.Binary$ByteBufferBackedBinary.getBytes(Binary.java:405)
>   at 
> org.apache.parquet.io.api.Binary$ByteBufferBackedBinary.getBytesUnsafe(Binary.java:414)
>   at 
> org.apache.parquet.io.api.Binary$ByteBufferBackedBinary.writeObject(Binary.java:484)
>   at sun.reflect.GeneratedMethodAccessor48.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>
> Thanks.
>
> L.
>
>
>


Re: How to properly execute `foreachPartition` in Spark 2.2

2017-12-18 Thread Timur Shenkao
Spark Dataset / DataFrame has foreachPartition() as well. Its
implementation is much more efficient than the RDD's.
There are tons of code snippets, e.g.
https://github.com/hdinsight/spark-streaming-data-persistence-examples/blob/master/src/main/scala/com/microsoft/spark/streaming/examples/common/DataFrameExtensions.scala
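
A minimal sketch of the Dataset variant (the createProducer helper, broker
list, and topic name here are hypothetical):

import org.apache.kafka.clients.producer.ProducerRecord

df.foreachPartition { rows =>
  val producer = createProducer(brokers)   // one producer per partition
  rows.foreach { row =>
    producer.send(new ProducerRecord[String, String]("out-topic", row.getString(0)))
  }
  producer.close()
}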

On Mon, Dec 18, 2017 at 3:07 PM, Liana Napalkova <
liana.napalk...@eurecat.org> wrote:

> I first need to read from a Kafka queue into a DataFrame. Then I should
> perform some transformations on the data. Finally, for each row in the
> DataFrame I should conditionally apply a KafkaProducer in order to send
> some data to Kafka.
>
> So, I am both consuming and producing the data from/to Kafka.
>
>
>
> --
> *From:* Silvio Fiorito 
> *Sent:* 18 December 2017 16:00:39
> *To:* Liana Napalkova; user@spark.apache.org
> *Subject:* Re: How to properly execute `foreachPartition` in Spark 2.2
>
>
> Why don’t you just use the Kafka sink for Spark 2.2?
>
>
>
> https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html#creating-a-kafka-sink-for-streaming-queries
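>
> Following that doc, a sketch of the sink (broker, topic, and checkpoint
> path are hypothetical):
>
> df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>   .writeStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "host1:9092")
>   .option("topic", "output-topic")
>   .option("checkpointLocation", "/tmp/checkpoints")
>   .start()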
>
>
>
>
>
>
>
> *From: *Liana Napalkova 
> *Date: *Monday, December 18, 2017 at 9:45 AM
> *To: *"user@spark.apache.org" 
> *Subject: *How to properly execute `foreachPartition` in Spark 2.2
>
>
>
> Hi,
>
>
>
> I wonder how to properly execute `foreachPartition` in Spark 2.2. Below I
> explain the problem in detail. I appreciate any help.
>
>
>
> In Spark 1.6 I was doing something similar to this:
>
>
>
> DstreamFromKafka.foreachRDD(session => {
>   session.foreachPartition { partitionOfRecords =>
>     println("Setting the producer.")
>     val producer = Utils.createProducer(mySet.value("metadataBrokerList"),
>                                         mySet.value("batchSize"),
>                                         mySet.value("lingerMS"))
>     partitionOfRecords.foreach(s => {
>
>       //...
>
>
> However, I cannot find the proper way to do a similar thing in Spark
> 2.2. I tried to write my own class by extending `ForeachWriter`, but I get
> a Task Serialization error when passing the `KafkaProducer`.
>
> class MyTestClass( // val inputparams: String)
>   extends Serializable {
>
>   val spark = SparkSession
>     .builder()
>     .appName("TEST")
>     //.config("spark.sql.warehouse.dir", kafkaData)
>     .enableHiveSupport()
>     .getOrCreate()
>
>   import spark.implicits._
>
>   val df: Dataset[String] = spark.readStream
>     .format("kafka")
>     .option("kafka.bootstrap.servers", "localhost:9092")
>     .option("subscribe", "test")
>     .option("startingOffsets", "latest")
>     .option("failOnDataLoss", "true")
>     .load()
>     .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>     .as[(String, String)] // Kafka sends bytes
>     .map(_._2)
>
>   val producer = // create KafkaProducer
>
>   val writer = new MyForeachWriter(producer)
>
>   val query = df
>     .writeStream
>     .foreach(writer)
>     .start()
>
>   query.awaitTermination()
>
>   spark.stop()
> }
>
> class MyForeachWriter extends ForeachWriter[String] with Serializable {
>
>   var producer: KafkaProducer[String, String] = _
>
>   def this(producer: KafkaProducer[String, String]) {
>     this()
>     this.producer = producer
>   }
>
>   override def process(row: String): Unit = {
>     // ...
>   }
>
>   override def close(errorOrNull: Throwable): Unit = {}
>
>   override def open(partitionId: Long, version: Long): Boolean = true
> }
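>
> A common fix (a sketch only; topic and config values are hypothetical) is
> to create the producer lazily inside open(), on the executor, instead of
> passing it through the constructor, so nothing non-serializable is
> captured:
>
> import java.util.Properties
> import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
> import org.apache.spark.sql.ForeachWriter
>
> class KafkaSinkWriter(brokers: String, topic: String) extends ForeachWriter[String] {
>   @transient private var producer: KafkaProducer[String, String] = _
>
>   override def open(partitionId: Long, version: Long): Boolean = {
>     val props = new Properties()
>     props.put("bootstrap.servers", brokers)
>     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
>     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
>     producer = new KafkaProducer[String, String](props) // built on the executor
>     true
>   }
>
>   override def process(row: String): Unit =
>     producer.send(new ProducerRecord[String, String](topic, row))
>
>   override def close(errorOrNull: Throwable): Unit =
>     if (producer != null) producer.close()
> }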
>
>
>
>
>
> Liana Napalkova, PhD
>
> Big Data Analytics Unit
> --
>
> T  +34 93 238 14 00 (ext. 1248)
> M +34 633 426 677
>
> liana.napalk...@eurecat.org
> --
>
> Carrer Camí Antic de València 54-56,
> Edifici A - 08005 - Barcelona
> www.eurecat.org
>
> Ascamm, BDigital, Barcelona Media and Cetemmsa are now Eurecat
>
>

Re: SANSA 0.3 (Scalable Semantic Analytics Stack) Released

2017-12-18 Thread Timur Shenkao
Hello

Thank you for the very interesting work!
The questions are:
1) Where do you store final or intermediate results? Parquet,
JanusGraph, Cassandra?
2) Is there integration with Spark GraphFrames?

Sincerely yours, Timur

On Mon, Dec 18, 2017 at 9:21 AM, Hajira Jabeen 
wrote:

> Dear all,
>
> The Smart Data Analytics group [1] is happy to announce SANSA 0.3 - the
> third release of the Scalable Semantic Analytics Stack. SANSA employs
> distributed computing via Apache Spark and Flink in order to allow scalable
> machine learning, inference and querying capabilities for large knowledge
> graphs.
>
> Website: http://sansa-stack.net
>
> GitHub: https://github.com/SANSA-Stack
>
> Download: http://sansa-stack.net/downloads-usage/
>
> ChangeLog: https://github.com/SANSA-Stack/SANSA-Stack/releases
>
> You can find the FAQ and usage examples at http://sansa-stack.net/faq/.
>
> The following features are currently supported by SANSA:
>
> * Reading and writing RDF files in N-Triples, Turtle, RDF/XML, N-Quad
> format
>
> * Reading OWL files in various standard formats
>
> * Support for multiple data partitioning techniques
>
> * SPARQL querying via Sparqlify (with some known limitations until the
> next Spark 2.3.* release)
>
> * SPARQL querying via conversion to Gremlin path traversals (experimental)
>
> * RDFS, RDFS Simple, OWL-Horst (all in beta status), EL (experimental)
> forward chaining inference
>
> * Automatic inference plan creation (experimental)
>
> * RDF graph clustering with different algorithms
>
> * Rule mining from RDF graphs based on AMIE+
>
> * Terminological decision trees (experimental)
>
> * Anomaly detection (beta)
>
> * Distributed knowledge graph embedding approaches: TransE (beta),
> DistMult (beta), several further algorithms planned
>
> Deployment and getting started:
>
> * There are template projects for SBT and Maven for Apache Spark as well
> as for Apache Flink available [2] to get started.
>
> * The SANSA jar files are in Maven Central i.e. in most IDEs you can just
> search for “sansa” to include the dependencies in Maven projects.
>
> * There is example code for various tasks available [3].
>
> * We provide interactive notebooks for running and testing code [4] via
> Docker.
>
> We want to thank everyone who helped to create this release, in particular
> the projects Big Data Europe [5], HOBBIT [6], SAKE [7], Big Data Ocean [8],
> SLIPO [9], QROWD [10] and BETTER.
>
> View this announcement on Twitter and the SDA blog:
>
>  http://sda.cs.uni-bonn.de/sansa-0-3/
>
>  https://twitter.com/SANSA_Stack/status/941643408300441600
>
> Kind regards,
>
> The SANSA Development Team
>
> (http://sansa-stack.net/community/#Contributors)
>
> [1] http://sda.tech
>
> [2] http://sansa-stack.net/downloads-usage/
>
> [3] https://github.com/SANSA-Stack/SANSA-Examples
>
> [4] https://github.com/SANSA-Stack/SANSA-Notebooks
>
> [5] http://www.big-data-europe.eu
>
> [6] https://project-hobbit.eu
>
> [7] https://www.sake-projekt.de/en/start/
>
> [8] http://www.bigdataocean.eu
>
> [9] http://slipo.eu
>
> [10] http://qrowd-project.eu
>
>
>
> Dr.  Hajira Jabeen
> Senior researcher,
> SDA, Universität Bonn.
>
> http://sda.cs.uni-bonn.de/people/dr-hajira-jabeen/
>


Re: Job spark blocked and runs indefinitely

2017-10-26 Thread Timur Shenkao
HBase has its own Java API and Scala API: you can use whichever you like.
Btw, which Spark-HBase connector do you use? Cloudera's or Hortonworks'?

On Wed, Oct 11, 2017 at 3:01 PM, Amine CHERIFI <
cherifimohamedam...@gmail.com> wrote:

> It seems that the job blocks when we call newAPIHadoopRDD to get data
> from HBase; that may be the issue!
> Is there another API to load data from HBase?
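>
> For reference, a typical newAPIHadoopRDD read from HBase looks like this
> (a sketch only; the table name is hypothetical):
>
> import org.apache.hadoop.hbase.HBaseConfiguration
> import org.apache.hadoop.hbase.client.Result
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable
> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
>
> val hbaseConf = HBaseConfiguration.create()
> hbaseConf.set(TableInputFormat.INPUT_TABLE, "my_table")
> val hbaseRdd = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
>   classOf[ImmutableBytesWritable], classOf[Result])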
>
>
>
> 2017-10-11 14:45 GMT+02:00 Sebastian Piu :
>
>> We do have this issue randomly too, so interested in hearing if someone
>> was able to get to the bottom of it
>>
>> On Wed, 11 Oct 2017, 13:40 amine_901, 
>> wrote:
>>
>>> We encounter a problem with a Spark 1.6 job (on YARN) that never ends
>>> when several jobs are launched simultaneously.
>>> We found that by launching the job in yarn-client mode we do not have
>>> this problem, unlike launching it in yarn-cluster mode;
>>> that could be a lead for finding the cause.
>>>
>>> We changed the code to add a sparkContext.stop().
>>> Indeed, the SparkContext was created (val sparkContext =
>>> createSparkContext)
>>> but never stopped. This solution has allowed us to decrease the number
>>> of jobs that remain blocked, but nevertheless we still have some
>>> blocked jobs.
>>>
>>> By analyzing the logs we found this log output that repeats without
>>> stopping:
>>> 17/09/29 11:04:37 DEBUG SparkEventPublisher: Enqueue
>>> SparkListenerExecutorMetricsUpdate(1,WrappedArray())
>>> 17/09/29 11:04:41 DEBUG ApplicationMaster: Sending progress
>>> 17/09/29 11:04:41 DEBUG ApplicationMaster: Number of pending allocations
>>> is 0. Sleeping for 5000.
>>>
>>> Does someone have an idea about this issue?
>>> Thank you in advance
>>>
>>>
>>>
>>> --
>>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>
>
> --
> CHERIFI Mohamed Amine
> Développeur Big data/Data scientist
> 07 81 65 17 03
>


Re: strange behavior of spark 2.1.0

2017-04-02 Thread Timur Shenkao
Hello,
It's difficult to tell without details.
I believe one of the executors dies because of OOM or some runtime
exception (some unforeseen dirty data row).
Less probable is a GC stop-the-world pause when the incoming message rate
increases drastically.


On Saturday, April 1, 2017, Jiang Jacky  wrote:

> Hello, Guys
> I am running Spark Streaming 2.1.0; the Scala version was tried at both
> 2.11.7 and 2.11.4. It is consuming from JMS. Recently, I have gotten the
> following error:
> *"ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0:
> Stopped by driver"*
>
> *This error occurs randomly; it might be after a couple of hours or a
> couple of days. Besides this error, everything is perfect.*
> When the error happens, my job stops completely. No other error can be
> found.
> I am running on top of YARN, and tried to look up the error through the
> YARN and container logs; no further information appears there. The job is
> just stopped from the driver gracefully. BTW, I have a customized
> receiver, but I do not think this comes from the receiver either: there
> is no error or exception from the receiver, and I can also track that the
> stop command is sent from the "onStop" function in the receiver.
>
> FYI, the driver is not consuming any large amount of memory, and there is
> no RDD "collect" command in the driver. I have also checked the container
> log for each executor and cannot find any further error.
>
>
>
>
> The following is my conf for the spark context
> val conf = new SparkConf().setAppName(jobName).setMaster(master)
>   .set("spark.hadoop.validateOutputSpecs", "false")
>   .set("spark.driver.allowMultipleContexts", "true")
>   .set("spark.streaming.receiver.maxRate", "500")
>   .set("spark.streaming.backpressure.enabled", "true")
>   .set("spark.streaming.stopGracefullyOnShutdown", "true")
>   .set("spark.eventLog.enabled", "true");
>
> If you have any idea or suggestion, please let me know. I would
> appreciate a solution.
>
> Thank you so much
>
>


Re: My spark job runs faster in spark 1.6 and much slower in spark 2.0

2017-02-14 Thread Timur Shenkao
Hello,
I'm not sure that's your reason but check this discussion:

http://apache-spark-developers-list.1001551.n3.nabble.com/SQL-ML-Pipeline-performance-regression-between-1-6-and-2-x-td20803.html

On Tue, Feb 14, 2017 at 9:25 PM, anatva  wrote:

> Hi,
> I am reading an ORC file, performing some joins and aggregations, and
> finally generating a dense vector to perform analytics.
>
> The code runs in 45 minutes on Spark 1.6 on a 4-node cluster. When the
> same code is migrated to run on Spark 2.0 on the same cluster, it takes
> around 4-5 hours. It is surprising and frustrating.
>
> Can anyone take a look and help me figure out what I should change in
> order to get at least the same performance in Spark 2.0?
>
> spark-shell --master yarn-client --driver-memory 5G --executor-memory 20G \
> --num-executors 15 --executor-cores 5 --queue grid_analytics --conf
> spark.yarn.executor.memoryOverhead=5120 --conf
> spark.executor.heartbeatInterval=1200
>
> import sqlContext.implicits._
> import org.apache.spark.storage.StorageLevel
> import org.apache.spark.sql.functions.lit
> import org.apache.hadoop._
> import org.apache.spark.sql.functions.udf
> import org.apache.spark.mllib.linalg.{Vector, Vectors}
> import org.apache.spark.ml.clustering.KMeans
> import org.apache.spark.ml.feature.StandardScaler
> import org.apache.spark.ml.feature.Normalizer
>
> here are the steps:
> 1. read orc file
> 2. filter some of the records
> 3. persist resulting data frame
> 4. get distinct accounts from the df and get a sample of 50k accts from the
> distinct list
> 5. join the above data frame with distinct 50k accounts to pull records for
> only those 50k accts
> 6. perform a group by to get the avg, mean, sum, count of readings for the
> given 50k accts
> 7. join the df obtained by GROUPBY with original DF
> 8. convert the resultant DF to an RDD, do a groupbyKey(), and generate a
> DENSE VECTOR
> 9. convert RDD back to DF and store it in a parquet file
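>
> A rough sketch of steps 4-6 (column names here are hypothetical):
>
> import org.apache.spark.sql.functions._
>
> val accts = df.select("acct_id").distinct().limit(50000)
> val subset = df.join(accts, Seq("acct_id"))
> val metrics = subset.groupBy("acct_id")
>   .agg(avg("reading"), sum("reading"), count("reading"))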
>
> The above steps worked fine in Spark 1.6, but I'm not sure why they run
> painfully long in Spark 2.0.
>
> I am using spark 1.6 & spark 2.0 on HDP 2.5.3
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/My-spark-job-runs-faster-in-spark-1-6-and-much-slower-in-spark-2-0-tp28390.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: is dataframe thread safe?

2017-02-12 Thread Timur Shenkao
Hello,

I suspect that your need isn't parallel execution but parallel data access.
In that case, use Alluxio or Ignite.

Or, more exotic, one Spark job writes to Kafka and the other ones read from
Kafka.

Sincerely yours, Timur

On Sun, Feb 12, 2017 at 2:30 PM, Mendelson, Assaf 
wrote:

> There is no threads within maps here. The idea is to have two jobs on two
> different threads which use the same dataframe (which is cached btw).
>
> This does not override Spark's parallel execution of transformations or
> anything like that. The documentation (job scheduling) actually hints at
> this option but doesn't say specifically whether it is supported when the
> same dataframe is used.
>
> As for configuring the scheduler, this would not work. First, it would
> mean that the same cached dataframe cannot be used; I would have to add
> some additional configuration such as Alluxio (and it would still have to
> serialize/deserialize) as opposed to using the cached data. Furthermore,
> multi-tenancy between applications is limited to either dividing the
> cluster between the applications or using dynamic allocation (which has
> its own overheads).
>
>
>
> Therefore Sean’s answer is what I was looking for (and hoping for…)
>
> Assaf
>
>
>
> *From:* Jörn Franke [mailto:jornfra...@gmail.com]
> *Sent:* Sunday, February 12, 2017 2:46 PM
> *To:* Sean Owen
> *Cc:* Mendelson, Assaf; user
>
> *Subject:* Re: is dataframe thread safe?
>
>
>
> I did not doubt that the submission of several jobs from one application
> makes sense. However, he wants to create threads within maps etc., which
> looks like calling for issues (not only for running the application
> itself, but also for operating it in production within a shared cluster).
> I would rely on the out-of-the-box functionality within Spark for parallel
> execution of the transformations.
>
>
>
> To me, it looks like he is after a solution that can be achieved by a
> simple configuration of the scheduler in Spark, YARN or Mesos. That way
> the application would be more maintainable in production.
>
>
> On 12 Feb 2017, at 11:45, Sean Owen  wrote:
>
> No this use case is perfectly sensible. Yes it is thread safe.
>
> On Sun, Feb 12, 2017, 10:30 Jörn Franke  wrote:
>
> I think you should have a look at the spark documentation. It has
> something called scheduler who does exactly this. In more sophisticated
> environments yarn or mesos do this for you.
>
>
>
> Using threads for transformations does not make sense.
>
>
> On 12 Feb 2017, at 09:50, Mendelson, Assaf 
> wrote:
>
> I know Spark takes care of executing everything in a distributed manner;
> however, Spark also supports having multiple threads on the same Spark
> session/context and knows (through the fair scheduler) how to distribute
> the tasks from them in round-robin fashion.
>
>
>
> The question is: can those two actions (with a different set of
> transformations) be applied to the SAME dataframe?
>
>
>
> Let's say I want to do something like:
>
> val df = ???
> df.cache()
> df.count()
>
> def f1(df: DataFrame): Unit = {
>   val df1 = df.groupBy(something).agg(someAggs)
>   df1.write.parquet("some path")
> }
>
> def f2(df: DataFrame): Unit = {
>   val df2 = df.groupBy(somethingElse).agg(someDifferentAggs)
>   df2.write.parquet("some path 2")
> }
>
> f1(df)
> f2(df)
>
> df.unpersist()
>
>
>
> If the aggregations do not use the full cluster (e.g. because of data
> skew, because there aren't enough partitions, or for any other reason),
> this would leave the cluster underutilized.
>
>
>
> However, if I were to call f1 and f2 on different threads, then df2 could
> use free resources f1 has not consumed, and the overall utilization would
> improve.
>
>
>
> Of course, I can do this only if the operations on the dataframe are
> thread safe. For example, if I did a cache in f1 and an unpersist in
> f2, I would get an inconsistent result. So my question is: what, if any,
> are the legal operations to use on a dataframe so I could do the above?
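>
> For illustration, a minimal sketch of the two-thread pattern (thread-pool
> sizing and error handling omitted):
>
> import scala.concurrent.{Await, Future}
> import scala.concurrent.ExecutionContext.Implicits.global
> import scala.concurrent.duration.Duration
>
> val jobs = Future.sequence(Seq(Future(f1(df)), Future(f2(df))))
> Await.result(jobs, Duration.Inf)  // both actions share the cached df
> df.unpersist()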
>
>
>
> Thanks,
>
> Assaf.
>
>
>
> *From:* Jörn Franke [mailto:jornfra...@gmail.com ]
> *Sent:* Sunday, February 12, 2017 10:39 AM
> *To:* Mendelson, Assaf
> *Cc:* user
> *Subject:* Re: is dataframe thread safe?
>
>
>
> I am not sure what you are trying to achieve here. Spark is taking care of
> executing the transformations in a distributed fashion. This means you must
> not use threads - it does not make sense. Hence, you do not find
> documentation about it.
>
>
> On 12 Feb 2017, at 09:06, Mendelson, Assaf 
> wrote:
>
> Hi,
>
> I was wondering if dataframe is considered thread safe. I know the spark
> session and spark context are thread safe (and actually have tools to
> manage jobs from different threads) but the question is, can I use the same
> dataframe in both threads.
>
> The idea would be to create a dataframe in the main thread and then in two
> sub threads do different transformations and actions on it.
>
> I understand that some things might not be

Re: Practical configuration to run LSH in Spark 2.1.0

2017-02-11 Thread Timur Shenkao
Hello,

1) Are you sure that your data is "clean"? No unexpected missing values?
No strings in unusual encodings? No additional or missing columns?
2) How long does your job run? What about garbage collector parameters?
Have you checked what happens with jconsole / jvisualvm?

Sincerely yours, Timur

On Sat, Feb 11, 2017 at 12:52 AM, nguyen duc Tuan 
wrote:

> Hi Nick,
> Because we use *RandomSignProjectionLSH*, the only LSH parameter is the
> number of hashes. I tried with a small number of hashes (2) but the error
> still happens. And it happens when I call the similarity join. After
> transformation, the size of the dataset is about 4G.
>
> 2017-02-11 3:07 GMT+07:00 Nick Pentreath :
>
>> What other params are you using for the lsh transformer?
>>
>> Are the issues occurring during transform or during the similarity join?
>>
>>
>> On Fri, 10 Feb 2017 at 05:46, nguyen duc Tuan 
>> wrote:
>>
>>> Hi Das,
>>> In general, I will apply them to larger datasets, so I want to use LSH,
>>> which is more scalable than the approaches you suggested. Have you
>>> tried LSH in Spark 2.1.0 before? If yes, how do you set the
>>> parameters/configuration to make it work?
>>> Thanks.
>>>
>>> 2017-02-10 19:21 GMT+07:00 Debasish Das :
>>>
>>> If it is 7M rows and 700K features (or say 1M features), brute-force row
>>> similarity will run fine as well... check out SPARK-4823... you can
>>> compare quality with the approximate variant...
>>> On Feb 9, 2017 2:55 AM, "nguyen duc Tuan"  wrote:
>>>
>>> Hi everyone,
>>> Since Spark 2.1.0 introduces LSH (http://spark.apache.org/docs/latest/ml-features.html#locality-sensitive-hashing),
>>> we want to use LSH to find approximate nearest neighbors. Basically, we
>>> have a dataset with about 7M rows. We want to use cosine distance to
>>> measure the similarity between items, so we use *RandomSignProjectionLSH* (
>>> https://gist.github.com/tuan3w/c968e56ea8ef135096eeedb08af097db)
>>> instead of *BucketedRandomProjectionLSH*. I tried to tune some
>>> configurations such as serialization, memory fraction, executor memory
>>> (~6G), number of executors (~20), memory overhead..., but nothing works.
>>> I often get the error "java.lang.OutOfMemoryError: Java heap space" while
>>> running. I know that this implementation was done by an engineer at Uber,
>>> but I don't know the right configuration to run the algorithm at scale.
>>> Do they need very big memory to run it?
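>>>
>>> For comparison, the built-in Euclidean variant with the documented API
>>> (parameter values here are illustrative only):
>>>
>>> import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
>>>
>>> val brp = new BucketedRandomProjectionLSH()
>>>   .setBucketLength(2.0)
>>>   .setNumHashTables(3)
>>>   .setInputCol("features")
>>>   .setOutputCol("hashes")
>>> val model = brp.fit(dfA)
>>> val pairs = model.approxSimilarityJoin(dfA, dfB, 1.5)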
>>>
>>> Any help would be appreciated.
>>> Thanks
>>>
>>>
>>>
>


Re: [Spark SQL] Task failed while writing rows

2016-12-25 Thread Timur Shenkao
Hi,

I've just read your message. Have you resolved the problem?
If not, what are the contents of /etc/hosts?

On Mon, Dec 19, 2016 at 10:09 PM, Michael Stratton <
michael.strat...@komodohealth.com> wrote:

> I don't think the issue is an empty partition, but given the premature
> EOF exception it may not hurt to try a repartition prior to writing, just
> to rule it out.
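>
> For example (a sketch; the partition count is arbitrary):
>
> df.repartition(200)
>   .write
>   .format("orc")
>   .save("hdfs:///path/to/output")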
>
> On Mon, Dec 19, 2016 at 1:53 PM, Joseph Naegele <
> jnaeg...@grierforensics.com> wrote:
>
>> Thanks Michael, hdfs dfsadmin -report tells me:
>>
>>
>>
>> Configured Capacity: 7999424823296 (7.28 TB)
>>
>> Present Capacity: 7997657774971 (7.27 TB)
>>
>> DFS Remaining: 7959091768187 (7.24 TB)
>>
>> DFS Used: 38566006784 (35.92 GB)
>>
>> DFS Used%: 0.48%
>>
>> Under replicated blocks: 0
>>
>> Blocks with corrupt replicas: 0
>>
>> Missing blocks: 0
>>
>> Missing blocks (with replication factor 1): 0
>>
>>
>>
>> -
>>
>> Live datanodes (1):
>>
>>
>>
>> Name: 127.0.0.1:50010 (localhost)
>>
>> Hostname: XXX.XXX.XXX
>>
>> Decommission Status : Normal
>>
>> Configured Capacity: 7999424823296 (7.28 TB)
>>
>> DFS Used: 38566006784 (35.92 GB)
>>
>> Non DFS Used: 1767048325 (1.65 GB)
>>
>> DFS Remaining: 7959091768187 (7.24 TB)
>>
>> DFS Used%: 0.48%
>>
>> DFS Remaining%: 99.50%
>>
>> Configured Cache Capacity: 0 (0 B)
>>
>> Cache Used: 0 (0 B)
>>
>> Cache Remaining: 0 (0 B)
>>
>> Cache Used%: 100.00%
>>
>> Cache Remaining%: 0.00%
>>
>> Xceivers: 17
>>
>> Last contact: Mon Dec 19 13:00:06 EST 2016
>>
>>
>>
>> The Hadoop exception occurs because it times out after 60 seconds in a
>> “select” call on a java.nio.channels.SocketChannel, while waiting to
>> read from the socket. This implies the client writer isn’t writing on the
>> socket as expected, but shouldn’t this all be handled by the Hadoop library
>> within Spark?
>>
>>
>>
>> It looks like a few similar, but rare, cases have been reported before,
>> e.g. https://issues.apache.org/jira/browse/HDFS-770 which is *very* old.
>>
>>
>>
>> If you’re pretty sure Spark couldn’t be responsible for issues at this
>> level I’ll stick to the Hadoop mailing list.
>>
>>
>>
>> Thanks
>>
>> ---
>>
>> Joe Naegele
>>
>> Grier Forensics
>>
>>
>>
>> *From:* Michael Stratton [mailto:michael.strat...@komodohealth.com]
>> *Sent:* Monday, December 19, 2016 10:00 AM
>> *To:* Joseph Naegele 
>> *Cc:* user 
>> *Subject:* Re: [Spark SQL] Task failed while writing rows
>>
>>
>>
>> It seems like an issue w/ Hadoop. What do you get when you run hdfs
>> dfsadmin -report?
>>
>>
>>
>> Anecdotally (and w/o specifics, as it has been a while), I've generally
>> used Parquet instead of ORC, as I've gotten a bunch of random problems
>> reading and writing ORC w/ Spark... but given that ORC performs a lot
>> better w/ Hive, it can be a pain.
>>
>>
>>
>> On Sun, Dec 18, 2016 at 5:49 PM, Joseph Naegele <
>> jnaeg...@grierforensics.com> wrote:
>>
>> Hi all,
>>
>> I'm having trouble with a relatively simple Spark SQL job. I'm using
>> Spark 1.6.3. I have a dataset of around 500M rows (average 128 bytes per
>> record). Its current compressed size is around 13 GB, but my problem
>> started when it was much smaller, maybe 5 GB. This dataset is generated by
>> performing a query on an existing ORC dataset in HDFS, selecting a subset
>> of the existing data (i.e. removing duplicates). When I write this dataset
>> to HDFS using ORC I get the following exceptions in the driver:
>>
>> org.apache.spark.SparkException: Task failed while writing rows
>> Caused by: java.lang.RuntimeException: Failed to commit task
>> Suppressed: java.lang.IllegalArgumentException: Column has wrong number
>> of index entries found: 0 expected: 32
>>
>> Caused by: java.io.IOException: All datanodes 127.0.0.1:50010 are bad.
>> Aborting...
>>
>> This happens multiple times. The executors tell me the following a few
>> times before the same exceptions as above:
>>
>>
>>
>> 2016-12-09 02:38:12.193 INFO DefaultWriterContainer: Using output
>> committer class org.apache.hadoop.mapreduce.li
>> b.output.FileOutputCommitter
>>
>> 2016-12-09 02:41:04.679 WARN DFSClient: DFSOutputStream
>> ResponseProcessor exception  for block BP-1695049761-192.168.2.211-14
>> 79228275669:blk_1073862425_121642
>>
>> java.io.EOFException: Premature EOF: no length prefix available
>>
>> at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(
>> PBHelper.java:2203)
>>
>> at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.
>> readFields(PipelineAck.java:176)
>>
>> at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$Response
>> Processor.run(DFSOutputStream.java:867)
>>
>>
>> My HDFS datanode says:
>>
>> 2016-12-09 02:39:24,783 INFO 
>> org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace:
>> src: /127.0.0.1:57836, dest: /127.0.0.1:50010, bytes: 14808395, op:
>> HDFS_WRITE, cliID: DFSClient_attempt_201612090102
>> __m_25_0_956624542_193, offset: 0, srvID:
>> 1003b822-200c-4b93-9f88-f474c0b6ce4a, blockid:
>

Re: Spark Streaming with Kafka

2016-12-11 Thread Timur Shenkao
Hi,
Usual general questions are:
-- what is your Spark version?
-- what is your Kafka version?
-- do you use "standard" Kafka consumer or try to implement something
custom (your own multi-threaded consumer)?

The freshest docs
https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

AFAIK, yes, you should use a unique group id for each stream (Kafka 0.10!):

> kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
>
>
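
A minimal sketch of two independent streams with distinct group ids
(broker and topic names are hypothetical):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val baseParams = Map[String, Object](
  "bootstrap.servers" -> "broker:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer])

val stream1 = KafkaUtils.createDirectStream[String, String](ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("topic"),
    baseParams + ("group.id" -> "computation-1")))

val stream2 = KafkaUtils.createDirectStream[String, String](ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("topic"),
    baseParams + ("group.id" -> "computation-2")))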

On Sun, Dec 11, 2016 at 5:51 PM, Anton Okolnychyi <
anton.okolnyc...@gmail.com> wrote:

> Hi,
>
> I am experimenting with Spark Streaming and Kafka. I will appreciate if
> someone can say whether the following assumption is correct.
>
> If I have multiple computations (each with its own output) on one stream
> (created via KafkaUtils.createDirectStream), then there is a chance of
> getting ConcurrentModificationException: KafkaConsumer is not safe for
> multi-threaded access. To solve this problem, I should create a new
> stream with a different "group.id" for each computation.
>
> Am I right?
>
> Best regards,
> Anton
>


Re: Can't read tables written in Spark 2.1 in Spark 2.0 (and earlier)

2016-11-30 Thread Timur Shenkao
Hello,

Yes, I used hiveContext, sqlContext, and sparkSession from Java, Scala, and
Python, via spark-shell, spark-submit, and IDEs (PyCharm, IntelliJ IDEA).
Everything works perfectly because I have a Hadoop cluster with a
configured & tuned Hive.

The usual reason for Michael's error is a misconfigured or absent Hive,
or possibly the absence of hive-site.xml in the $SPARK_HOME/conf/ directory.
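
A quick sanity check (sketch): build the session with Hive support and see
which metastore answers.

val spark = SparkSession.builder()
  .appName("hive-check")
  .enableHiveSupport()               // needs Spark built with -Phive
  .getOrCreate()
spark.sql("SHOW DATABASES").show()   // a Derby-backed metastore typically knows only 'default'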

On Wed, Nov 30, 2016 at 9:30 PM, Gourav Sengupta 
wrote:

> Hi Timur,
>
> did you use hiveContext or sqlContext or the spark way mentioned in the
> http://spark.apache.org/docs/latest/sql-programming-guide.html?
>
>
> Regards,
> Gourav Sengupta
>
> On Wed, Nov 30, 2016 at 5:35 PM, Yin Huai  wrote:
>
>> Hello Michael,
>>
>> Thank you for reporting this issue. It will be fixed by
>> https://github.com/apache/spark/pull/16080.
>>
>> Thanks,
>>
>> Yin
>>
>> On Tue, Nov 29, 2016 at 11:34 PM, Timur Shenkao 
>> wrote:
>>
>>> Hi!
>>>
>>> Do you have real HIVE installation?
>>> Have you built Spark 2.1 & Spark 2.0 with HIVE support ( -Phive
>>> -Phive-thriftserver ) ?
>>>
>>> It seems that you use the "default" Spark HIVE 1.2.1. Your metadata is
>>> stored in a local Derby DB, which is visible to that particular Spark
>>> installation but not to others.
>>>
>>> On Wed, Nov 30, 2016 at 4:51 AM, Michael Allman 
>>> wrote:
>>>
>>>> This is not an issue with all tables created in Spark 2.1, though I'm
>>>> not sure why some work and some do not. I have found that a table created
>>>> as such
>>>>
>>>> sql("create table test stored as parquet as select 1")
>>>>
>>>> in Spark 2.1 cannot be read in previous versions of Spark.
>>>>
>>>> Michael
>>>>
>>>>
>>>> > On Nov 29, 2016, at 5:15 PM, Michael Allman 
>>>> wrote:
>>>> >
>>>> > Hello,
>>>> >
>>>> > When I try to read from a Hive table created by Spark 2.1 in Spark
>>>> 2.0 or earlier, I get an error:
>>>> >
>>>> > java.lang.ClassNotFoundException: Failed to load class for data
>>>> source: hive.
>>>> >
>>>> > Is there a way to get previous versions of Spark to read tables
>>>> written with Spark 2.1?
>>>> >
>>>> > Cheers,
>>>> >
>>>> > Michael
>>>>
>>>>
>>>> -
>>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>>
>>>>
>>>
>>
>


Re: Can't read tables written in Spark 2.1 in Spark 2.0 (and earlier)

2016-11-29 Thread Timur Shenkao
Hi!

Do you have real HIVE installation?
Have you built Spark 2.1 & Spark 2.0 with HIVE support ( -Phive
-Phive-thriftserver ) ?

It seems that you use the "default" Spark HIVE 1.2.1. Your metadata is stored
in a local Derby DB, which is visible to that particular Spark installation
but not to others.

On Wed, Nov 30, 2016 at 4:51 AM, Michael Allman 
wrote:

> This is not an issue with all tables created in Spark 2.1, though I'm not
> sure why some work and some do not. I have found that a table created as
> such
>
> sql("create table test stored as parquet as select 1")
>
> in Spark 2.1 cannot be read in previous versions of Spark.
>
> Michael
>
>
> > On Nov 29, 2016, at 5:15 PM, Michael Allman 
> wrote:
> >
> > Hello,
> >
> > When I try to read from a Hive table created by Spark 2.1 in Spark 2.0
> or earlier, I get an error:
> >
> > java.lang.ClassNotFoundException: Failed to load class for data source:
> hive.
> >
> > Is there a way to get previous versions of Spark to read tables written
> with Spark 2.1?
> >
> > Cheers,
> >
> > Michael
>
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


Re: Possible DR solution

2016-11-12 Thread Timur Shenkao
Hi guys!

1) Though it's quite interesting, I believe that this discussion is not
about Spark :)
2) If you are interested, there is a solution by Cloudera:
https://www.cloudera.com/documentation/enterprise/5-5-x/topics/cm_bdr_replication_intro.html
(it requires that the *source cluster* have a Cloudera Enterprise license,
so it's not free).
Correct me, but I don't remember a specialized replication solution by
Hortonworks (Atlas, Falcon, etc. are not precisely about inter-cluster
replication).
Some solutions from the Hadoop ecosystem try to implement their own
replication:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=62687462 ,
http://highscalability.com/blog/2016/8/1/how-to-setup-a-highly-available-multi-az-cassandra-cluster-o.html
3) Read this discussion:
https://community.hortonworks.com/questions/29645/hdfs-replication-for-dr.html
4) I prefer bash scripts / Python scripts / Oozie jobs + distcp (see the
sketch below): it's free, and I control precisely what's going on. But in
the case of huge clusters and sophisticated logic, this approach becomes
cumbersome.
5) Don't forget about security & encryption: your sensitive data may be
read by third-party agents during replication.
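
For illustration, a typical inter-cluster distcp invocation (paths and
namenode addresses are hypothetical):

hadoop distcp -update -delete \
  hdfs://src-nn:8020/data/warehouse \
  hdfs://dst-nn:8020/data/warehouse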

On Sat, Nov 12, 2016 at 6:05 PM, Mich Talebzadeh 
wrote:

> Thanks Jörn.
>
> The way WanDisco promotes itself is doing block-level replication. As I
> understand it, you modify core-site.xml and add a couple of network server
> locations there. They call this tool Fusion. There are at least 2 Fusion
> servers for high availability; each one, among other things, has a database
> of its own. Once the client interacts with HDFS, the Fusion server behaves
> like a sniffer with its own port. As soon as the first HDFS block of 256MB,
> out of say a 30GB file, is written, it starts sending that block to the
> recipient. The laws of physics, the pipeline size, etc. apply here. That is
> up to the consumer; it can handle 10 files at the same time, etc. So that
> is all. It is a known technology, now labeled streaming. So, in summary, it
> does not have to wait for the full file to be written to HDFS before
> replicating blocks. That is where it scores.
>
> It helps WAN work. Say the primary/active HDFS is in London and the
> replica is in Singapore, so users in Singapore can (eventually) see
> replicated data when it gets there. It can obviously be used for DR; in
> that case it is like hot standby (borrowing terminology from Sybase). In
> contrast, one can do the same with periodic loads using homemade tools or
> tools like BDR from Cloudera.
>
> I mentioned that Hive is going to have its metastore on HBase as well,
> and that can pose potential problems. The site is here
> 
>
> They are claiming there are no competitors in the market for their
> streaming HA product.
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> On 12 November 2016 at 11:17, Jörn Franke  wrote:
>
>> What is wrong with good old batch transfer for moving data from one
>> cluster to another? I assume your use case is only business continuity in
>> case of disasters such as data center loss, which are unlikely to happen
>> (well, that does not mean they do not happen) and where you could afford
>> to lose one day (or hour) of data (it depends!).
>>
>> Nevertheless, I assume he refers to the Hadoop storage policies:
>> https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/ArchivalStorage.html
>> , but this still only works for the same cluster.
>>
>> You could also develop a custom secondary file system, similar to the
>> Ignite cache file system, that sits on top of HDFS: as soon as it
>> receives data, it sends it to another cluster and hands it on to HDFS.
>> Not knowing Wandisco, I assume that is roughly what it does. Given the
>> prices (and the fact that clusters tend to grow) you may want to evaluate
>> whether buying or making makes sense. In any case, it also requires an
>> evaluation of network throughput, because this may become the bottleneck
>> somewhere (either within the cluster or, more likely, between data
>> centers).
>>
>> As you mentioned, Hbase & Co may require a special consideration for the
>> case that data is in-memory and not yet persisted.
>>
>> On Sat, Nov 12, 2016 at 12:04 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> thanks Vince
>>>
>>> can you provide more details on this pls
>>>
>>> Dr Mich Talebzadeh

Re: YARN - Pyspark

2016-09-30 Thread Timur Shenkao
It's not weird behavior. Did you run the job in cluster mode?
I suspect your driver died / finished / stopped after 12 hours but your job
continued. That's possible since you didn't output anything to the console
on the driver node.
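
For comparison, a cluster-mode submission keeps the driver inside YARN
(a sketch; the script name is hypothetical, resource numbers taken from
the conf below):

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 50 \
  --executor-memory 10g \
  historical_meter_load.py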

Quite a long time ago, when I first tried Spark Streaming, I launched PySpark
Streaming jobs in PyCharm & the pyspark console and "killed" them via Ctrl+Z.
The drivers were gone, but the YARN containers (where the computations on the
slaves were performed) remained.
Nevertheless, I believe the final result in "some table" is corrupted.

On Fri, Sep 30, 2016 at 9:33 AM, ayan guha  wrote:

> Hi
>
> I just observed a litlte weird behavior:
>
> I ran a pyspark job, very simple one.
>
> conf = SparkConf()
> conf.setAppName("Historical Meter Load")
> conf.set("spark.yarn.queue","root.Applications")
> conf.set("spark.executor.instances","50")
> conf.set("spark.executor.memory","10g")
> conf.set("spark.yarn.executor.memoryOverhead","2048")
> conf.set("spark.sql.shuffle.partitions",1000)
> conf.set("spark.executor.cores","4")
> sc = SparkContext(conf = conf)
> sqlContext = HiveContext(sc)
>
> df = sqlContext.sql("some sql")
>
> c = df.count()
>
> df.filter(df["RNK"] == 1).saveAsTable("some table").mode("overwrite")
>
> sc.stop()
>
> running is on CDH 5.7 cluster, Spark 1.6.0.
>
> Behavior observed: after a few hours of running (definitely over 12H, but
> not sure exactly when), YARN reported the job as Completed, finished
> successfully, whereas the job kept running (I can see this from the
> Application Master link) for 22H. The timing of the job is expected. The
> behavior of YARN is not.
>
> Is it a known issue? Is it a pyspark-specific issue, or the same with
> Scala as well?
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: Lemmatization using StanfordNLP in ML 2.0

2016-09-24 Thread Timur Shenkao
Hello, everybody!

Maybe it's not the cause of your problem, but I've noticed this line in your
comments:
*java version "1.8.0_51"*

It's strongly advised to use Java 1.8.0_66+.
I even use Java 1.8.0_101.


On Tue, Sep 20, 2016 at 1:09 AM, janardhan shetty 
wrote:

> Yes Sujit, I have tried that option as well.
> I also tried sbt assembly but am hitting the issue below:
>
> http://stackoverflow.com/questions/35197120/java-outofmemoryerror-on-sbt-assembly
>
> Just wondering if there is any clean approach to include the
> StanfordCoreNLP classes in Spark ML?
>
>
> On Mon, Sep 19, 2016 at 1:41 PM, Sujit Pal  wrote:
>
>> Hi Janardhan,
>>
>> You need the classifier "models" attribute on the second entry for
>> stanford-corenlp to indicate that you want the models JAR, as shown below.
>> Right now you are importing two instances of stanford-corenlp JARs.
>>
>> libraryDependencies ++= {
>>   val sparkVersion = "2.0.0"
>>   Seq(
>> "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
>> "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
>> "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
>> "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided",
>> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
>> "com.google.protobuf" % "protobuf-java" % "2.6.1",
>> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0" classifier "models",
>> "org.scalatest" %% "scalatest" % "2.2.6" % "test"
>>   )
>> }
>>
>> -sujit
>>
>>
>> On Sun, Sep 18, 2016 at 5:12 PM, janardhan shetty > > wrote:
>>
>>> Hi Sujit,
>>>
>>> Tried that option but same error:
>>>
>>> java version "1.8.0_51"
>>>
>>>
>>> libraryDependencies ++= {
>>>   val sparkVersion = "2.0.0"
>>>   Seq(
>>> "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
>>> "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
>>> "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
>>> "org.apache.spark" %% "spark-mllib" % sparkVersion % "provided",
>>> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
>>> "com.google.protobuf" % "protobuf-java" % "2.6.1",
>>> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
>>> "org.scalatest" %% "scalatest" % "2.2.6" % "test"
>>>   )
>>> }
>>>
>>> Error:
>>>
>>> Exception in thread "main" java.lang.NoClassDefFoundError:
>>> edu/stanford/nlp/pipeline/StanfordCoreNLP
>>> at transformers.ml.Lemmatizer$$anonfun$createTransformFunc$1.ap
>>> ply(Lemmatizer.scala:37)
>>> at transformers.ml.Lemmatizer$$anonfun$createTransformFunc$1.ap
>>> ply(Lemmatizer.scala:33)
>>> at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$
>>> 2.apply(ScalaUDF.scala:88)
>>> at org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$
>>> 2.apply(ScalaUDF.scala:87)
>>> at org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(Scal
>>> aUDF.scala:1060)
>>> at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedEx
>>> pressions.scala:142)
>>> at org.apache.spark.sql.catalyst.expressions.InterpretedProject
>>> ion.apply(Projection.scala:45)
>>> at org.apache.spark.sql.catalyst.expressions.InterpretedProject
>>> ion.apply(Projection.scala:29)
>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(Traver
>>> sableLike.scala:234)
>>> at scala.collection.TraversableLike$$anonfun$map$1.apply(Traver
>>> sableLike.scala:234)
>>> at scala.collection.immutable.List.foreach(List.scala:381)
>>> at scala.collection.TraversableLike$class.map(TraversableLike.s
>>> cala:234)
>>>
>>>
>>>
>>> On Sun, Sep 18, 2016 at 2:21 PM, Sujit Pal 
>>> wrote:
>>>
 Hi Janardhan,

 Maybe try removing the string "test" from this line in your build.sbt?
 IIRC, this restricts the models JAR to be called from a test.

 "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0" % "test"
 classifier "models",

 -sujit


 On Sun, Sep 18, 2016 at 11:01 AM, janardhan shetty <
 janardhan...@gmail.com> wrote:

> Hi,
>
> I am trying to use lemmatization as a transformer and added belwo to
> the build.sbt
>
>  "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
> "com.google.protobuf" % "protobuf-java" % "2.6.1",
> "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0" % "test"
> classifier "models",
> "org.scalatest" %% "scalatest" % "2.2.6" % "test"
>
>
> Error:
> *Exception in thread "main" java.lang.NoClassDefFoundError:
> edu/stanford/nlp/pipeline/StanfordCoreNLP*
>
> I have tried other versions of this spark package.
>
> Any help is appreciated..
>


>>>
>>
>


Fwd: Populating tables using hive and spark

2016-08-26 Thread Timur Shenkao
Hello!

I just wonder: do you (both of you) use the same user for Hive & Spark, or
different ones? Do you use Kerberized Hadoop?



On Mon, Aug 22, 2016 at 2:20 PM, Mich Talebzadeh 
wrote:

> Ok This is my test
>
> 1) create table in Hive and populate it with two rows
>
> hive> create table testme (col1 int, col2 string);
> OK
> hive> insert into testme values (1,'London');
> Query ID = hduser_20160821212812_2a8384af-23f1-4f28-9395-a99a5f4c1a4a
> OK
> hive> insert into testme values (2,'NY');
> Query ID = hduser_20160821212812_2a8384af-23f1-4f28-9395-a99a5f4c1a4a
> OK
> hive> select * from testme;
> OK
> 1   London
> 2   NY
>
> So the rows are there
>
> Now use  Spark to create two more rows
>
> scala> case class columns (col1: Int, col2: String)
> defined class columns
> scala> val df = sc.parallelize(Array((3,"California"),(4,"Dehli"))).map(p
> => columns(p._1.toString.toInt, p._2.toString)).toDF()
> df: org.apache.spark.sql.DataFrame = [col1: int, col2: string]
> scala> df.show
> +----+----------+
> |col1|      col2|
> +----+----------+
> |   3|California|
> |   4|     Dehli|
> +----+----------+
>
> // register it as tempTable
> scala> df.registerTempTable("tmp")
> scala> sql("insert into test.testme select * from tmp")
> res9: org.apache.spark.sql.DataFrame = []
> scala> sql("select * from testme").show
> +----+----------+
> |col1|      col2|
> +----+----------+
> |   1|    London|
> |   2|        NY|
> |   3|California|
> |   4|     Dehli|
> +----+----------+
> So the rows are there.
>
> Let me go to Hive again now
>
>
> hive>  select * from testme;
> OK
> 1   London
> 2   NY
> 3   California
> 4   Dehli
>
> hive> analyze table testme compute statistics for columns;
>
> So is there any issue here?
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 22 August 2016 at 11:51, Nitin Kumar  wrote:
>
>> Hi Furcy,
>>
>> If I execute the command "ANALYZE TABLE TEST_ORC COMPUTE STATISTICS"
>> before checking the count from Hive, Hive returns the correct count,
>> albeit without spawning a map-reduce job to compute it.
>>
>> I'm running a HDP 2.4 Cluster with Hive 1.2.1.2.4 and Spark 1.6.1
>>
>> If others can concur we can go ahead and report it as a bug.
>>
>> Regards,
>> Nitin
>>
>>
>>
>> On Mon, Aug 22, 2016 at 4:15 PM, Furcy Pin 
>> wrote:
>>
>>> Hi Nitin,
>>>
>>> I confirm that there is something odd here.
>>>
>>> I did the following test :
>>>
>>> create table test_orc (id int, name string, dept string) stored as ORC;
>>> insert into table test_orc values (1, 'abc', 'xyz');
>>> insert into table test_orc values (2, 'def', 'xyz');
>>> insert into table test_orc values (3, 'pqr', 'xyz');
>>> insert into table test_orc values (4, 'ghi', 'xyz');
>>>
>>>
>>> I ended up with 4 files on hdfs:
>>>
>>> 00_0
>>> 00_0_copy_1
>>> 00_0_copy_2
>>> 00_0_copy_3
>>>
>>>
>>> Then I renamed 00_0_copy_2 to part-0, and I still got COUNT(*) =
>>> 4 with hive.
>>> So this is not a file name issue.
>>>
>>> I then removed one of the files, and I got this :
>>>
>>> > SELECT COUNT(1) FROM test_orc;
>>> +------+
>>> | _c0  |
>>> +------+
>>> | 4    |
>>> +------+
>>>
>>> > SELECT * FROM test_orc;
>>> +--------------+----------------+----------------+
>>> | test_orc.id  | test_orc.name  | test_orc.dept  |
>>> +--------------+----------------+----------------+
>>> | 1            | abc            | xyz            |
>>> | 2            | def            | xyz            |
>>> | 4            | ghi            | xyz            |
>>> +--------------+----------------+----------------+
>>> 3 rows selected (0.162 seconds)
>>>
>>> So, my guess is that when Hive inserts data, it must keep the number of
>>> rows in the table somewhere in the metastore.
>>> However, if the files are modified by someone other than Hive itself
>>> (either manually or with Spark), you end up with an inconsistency.
>>>
>>> So I guess we can call it a bug:
>>>
>>> Hive should detect that the files changed and invalidate its
>>> pre-calculated count.
>>> Optionally, Spark should be nice to Hive and update the count when
>>> inserting.
>>>
>>> I don't know if this bug has already been reported, and I tested on Hive
>>> 1.1.0, so perhaps it is already solved in later releases.
>>>
>>> Regards,
>>>
>>> Furcy
>>>
>>>
>>> On Mon, Aug 22, 2016 at 9:34 AM, Nitin Kumar 
>>> wrote:
>>>
 Hi!

 I've noticed that hive has problems in r

Re: NoClassDefFoundError with ZonedDateTime

2016-07-24 Thread Timur Shenkao
Which version of Java 8 do you use? AFAIK, it's recommended to use Java
1.8.0_66 or later.
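
If in doubt, the JVM actually in use is easy to confirm from a spark-shell.
A minimal sketch (driver first, then the executors):

// Driver-side JVM version, e.g. "1.8.0_66"
println(System.getProperty("java.version"))

// Executor-side JVM versions: run a trivial job and collect distinct values
sc.parallelize(1 to 100)
  .map(_ => System.getProperty("java.version"))
  .distinct()
  .collect()
  .foreach(println)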

On Fri, Jul 22, 2016 at 8:49 PM, Jacek Laskowski  wrote:

> On Fri, Jul 22, 2016 at 6:43 AM, Ted Yu  wrote:
> > You can use this command (assuming log aggregation is turned on):
> >
> > yarn logs --applicationId XX
>
> I don't think it's gonna work for an already-running application (and I
> wish I were mistaken since I needed it just yesterday); you have to
> fall back to the stderr of the ApplicationMaster in container 1.
>
> Jacek
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Joining a compressed ORC table with a non compressed text table

2016-06-28 Thread Timur Shenkao
Hi, guys!

As far as I remember, Spark does not take advantage of all of ORC's
peculiarities and optimizations. Moreover, the ability to read ORC files
appeared in Spark not so long ago.

So, despite the "victorious" results announced in
http://hortonworks.com/blog/bringing-orc-support-into-apache-spark/ ,
there are still plenty of "nuisances" like
https://issues.apache.org/jira/browse/SPARK-11087 or
https://issues.apache.org/jira/browse/SPARK-14286

This may also be useful:
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+JoinOptimization
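
For what it's worth, ORC reads in Spark 1.x go through the Hive module, so a
HiveContext (not a plain SQLContext) is required. A minimal sketch; the
warehouse path is hypothetical:

// ORC support lives in spark-hive, hence the HiveContext
val hc = new org.apache.spark.sql.hive.HiveContext(sc)

// Any directory of ORC files works here; this path is made up
val orcDF = hc.read.format("orc").load("/user/hive/warehouse/some_orc_table")
orcDF.printSchema()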

On Wed, Jun 29, 2016 at 1:38 AM, Mich Talebzadeh 
wrote:

> This is what I am getting in the container log for mr
>
> 2016-06-28 23:25:53,808 INFO [main]
> org.apache.hadoop.hive.ql.exec.FileSinkOperator: Writing to temp file: FS
> hdfs://rhes564:9000/tmp/hive/hduser/71a6beee-ac0d-423a-a14e-6ce51667a441/hive_2016-06-28_23-23-42_929_6384631032208608956-1/_task_tmp.-mr-10004/_tmp.000000_0
> 2016-06-28 23:25:53,808 INFO [main]
> org.apache.hadoop.hive.ql.exec.FileSinkOperator: New Final Path: FS
> hdfs://rhes564:9000/tmp/hive/hduser/71a6beee-ac0d-423a-a14e-6ce51667a441/hive_2016-06-28_23-23-42_929_6384631032208608956-1/_tmp.-mr-10004/000000_0
> 2016-06-28 23:25:53,836 INFO [main]
> org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written - 1
> 2016-06-28 23:25:53,837 INFO [main]
> org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written -
> 10
> 2016-06-28 23:25:53,837 INFO [main]
> org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written -
> 100
> 2016-06-28 23:25:53,844 INFO [main]
> org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written -
> 1000
> 2016-06-28 23:25:53,875 INFO [main]
> org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written -
> 10000
> 2016-06-28 23:25:53,954 INFO [main]
> org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written -
> 100000
> 2016-06-28 23:25:55,072 INFO [main]
> org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written -
> 1000000
> 2016-06-28 23:26:56,236 INFO [main]
> org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written -
> 10000000
> 2016-06-28 23:27:58,499 WARN [ResponseProcessor for block
> BP-1648199869-50.140.197.217-1462266926537:blk_1074784072_1043287]
> org.apache.hadoop.hdfs.DFSClient: Slow ReadProcessor read fields took
> 35556ms (threshold=30000ms); ack: seqno: 6815 status: SUCCESS status:
> SUCCESS downstreamAckTimeNanos: 35566795000, targets: [
> 50.140.197.217:50010, 50.140.197.216:50010]
> 2016-06-28 23:31:38,437 INFO [main]
> org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written -
> 100000000
> 2016-06-28 23:35:27,631 WARN [ResponseProcessor for block
> BP-1648199869-50.140.197.217-1462266926537:blk_1074784086_1043301]
> org.apache.hadoop.hdfs.DFSClient: Slow ReadProcessor read fields took
> 31118ms (threshold=30000ms); ack: seqno: 36303 status: SUCCESS status:
> SUCCESS downstreamAckTimeNanos: 31128701000, targets: [
> 50.140.197.217:50010, 50.140.197.216:50010]
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> Disclaimer: Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 28 June 2016 at 23:27, Mich Talebzadeh 
> wrote:
>
>> That is a good point.
>>
>> The ORC table property is as follows
>>
>> TBLPROPERTIES ( "orc.compress"="SNAPPY",
>> "orc.stripe.size"="268435456",
>> "orc.row.index.stride"="1")
>>
>> which puts each stripe at 256MB
>>
>> Just to clarify, this is Spark running on Hive tables. I don't think the
>> choice of Tez, MR or Spark as the execution engine is going to make any
>> difference?
>>
>> This is the same query with Hive on MR
>>
>> select a.prod_id from sales2 a, sales_staging b where a.prod_id =
>> b.prod_id order by a.prod_id;
>>
>> 2016-06-28 23:23:51,203 Stage-1 map = 0%,  reduce = 0%
>> 2016-06-28 23:23:59,480 Stage-1 map = 50%,  reduce = 0%, Cumulative CPU
>> 7.32 sec
>> 2016-06-28 23:24:08,771 Stage-1 map = 55%,  reduce = 0%, Cumulative CPU
>> 18.21 sec
>> 2016-06-28 23:24:11,860 Stage-1 map = 58%,  reduce = 0%, Cumulative CPU
>> 22.34 sec
>> 2016-06-28 23:24:18,021 Stage-1 map = 62%,  reduce = 0%, Cumulative CPU
>> 30.33 sec
>> 2016-06-28 23:24:21,101 Stage-1 map = 64%,  reduce = 0%, Cumulative CPU
>> 33.45 sec
>> 2016-06-28 23:24:24,181 Stage-1 map = 66%,  reduce = 0%, Cumulative CPU
>> 37.5 sec
>> 2016-06-28 23:24:27,270 Stage-1 map = 69%,  reduce = 0%, Cumulative CPU
>> 42.0 sec
>> 2016-06-28 23:24:30,349 Stage-1 map = 70%,  reduce = 0%, Cumula

Re: Using Spark on Hive with Hive also using Spark as its execution engine

2016-05-22 Thread Timur Shenkao
Hi,
Thanks a lot for such an interesting comparison. But important questions
remain to be addressed:

1) How do you make 2 versions of Spark live together on the same cluster
(library clashes, paths, etc.)?
Most Spark users perform ETL and ML operations on Spark as well, so we
may have 3 Spark installations simultaneously.

2) How stable is such a construction on INSERT / UPDATE / CTAS operations?
Any problems with writing into specific tables / directories, ORC / Parquet
peculiarities, memory / timeout parameter tuning?

3) How stable is such a construction in a multi-user / multi-tenant
production environment where several people run different queries
simultaneously?

It's impossible to restart Spark masters and workers several times a day and
tune them constantly.


On Mon, May 23, 2016 at 2:42 AM, Mich Talebzadeh 
wrote:

> Hi,
>
>
>
> I have done a number of extensive tests using Spark-shell with Hive DB and
> ORC tables.
>
>
>
> Now one issue that we typically face is, and I quote:
>
>
>
> Spark is fast as it uses memory and DAG execution. Great, but when we save
> data it is not fast enough.
>
> OK, but there is a solution now. If you use Spark with Hive and you are on
> a decent version of Hive >= 0.14, then you can also deploy Spark as the
> execution engine for Hive (set hive.execution.engine=spark). That will make
> your application run pretty fast, as you no longer rely on the old
> Map-Reduce engine for Hive. In a nutshell, you gain speed in both querying
> and storage.
>
>
>
> I have made some comparisons on this set-up and I am sure some of you will
> find it useful.
>
>
>
> The version of Spark I use for Spark queries (Spark as a query tool) is 1.6.
>
> The version of Hive I use is Hive 2.
>
> The version of Spark I use as the Hive execution engine is 1.3.1. It works
> and, frankly, Spark 1.3.1 as an execution engine is adequate (until we sort
> out the Hadoop libraries mismatch).
>
>
>
> As an example, I am using the Hive on Spark engine to find the min and max
> of IDs for a table with 1 billion rows:
>
>
>
> 0: jdbc:hive2://rhes564:10010/default>  select min(id), max(id),avg(id),
> stddev(id) from oraclehadoop.dummy;
>
> Query ID = hduser_20160523002031_3e22e26e-4293-4e90-ae8b-72fe9683c006
>
>
>
>
>
> Starting Spark Job = 5e092ef9-d798-4952-b156-74df49da9151
>
>
>
> INFO  : Completed compiling
> command(queryId=hduser_20160523002031_3e22e26e-4293-4e90-ae8b-72fe9683c006);
> Time taken: 1.911 seconds
>
> INFO  : Executing
> command(queryId=hduser_20160523002031_3e22e26e-4293-4e90-ae8b-72fe9683c006):
> select min(id), max(id),avg(id), stddev(id) from oraclehadoop.dummy
>
> INFO  : Query ID =
> hduser_20160523002031_3e22e26e-4293-4e90-ae8b-72fe9683c006
>
> INFO  : Total jobs = 1
>
> INFO  : Launching Job 1 out of 1
>
> INFO  : Starting task [Stage-1:MAPRED] in serial mode
>
>
>
> Query Hive on Spark job[0] stages:
>
> 0
>
> 1
>
> Status: Running (Hive on Spark job[0])
>
> Job Progress Format
>
> CurrentTime StageId_StageAttemptId:
> SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount
> [StageCost]
>
> 2016-05-23 00:21:19,062 Stage-0_0: 0/22 Stage-1_0: 0/1
>
> 2016-05-23 00:21:20,070 Stage-0_0: 0(+12)/22    Stage-1_0: 0/1
>
> 2016-05-23 00:21:23,119 Stage-0_0: 0(+12)/22    Stage-1_0: 0/1
>
> 2016-05-23 00:21:26,156 Stage-0_0: 13(+9)/22    Stage-1_0: 0/1
>
> INFO  :
>
> Query Hive on Spark job[0] stages:
>
> INFO  : 0
>
> INFO  : 1
>
> INFO  :
>
> Status: Running (Hive on Spark job[0])
>
> INFO  : Job Progress Format
>
> CurrentTime StageId_StageAttemptId:
> SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount
> [StageCost]
>
> INFO  : 2016-05-23 00:21:19,062 Stage-0_0: 0/22 Stage-1_0: 0/1
>
> INFO  : 2016-05-23 00:21:20,070 Stage-0_0: 0(+12)/22    Stage-1_0: 0/1
>
> INFO  : 2016-05-23 00:21:23,119 Stage-0_0: 0(+12)/22    Stage-1_0: 0/1
>
> INFO  : 2016-05-23 00:21:26,156 Stage-0_0: 13(+9)/22    Stage-1_0: 0/1
>
> 2016-05-23 00:21:29,181 Stage-0_0: 22/22 Finished   Stage-1_0: 0(+1)/1
>
> 2016-05-23 00:21:30,189 Stage-0_0: 22/22 Finished   Stage-1_0: 1/1
> Finished
>
> Status: Finished successfully in 53.25 seconds
>
> OK
>
> INFO  : 2016-05-23 00:21:29,181 Stage-0_0: 22/22 Finished   Stage-1_0:
> 0(+1)/1
>
> INFO  : 2016-05-23 00:21:30,189 Stage-0_0: 22/22 Finished   Stage-1_0:
> 1/1 Finished
>
> INFO  : Status: Finished successfully in 53.25 seconds
>
> INFO  : Completed executing
> command(queryId=hduser_20160523002031_3e22e26e-4293-4e90-ae8b-72fe9683c006);
> Time taken: 56.337 seconds
>
> INFO  : OK
>
> +-----+------------+---------------+-----------------------+--+
>
> | c0  | c1         | c2            | c3                    |
>
> +-----+------------+---------------+-----------------------+--+
>
> | 1   | 100000000  | 5.00000005E7  | 2.8867513459481288E7  |
>
> +-----+------------+---------------+-----------------------+--+
>
> 1 row selected (58.529 seconds)
>
>
>
> 58 seconds first run with cold cache is pretty good
>
>
>
> And let us compare it with running the same query on the map-reduce engine
>
>
>
>

Re: This works to filter transactions older than certain months

2016-03-28 Thread Timur Shenkao
bq. CSV data is stored in an underlying table in Hive (actually created and
populated as an ORC table by Spark)

How is it possible?
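
Presumably along these lines: read the CSV into a DataFrame, then save it as
an ORC-backed Hive table. A hedged sketch, assuming the external spark-csv
package for Spark 1.x and a HiveContext as sqlContext; the input path is
hypothetical, the table name is taken from the message below:

// Assumes launching with: --packages com.databricks:spark-csv_2.10:1.4.0
val raw = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")        // first line holds column names
  .option("inferSchema", "true")   // derive column types from the data
  .load("/data/nw_10124772.csv")   // hypothetical input path

// Persist as a Hive table stored as ORC (needs a HiveContext)
raw.write.format("orc").saveAsTable("accounts.nw_10124772")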

On Mon, Mar 28, 2016 at 1:50 AM, Mich Talebzadeh 
wrote:

> Hi,
>
> A while back I was looking for a functional-programming way to filter out
> transactions older than n months, etc.
>
> This turned out to be pretty easy.
>
> I get today's date as follows
>
> var today = sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(),
> 'yyyy-MM-dd') ").collect.apply(0).getString(0)
>
>
> CSV data is stored in an underlying table in Hive (actually created and
> populated as an ORC table by Spark)
>
> HiveContext.sql("use accounts")
> var n = HiveContext.table("nw_10124772")
>
> scala> n.printSchema
> root
>  |-- transactiondate: date (nullable = true)
>  |-- transactiontype: string (nullable = true)
>  |-- description: string (nullable = true)
>  |-- value: double (nullable = true)
>  |-- balance: double (nullable = true)
>  |-- accountname: string (nullable = true)
>  |-- accountnumber: integer (nullable = true)
>
> //
> // Check for historical transactions > 60 months old
> //
> var old: Int = 60
>
> n.filter(add_months(col("transactiondate"),old) <
> lit(today)).select(lit(today),
> col("transactiondate"),add_months(col("transactiondate"),old)).collect.foreach(println)
>
> [2016-03-27,2011-03-22,2016-03-22]
> [2016-03-27,2011-03-22,2016-03-22]
> [2016-03-27,2011-03-22,2016-03-22]
> [2016-03-27,2011-03-22,2016-03-22]
> [2016-03-27,2011-03-23,2016-03-23]
> [2016-03-27,2011-03-23,2016-03-23]
>
>
> Which seems to work. Any other suggestions will be appreciated.
>
> Thanks
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>


Re: spark 1.6.0 connect to hive metastore

2016-03-12 Thread Timur Shenkao
I had a similar issue with CDH 5.5.3.
Not only with Spark 1.6 but with beeline as well.
I resolved it by installing & running a hiveserver2 role instance on the
same server where the metastore is.
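
Another knob that often matters here is telling Spark 1.6 which Hive
metastore version and jars to talk to, instead of its built-in Hive 1.2.1.
A hedged sketch using Spark's documented settings; the paths are
hypothetical for a CDH layout:

// Launch-time flags for spark-shell / spark-submit (shown as comments):
//   --files /etc/hive/conf/hive-site.xml
//   --conf spark.sql.hive.metastore.version=1.1.0
//   --conf spark.sql.hive.metastore.jars=/opt/cloudera/parcels/CDH/lib/hive/lib/*
// Then verify from the shell that the remote metastore, not local Derby, answers:
sqlContext.sql("show databases").show()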

On Tue, Feb 9, 2016 at 10:58 PM, Koert Kuipers  wrote:

> has anyone successfully connected to hive metastore using spark 1.6.0? i
> am having no luck. worked fine with spark 1.5.1 for me. i am on cdh 5.5 and
> launching spark with yarn.
>
> this is what i see in logs:
> 16/02/09 14:49:12 INFO hive.metastore: Trying to connect to metastore with
> URI thrift://metastore.mycompany.com:9083
> 16/02/09 14:49:12 INFO hive.metastore: Connected to metastore.
>
> and then a little later:
>
> 16/02/09 14:49:34 INFO hive.HiveContext: Initializing execution hive,
> version 1.2.1
> 16/02/09 14:49:34 INFO client.ClientWrapper: Inspected Hadoop version:
> 2.6.0-cdh5.4.4
> 16/02/09 14:49:34 INFO client.ClientWrapper: Loaded
> org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version 2.6.0-cdh5.4.4
> 16/02/09 14:49:34 WARN conf.HiveConf: HiveConf of name
> hive.server2.enable.impersonation does not exist
> 16/02/09 14:49:35 INFO metastore.HiveMetaStore: 0: Opening raw store with
> implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
> 16/02/09 14:49:35 INFO metastore.ObjectStore: ObjectStore, initialize
> called
> 16/02/09 14:49:35 INFO DataNucleus.Persistence: Property
> hive.metastore.integral.jdo.pushdown unknown - will be ignored
> 16/02/09 14:49:35 INFO DataNucleus.Persistence: Property
> datanucleus.cache.level2 unknown - will be ignored
> 16/02/09 14:49:35 WARN DataNucleus.Connection: BoneCP specified but not
> present in CLASSPATH (or one of dependencies)
> 16/02/09 14:49:35 WARN DataNucleus.Connection: BoneCP specified but not
> present in CLASSPATH (or one of dependencies)
> 16/02/09 14:49:37 WARN conf.HiveConf: HiveConf of name
> hive.server2.enable.impersonation does not exist
> 16/02/09 14:49:37 INFO metastore.ObjectStore: Setting MetaStore object pin
> classes with
> hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order"
> 16/02/09 14:49:38 INFO DataNucleus.Datastore: The class
> "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as
> "embedded-only" so does not have its own datastore table.
> 16/02/09 14:49:38 INFO DataNucleus.Datastore: The class
> "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as
> "embedded-only" so does not have its own datastore table.
> 16/02/09 14:49:40 INFO DataNucleus.Datastore: The class
> "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as
> "embedded-only" so does not have its own datastore table.
> 16/02/09 14:49:40 INFO DataNucleus.Datastore: The class
> "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as
> "embedded-only" so does not have its own datastore table.
> 16/02/09 14:49:40 INFO metastore.MetaStoreDirectSql: Using direct SQL,
> underlying DB is DERBY
> 16/02/09 14:49:40 INFO metastore.ObjectStore: Initialized ObjectStore
> java.lang.RuntimeException: java.lang.RuntimeException: Unable to
> instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
>   at
> org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
>   at
> org.apache.spark.sql.hive.client.ClientWrapper.<init>(ClientWrapper.scala:194)
>   at
> org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:238)
>   at
> org.apache.spark.sql.hive.HiveContext.executionHive$lzycompute(HiveContext.scala:218)
>   at
> org.apache.spark.sql.hive.HiveContext.executionHive(HiveContext.scala:208)
>   at org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:440)
>   at org.apache.spark.sql.SQLContext$$anonfun$4.apply(SQLContext.scala:272)
>   at org.apache.spark.sql.SQLContext$$anonfun$4.apply(SQLContext.scala:271)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:742)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at org.apache.spark.sql.SQLContext.<init>(SQLContext.scala:271)
>   at org.apache.spark.sql.hive.HiveContext.<init>(HiveContext.scala:97)
>   at org.apache.spark.sql.hive.HiveContext.<init>(HiveContext.scala:101)
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>   at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>   at org.apache.spark.repl.Main$.createSQLContext(Main.scala:89)
>   ... 47 elided
> Caused by: java.lang.RuntimeException: Unable to instantiate
> org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
>   at
> org.apache.hadoop.hive.metastore.MetaStoreUtils.new

Re: Spark SQL is not returning records for HIVE transactional tables on HDP

2016-03-12 Thread Timur Shenkao
Hi,

I have suffered from Hive Streaming and transactions enough, so I can share
my experience with you.

1) It's not a problem with Spark. It happens because of "peculiarities" /
bugs of Hive Streaming. Hive Streaming and transactions are very raw
technologies. If you look at the Hive JIRA, you'll see several critical bugs
concerning Hive Streaming and transactions. Some of them are resolved in
Hive 2+ only. But Cloudera & Hortonworks ship their distributions with an
outdated & buggy Hive.
So use Hive 2+. Earlier versions of Hive didn't run compaction at all.

2) In Hive 1.1, I issued the following lines:
ALTER TABLE default.foo COMPACT 'MAJOR';
SHOW COMPACTIONS;

My manual compaction was listed, but it was never actually carried out.

3) If you use Hive Streaming, it's discouraged, or even forbidden, to
insert rows into Hive Streaming tables manually. Only the process that
writes to such a table should insert incoming rows, sequentially. Otherwise
you'll get unpredictable behaviour.

4) Ordinary Hive tables are directories of text, ORC, etc. files.
Hive Streaming / transactional tables are directories that contain numerous
subdirectories with a "delta" prefix. Moreover, there are files with a
"flush_length" suffix in some delta subdirectories. "flush_length" files are
8 bytes long. The presence of a "flush_length" file in a subdirectory means
that Hive is writing updates to that subdirectory right now. When Hive fails
or is restarted, it begins to write into a new delta subdirectory with a new
"flush_length" file, and the old "flush_length" file (the one used before
the failure) still remains.
One of the goals of compaction is to delete outdated "flush_length" files.
Not every application / library can read such a directory structure or knows
the details of the Hive Streaming / transactions implementation. Most
software solutions still expect ordinary Hive tables as input.
When they encounter the subdirectories or the special "flush_length" files,
applications / libraries either "see nothing" (return 0 or an empty result
set) or stumble over the "flush_length" files (return unexplainable errors).

For instance, Facebook Presto couldn't read such subdirectories by default
unless you activated special parameters, and it stumbles over "flush_length"
files, as Presto expects legal ORC files, not 8-byte text files, in those
directories.

So, I don't advise you to use Hive Streaming and transactions right now in
real production systems (24/7/365) with hundreds of millions of events a
day.
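
One practical note: if Spark still returns stale results after a compaction
has finished, forcing it to drop cached metadata and re-list the table's
files can help. A minimal sketch (Spark 1.x with a HiveContext; the table
name is taken from the scenario below):

// Drop Spark's cached metadata / file listing for the table,
// then re-read it so freshly compacted base files are picked up
sqlContext.refreshTable("default.foo")
sqlContext.table("default.foo").count()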

On Sat, Mar 12, 2016 at 11:24 AM, @Sanjiv Singh 
wrote:

> Hi All,
>
> I am facing this issue on an HDP setup, on which COMPACTION is required only
> once for Spark SQL to fetch records from transactional tables.
> On the other hand, an Apache setup doesn't require compaction even once.
>
> Maybe something gets triggered in the metastore after compaction, and Spark
> SQL starts recognizing the delta files.
>
> Let me know if you need any other details to get to the root cause.
>
> Try this,
>
> *See complete scenario :*
>
> hive> create table default.foo(id int) clustered by (id) into 2 buckets
> STORED AS ORC TBLPROPERTIES ('transactional'='true');
> hive> insert into default.foo values(10);
>
> scala> sqlContext.table("default.foo").count // Gives 0, which is wrong
> because data is still in delta files
>
> Now run major compaction:
>
> hive> ALTER TABLE default.foo COMPACT 'MAJOR';
>
> scala> sqlContext.table("default.foo").count // Gives 1
>
> hive> insert into foo values(20);
>
> scala> sqlContext.table("default.foo").count* // Gives 2 , no compaction
> required.*
>
>
>
>
> Regards
> Sanjiv Singh
> Mob :  +091 9990-447-339
>