Re: How to generate seeded random numbers in GraphX Pregel API vertex procedure?

2016-06-03 Thread Takeshi Yamamuro
Hi, yeah, we have no simple way to do that in GraphX because a GraphX Graph holds both vertex and edge RDDs, so we cannot simply use mapPartitions there while keeping the vertex/edge semantics intact. Another idea is to generate edge files by using RDD#mapPartitions, write them into HDFS, and then
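A minimal sketch of the per-partition seeding idea on a plain RDD, outside Pregel; the vertex ids, base seed and partition count below are assumptions for illustration only:

    import scala.util.Random
    import org.apache.spark.{SparkConf, SparkContext}

    object SeededVertexValues {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("seeded-vertex-values"))

        // Hypothetical vertex ids; in a real graph these would come from the VertexRDD.
        val vertexIds = sc.parallelize(1L to 1000L, numSlices = 8)
        val baseSeed = 42L  // assumed global seed

        // One Random per partition, seeded from the partition index, so the values are
        // reproducible regardless of task scheduling.
        val vertexValues = vertexIds.mapPartitionsWithIndex { (partIdx, iter) =>
          val rng = new Random(baseSeed + partIdx)
          iter.map(id => (id, rng.nextDouble()))
        }

        vertexValues.take(5).foreach(println)
        sc.stop()
      }
    }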

Re: Custom positioning/partitioning Dataframes

2016-06-03 Thread Takeshi Yamamuro
Hi, I'm afraid Spark has no explicit API to set custom partitioners on DataFrames for now. // maropu On Sat, Jun 4, 2016 at 1:09 AM, Nilesh Chakraborty wrote: > Hi, > > I have a domain-specific schema (RDF data with vertical partitioning, i.e. > one table per property) and I want
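A sketch of the usual workaround on the 1.x API, assuming the goal is to co-locate rows sharing a key: drop to the RDD, apply a custom Partitioner, and rebuild the DataFrame with the original schema. The PropertyPartitioner, key column and partition count here are hypothetical:

    import org.apache.spark.Partitioner
    import org.apache.spark.sql.{DataFrame, SQLContext}

    // Hypothetical partitioner keeping rows that share a key (e.g. a property/table name) together.
    class PropertyPartitioner(partitions: Int) extends Partitioner {
      override def numPartitions: Int = partitions
      override def getPartition(key: Any): Int = {
        val mod = key.hashCode % partitions
        if (mod < 0) mod + partitions else mod   // keep the partition index non-negative
      }
    }

    def repartitionByKey(df: DataFrame, keyCol: String, parts: Int)
                        (sqlContext: SQLContext): DataFrame = {
      val keyed = df.rdd.map(row => (row.getAs[String](keyCol), row))
      val partitioned = keyed.partitionBy(new PropertyPartitioner(parts)).values
      sqlContext.createDataFrame(partitioned, df.schema)   // rebuild with the original schema
    }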

Re: Strategies for properly load-balanced partitioning

2016-06-03 Thread Takeshi Yamamuro
Hi, you can control this kinda issue in the coming v2.0. See https://www.mail-archive.com/user@spark.apache.org/msg51603.html // maropu On Sat, Jun 4, 2016 at 10:23 AM, Silvio Fiorito < silvio.fior...@granturing.com> wrote: > Hi Saif! > > > > When you say this happens with spark-csv, are the

Re: Strategies for properly load-balanced partitioning

2016-06-03 Thread Silvio Fiorito
Hi Saif! When you say this happens with spark-csv, are the files gzipped by any chance? GZip is non-splittable so if you’re seeing skew simply from loading data it could be you have some extremely large gzip files. So for a single stage job you will have those tasks lagging compared to the
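A sketch of one common mitigation, assuming the skew really does come from non-splittable gzip inputs: repartition right after the load so the one-partition-per-file layout does not drag the whole stage. The path, format options and partition count are assumptions:

    import org.apache.spark.sql.{DataFrame, SQLContext}

    def loadBalanced(sqlContext: SQLContext): DataFrame = {
      // Each .csv.gz file arrives as a single partition because gzip is not splittable.
      val df = sqlContext.read
        .format("com.databricks.spark.csv")   // spark-csv package on Spark 1.x
        .option("header", "true")
        .load("hdfs:///data/events/*.csv.gz")
      df.repartition(200)   // pay one shuffle up front instead of running a skewed stage
    }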

Re: Twitter streaming error : No lease on /user/hduser/checkpoint/temp (inode 806125): File does not exist.

2016-06-03 Thread Mich Talebzadeh
I set this up with the application name as follows: // Checkpoint directory val hdfsDir = "hdfs://rhes564:9000/user/hduser/checkpoint/" + this.getClass.getSimpleName.trim And then use that directory for checkpointing ssc.checkpoint(hdfsDir) It creates it OK as follows with the
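Pulled together, a minimal sketch of that per-application checkpoint directory; the application object name is hypothetical and the HDFS host/port are the poster's values, which would differ per cluster:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object TwitterAnalytics {   // hypothetical application object
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(new SparkConf().setAppName("TwitterAnalytics"), Seconds(2))

        // One checkpoint directory per application, derived from the class name.
        val hdfsDir = "hdfs://rhes564:9000/user/hduser/checkpoint/" +
          this.getClass.getSimpleName.trim
        ssc.checkpoint(hdfsDir)

        // ... DStream definitions, ssc.start(), ssc.awaitTermination() ...
      }
    }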

Re: Twitter streaming error : No lease on /user/hduser/checkpoint/temp (inode 806125): File does not exist.

2016-06-03 Thread Mich Talebzadeh
Sure, I am trying to use SparkContext.setCheckpointDir(directory: String) to set it up. I agree that once one starts creating subdirectories like "~/checkpoints/${APPLICATION_NAME}/${USERNAME}!" it becomes a bit messy. Cheers

RE: Twitter streaming error : No lease on /user/hduser/checkpoint/temp (inode 806125): File does not exist.

2016-06-03 Thread David Newberger
Hi Mich, My gut says you are correct that each application should have its own checkpoint directory. Though honestly I’m a bit fuzzy on checkpointing still as I’ve not worked with it much yet. Cheers, David Newberger From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com] Sent: Friday, June

Re: Twitter streaming error : No lease on /user/hduser/checkpoint/temp (inode 806125): File does not exist.

2016-06-03 Thread Mich Talebzadeh
Hi David, yes they do. The first streaming job does: val ssc = new StreamingContext(sparkConf, Seconds(2)) ssc.checkpoint("checkpoint") And the Twitter one does: /** Returns the HDFS URL */ def getCheckpointDirectory(): String = { try { val name : String = Seq("bash", "-c", "curl -s

RE: Twitter streaming error : No lease on /user/hduser/checkpoint/temp (inode 806125): File does not exist.

2016-06-03 Thread David Newberger
I was going to ask if you had 2 jobs running. If the checkpointing for both is set up to look at the same location I could see an error like this happening. Do both Spark jobs have a reference to a checkpointing dir? David Newberger From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com]

Re: Twitter streaming error : No lease on /user/hduser/checkpoint/temp (inode 806125): File does not exist.

2016-06-03 Thread Mich Talebzadeh
OK, I was running two Spark Streaming jobs, one using streaming data from Kafka and another from Twitter, in local mode on the same node. It is possible that the directory /user/hduser/checkpoint/temp is shared by both streaming jobs. Any experience with this, please? Thanks

Twitter streaming error : No lease on /user/hduser/checkpoint/temp (inode 806125): File does not exist.

2016-06-03 Thread Mich Talebzadeh
Hi, Just started seeing these errors: 16/06/03 20:30:01 ERROR DFSClient: Failed to close inode 806125 org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /user/hduser/checkpoint/temp (inode 806125): File does not exist. [Lease.

Re: Spark SQL Nested Array of JSON with empty field

2016-06-03 Thread Christian Hellström
If that's your JSON file, then the first problem is that it's incorrectly formatted. Apart from that you can just read the JSON into a DataFrame with sqlContext.read.json() and then select directly on the DataFrame without having to register a temporary table: jsonDF.select("firstname",
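A minimal sketch of that suggestion on the Spark 1.x API; the input path is an assumption, and note that read.json expects one JSON object per line:

    import org.apache.spark.sql.SQLContext

    def readPeople(sqlContext: SQLContext): Unit = {
      // Read the (one-object-per-line) JSON straight into a DataFrame.
      val jsonDF = sqlContext.read.json("hdfs:///data/people.json")
      jsonDF.printSchema()
      // Nested struct fields are reached with dot notation; no temp table needed.
      jsonDF.select("firstname", "lastname", "address.state", "address.city").show()
    }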

RE: Spark Streaming - long garbage collection time

2016-06-03 Thread David Newberger
Have you tried UseG1GC in place of UseConcMarkSweepGC? This article really helped me with GC a few short weeks ago https://databricks.com/blog/2015/05/28/tuning-java-garbage-collection-for-spark-applications.html David Newberger -Original Message- From: Marco1982
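A sketch of switching the executors from CMS to G1 via the conf, in the spirit of the linked article; the extra GC-logging flags and the app name below are assumptions, and heap sizing still needs tuning per job:

    import org.apache.spark.SparkConf

    object GcTunedConf {
      def build(): SparkConf = new SparkConf()
        .setAppName("streaming-join")
        // Replace -XX:+UseConcMarkSweepGC with G1, and log GC activity for diagnosis.
        .set("spark.executor.extraJavaOptions",
          "-XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")
    }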

Spark SQL Nested Array of JSON with empty field

2016-06-03 Thread Jerry Wong
Hi, I ran into a problem with an empty field in a nested JSON file with Spark SQL. For instance, there are two lines of the JSON file as follows: { "firstname": "Jack", "lastname": "Nelson", "address": { "state": "New York", "city": "New York" } }{ "firstname": "Landy", "middlename": "Ken", "lastname":

RE: Strategies for properly load-balanced partitioning

2016-06-03 Thread Saif.A.Ellafi
Appreciate the follow up. I am not entirely sure how or why my question is related to bucketization capabilities. It indeed sounds like a powerful feature to avoid shuffling, but in my case, I am referring to straightforward processes of reading data and writing to parquet. If bucket tables

Spark Streaming - long garbage collection time

2016-06-03 Thread Marco1982
Hi all, I'm running a Spark Streaming application with 1-hour batches to join two data feeds and write the output to disk. The total size of one data feed is about 40 GB per hour (split in multiple files), while the size of the second data feed is about 600-800 MB per hour (also split in multiple

TrackStateByKey operation for Python

2016-06-03 Thread cmbendre
Hi, I need to build a streaming application with Spark in Python since my codebase is in Python. updateStateByKey is not able to scale with the data that I have. I got to know that the new API "trackStateByKey" is very efficient, but it is only available for Scala. Is there any way I can use
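For reference, the API the post is asking about shipped in Spark 1.6 under the name mapWithState, Scala/Java only at the time; a minimal Scala sketch of a running count per key, where the input pair DStream is assumed to already exist:

    import org.apache.spark.streaming.{State, StateSpec}
    import org.apache.spark.streaming.dstream.DStream

    def runningCounts(pairs: DStream[(String, Int)]): DStream[(String, Long)] = {
      val spec = StateSpec.function { (key: String, value: Option[Int], state: State[Long]) =>
        val newCount = state.getOption.getOrElse(0L) + value.getOrElse(0)
        state.update(newCount)   // state kept for the next batch
        (key, newCount)          // emitted downstream
      }
      pairs.mapWithState(spec)   // only touches keys seen in the current batch
    }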

Re: Save to a Partitioned Table using a Derived Column

2016-06-03 Thread Benjamin Kim
Mich, Yes, it is already partitioned. In Hive, I can do this: INSERT OVERWRITE TABLE amo_bi_events PARTITION (dt) SELECT event_type, timestamp, …, concat(substring(timestamp, 1, 10), ' ', substring(timestamp, 12, 2), ':00:00') AS dt FROM amo_raw_events WHERE to_date(timestamp_iso) BETWEEN

Re: Save to a Partitioned Table using a Derived Column

2016-06-03 Thread Mich Talebzadeh
OK fine but dt is the column used for partitioning the table. This is what I get in Hive itself use test; set hive.exec.dynamic.partition=true; set hive.exec.dynamic.partition.mode=nonstrict; drop table if exists amo_bi_events; CREATE EXTERNAL TABLE `amo_bi_events`( `event_type` string

Re: Strategies for properly load-balanced partitioning

2016-06-03 Thread Ovidiu-Cristian MARCU
I suppose you are running on 1.6. I guess you need some solution based on [1], [2] features which are coming in 2.0. [1] https://issues.apache.org/jira/browse/SPARK-12538 / https://issues.apache.org/jira/browse/SPARK-12394

Re: Save to a Partitioned Table using a Derived Column

2016-06-03 Thread Benjamin Kim
Mich, I am using .withColumn to add another column “dt” that is a reformatted version of an existing column “timestamp”. The partitioned by column is “dt”. We are using Spark 1.6.0 in CDH 5.7.0. Thanks, Ben > On Jun 3, 2016, at 10:33 AM, Mich Talebzadeh > wrote: >

Re: Save to a Partitioned Table using a Derived Column

2016-06-03 Thread Mich Talebzadeh
What version of Spark are you using?

Re: Scheduler Delay Time

2016-06-03 Thread Ted Yu
Mind using a different site for your images ? I clicked on each of the 3 links but none of them shows up. FYI On Fri, Jun 3, 2016 at 9:36 AM, alvarobrandon wrote: > Hello: > > I'm doing some instrumentation in Spark and I've realised that some of my > tasks take

Re: Save to a Partitioned Table using a Derived Column

2016-06-03 Thread Mich Talebzadeh
OK, what is the new column called? You are basically adding a new column to an already existing table.

Scheduler Delay Time

2016-06-03 Thread alvarobrandon
Hello: I'm doing some instrumentation in Spark and I've realised that some of my tasks take really long times to complete because of the Scheduler Delay Time. I submit the apps through spark-submit in a YARN cluster. I was wondering if this delay time also takes into account the period between an

RE: About a problem running a spark job in a cdh-5.7.0 vmware image.

2016-06-03 Thread David Newberger
Alonso, I could totally be misunderstanding something or missing a piece of the puzzle; however, remove .setMaster. If you do that it will run with whatever the CDH VM is set up for, which in the out-of-the-box default case is YARN and client mode. val sparkConf = new SparkConf().setAppName(“Some App
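A sketch (spark-shell / driver-code style) of what that leaves in the driver: no .setMaster(...), so the master and deploy mode come from spark-submit, which on the CDH VM defaults to YARN client mode. The app name is a placeholder:

    import org.apache.spark.{SparkConf, SparkContext}

    // No .setMaster here: spark-submit (or the CDH defaults) decides where this runs.
    val sparkConf = new SparkConf().setAppName("Some App")
    val sc = new SparkContext(sparkConf)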

Custom positioning/partitioning Dataframes

2016-06-03 Thread Nilesh Chakraborty
Hi, I have a domain-specific schema (RDF data with vertical partitioning, i.e. one table per property) and I want to instruct Spark SQL to keep semantically closer property tables closer together, that is, group DataFrames together onto different nodes (or at least encourage it somehow) so that

Re: Save to a Partitioned Table using a Derived Column

2016-06-03 Thread Benjamin Kim
The table already exists. CREATE EXTERNAL TABLE `amo_bi_events`( `event_type` string COMMENT '', `timestamp` string COMMENT '', `event_valid` int COMMENT

Re: Save to a Partitioned Table using a Derived Column

2016-06-03 Thread Mich Talebzadeh
Hang on, are you saving this as a new table?

Re: java.io.FileNotFoundException

2016-06-03 Thread kishore kumar
Hi Jeff Zhang, Thanks for the response; could you explain why this error occurs? On Fri, Jun 3, 2016 at 6:15 PM, Jeff Zhang wrote: > One quick solution is to use spark 1.6.1. > > On Fri, Jun 3, 2016 at 8:35 PM, kishore kumar > wrote: > >> Could anyone

Re: About a problem running a spark job in a cdh-5.7.0 vmware image.

2016-06-03 Thread Alonso Isidoro Roman
Thank you David. So I would have to change the way that I am creating the SparkConf object, wouldn't I? I can see in this link that the way to run a Spark job using YARN is using this

Strategies for properly load-balanced partitioning

2016-06-03 Thread Saif.A.Ellafi
Hello everyone! I was noticing that, when reading parquet files or actually any kind of source DataFrame data (spark-csv, etc.), the default partitioning is not fair. Action tasks usually act very fast on some partitions and very slow on some others, and frequently, even fast on all but the last

JavaDStream to Dataframe: Java

2016-06-03 Thread Zakaria Hili
Hi, I'm a newbie in Spark and I want to ask you a simple question. I have a JavaDStream which contains data selected from a SQL database, something like (id, user, score ...), and I want to convert the JavaDStream to a DataFrame. How can I do this with Java? Thank you

Spark Streaming w/variables used as dynamic queries

2016-06-03 Thread Cyril Scetbon
Hey guys, Can someone help me solve the following issue? My code is: var arr = new ArrayBuffer[String]() sa_msgs.map(x => x._1) .foreachRDD { rdd => arr = new ArrayBuffer[String]() } (2) sa_msgs.map(x => x._1) .foreachRDD { rdd => arr ++=

Re: np.unique and collect

2016-06-03 Thread Ted Yu
Where is np defined ? Thanks On Fri, Jun 3, 2016 at 6:07 AM, pseudo oduesp wrote: > Hi , > why np.unique return list instead of list in this function ? > > def unique_item_df(df,list_var): > > l = df.select(list_var).distinct().collect() > return np.unique(l) >

RE: [REPOST] Severe Spark Streaming performance degradation after upgrading to 1.6.1

2016-06-03 Thread David Newberger
What does your processing time look like? Is it consistently within that 20-second micro-batch window? David Newberger From: Adrian Tanase [mailto:atan...@adobe.com] Sent: Friday, June 3, 2016 8:14 AM To: user@spark.apache.org Cc: Cosmin Ciobanu Subject: [REPOST] Severe Spark Streaming performance

[REPOST] Severe Spark Streaming performance degradation after upgrading to 1.6.1

2016-06-03 Thread Adrian Tanase
Hi all, Trying to repost this question from a colleague on my team, somehow his subscription is not active: http://apache-spark-user-list.1001560.n3.nabble.com/Severe-Spark-Streaming-performance-degradation-after-upgrading-to-1-6-1-td27056.html Appreciate any thoughts, -adrian

Save to a Partitioned Table using a Derived Column

2016-06-03 Thread Benjamin Kim
Does anyone know how to save data in a DataFrame to a table partitioned using an existing column reformatted into a derived column? val partitionedDf = df.withColumn("dt", concat(substring($"timestamp", 1, 10), lit(" "), substring($"timestamp", 12, 2), lit(":00")))
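One way this is commonly finished off on Spark 1.6 against an existing Hive table partitioned by dt — a hedged sketch, not necessarily what the thread settled on: enable dynamic partitioning and insertInto with the partition column last. Table and column names come from the thread; the select of the other event columns is elided, as in the thread:

    import org.apache.spark.sql.functions.{col, concat, lit, substring}
    import org.apache.spark.sql.hive.HiveContext

    def saveWithDerivedPartition(hiveContext: HiveContext): Unit = {
      hiveContext.setConf("hive.exec.dynamic.partition", "true")
      hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

      val df = hiveContext.table("amo_raw_events")   // source table from the thread
      val partitionedDf = df.withColumn("dt",
        concat(substring(col("timestamp"), 1, 10), lit(" "),
               substring(col("timestamp"), 12, 2), lit(":00")))

      // insertInto matches columns by position, so a select of the target table's
      // columns in order, with the partition column (dt) last, would go here (elided).
      partitionedDf.write.mode("append").insertInto("amo_bi_events")
    }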

np.unique and collect

2016-06-03 Thread pseudo oduesp
Hi, why does np.unique return list instead of list in this function? def unique_item_df(df, list_var): l = df.select(list_var).distinct().collect() return np.unique(l) df is a DataFrame and list_var is a list of variables (PySpark code). Thanks.

RE: About a problem running a spark job in a cdh-5.7.0 vmware image.

2016-06-03 Thread David Newberger
Alonso, The CDH VM uses YARN and the default deploy mode is client. I’ve been able to use the CDH VM for many learning scenarios. http://www.cloudera.com/documentation/enterprise/latest.html http://www.cloudera.com/documentation/enterprise/latest/topics/spark.html David Newberger From:

Re: java.io.FileNotFoundException

2016-06-03 Thread Jeff Zhang
One quick solution is to use spark 1.6.1. On Fri, Jun 3, 2016 at 8:35 PM, kishore kumar wrote: > Could anyone help me on this issue ? > > On Tue, May 31, 2016 at 8:00 PM, kishore kumar > wrote: > >> Hi, >> >> We installed spark1.2.1 in single node,

Re: java.io.FileNotFoundException

2016-06-03 Thread kishore kumar
Could anyone help me with this issue? On Tue, May 31, 2016 at 8:00 PM, kishore kumar wrote: > Hi, > > We installed Spark 1.2.1 on a single node, running a job in yarn-client mode > on YARN which loads data into HBase and Elasticsearch, > > the error which we are encountering

Re: how to increase threads per executor

2016-06-03 Thread Mich Talebzadeh
The general way of passing parameters to spark-submit is as follows (note that I use a generic shell script to submit jobs). Replace ${JAR_FILE} with appropriate values. In general you can pass all these driver-memory, executor-memory settings to the shell script as variables if you wish, without hard coding them

About a problem running a spark job in a cdh-5.7.0 vmware image.

2016-06-03 Thread Alonso
Hi, I am developing a project that needs to use Kafka, Spark Streaming and Spark MLlib; this is the GitHub project. I am using a VMware cdh-5.7.0 image with 4 cores and 8 GB of RAM; the file that I want to use is only 16

Re: Re: Classpath hell and Elasticsearch 2.3.2...

2016-06-03 Thread Costin Leau
Hi, Sorry to hear about your troubles. Not sure whether you are aware of the ES-Hadoop docs [1]. I've raised an issue [2] to better clarify the usage of elasticsearch-hadoop vs elasticsearch-spark jars. Apologies for the delayed response, for ES-Hadoop questions/issues it's best to use the

Re: twitter data analysis

2016-06-03 Thread Mich Talebzadeh
Thanks Jörn. Is the data stored in the HDFS directory in binary format, and can Spark use it directly or does it need converting into JSON etc.? I am not familiar with the nature of the Twitter logs. In short, what tool can I use to convert the log files into a useful format, and what format would that be? Thanks

Re: twitter data analysis

2016-06-03 Thread Jörn Franke
Or combine both! It is possible with Spark Streaming to combine streaming data with data on HDFS. In the end it always depends what you want to do and when you need what. > On 03 Jun 2016, at 10:26, Mich Talebzadeh wrote: > > I use twitter data with spark streaming to

twitter data analysis

2016-06-03 Thread Mich Talebzadeh
I use Spark Streaming to experiment with Twitter data. Basic stuff: val ssc = new StreamingContext(sparkConf, Seconds(2)) val tweets = TwitterUtils.createStream(ssc, None) val statuses = tweets.map(status => status.getText()) statuses.print() Another alternative

Re: how to increase threads per executor

2016-06-03 Thread Jacek Laskowski
--executor-cores 1 to be exact. Pozdrawiam, Jacek Laskowski https://medium.com/@jaceklaskowski/ Mastering Apache Spark http://bit.ly/mastering-apache-spark Follow me at https://twitter.com/jaceklaskowski On Fri, Jun 3, 2016 at 12:28 AM, Mich Talebzadeh wrote: >

How to share cached tables when the Thrift server runs in multi-session mode in spark 1.6

2016-06-03 Thread 谭成灶
Hi, I created a cached table through Session A via beeline, through which I am able to access the data. I tried to access this cached table from another session, but I cannot find it. Got the solution from the Spark site itself: From Spark 1.6, by default the Thrift server runs in
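The truncated quote is presumably the note from the Spark SQL docs that, from 1.6 onward, the Thrift server runs in multi-session mode by default. If the intent is to share everything across JDBC/ODBC connections as before, the documented switch is the single-session flag, sketched here as it could appear in conf/spark-defaults.conf (or via --conf when starting the Thrift server):

    spark.sql.hive.thriftServer.singleSession  true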