How do I flatten JSON blobs into a Data Frame using Spark/Spark SQL

2016-11-17 Thread kant kodali
Hi All, I would like to flatten JSON blobs into a Data Frame using Spark/Spark SQL inside spark-shell. val df = spark.sql("select body from test limit 3"); // body is a json encoded blob column val df2 = df.select(df("body").cast(StringType).as("body")) when I do df2.show // shows the 3
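
A minimal sketch of one common approach, assuming Spark 2.0.x (where DataFrameReader.json accepts an RDD[String]): cast the blob column to string and let the JSON reader infer a schema from it. Table and column names follow the snippet above.

    import org.apache.spark.sql.types.StringType
    import spark.implicits._

    // turn the JSON-encoded blob column into a flattened DataFrame
    val df = spark.sql("select body from test limit 3")
    val bodies = df.select(df("body").cast(StringType).as("body")).as[String]
    val flattened = spark.read.json(bodies.rdd)   // infers the schema from the JSON strings
    flattened.printSchema()
    flattened.show()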

sort descending with multiple columns

2016-11-17 Thread Sreekanth Jella
Hi, I'm trying to sort on multiple columns, and the column names are dynamic. df.sort(colList.head, colList.tail: _*) But I'm not sure how to sort in descending order for all columns; I tried this, but it works only for the first column: df.sort(df.col(colList.head).desc) How can I pass all column names (or
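
A minimal sketch, assuming colList: Seq[String] as above: build a descending Column for every name and pass them as varargs.

    import org.apache.spark.sql.functions.col

    // sort every dynamically supplied column in descending order
    val sortExprs = colList.map(name => col(name).desc)
    val sorted = df.sort(sortExprs: _*)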

Re: Configure spark.kryoserializer.buffer.max at runtime does not take effect

2016-11-17 Thread kant kodali
Yeah, I feel like this is a bug, since you can't really modify the settings once you have been given a SparkSession or SparkContext, so the workaround would be to use --conf. In your case it would be like this: ./spark-shell --conf spark.kryoserializer.buffer.max=1g On Thu, Nov 17, 2016 at 1:59 PM,

Is selecting different datasets from same parquet file blocking.

2016-11-17 Thread Rohit Verma
Hi, I have a dataset with 10 columns, created from a parquet file. I want to perform some operations on each column, so I create 10 datasets as dsBig.select(col). When I submit these 10 jobs, will they block each other, since all of them read from the same parquet file? Is selecting

GraphX Pregel does not update vertex state properly, causing message loss

2016-11-17 Thread fuz_woo
Hi everyone, I encountered a strange problem these days while attempting to use the GraphX Pregel interface to implement a simple single-source shortest-path algorithm. Below is my code: import com.alibaba.fastjson.JSONObject import org.apache.spark.graphx._ import
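
For reference, a minimal single-source shortest-path sketch along the lines of the GraphX programming guide (assuming a Graph with Double edge weights and an existing sourceId vertex id; this is not the poster's code):

    import org.apache.spark.graphx._

    // initialise: distance 0 at the source, infinity everywhere else
    val initialGraph = graph.mapVertices((id, _) =>
      if (id == sourceId) 0.0 else Double.PositiveInfinity)

    val sssp = initialGraph.pregel(Double.PositiveInfinity)(
      (id, dist, newDist) => math.min(dist, newDist),      // vertex program
      triplet =>                                           // send message along cheaper paths
        if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
          Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
        else
          Iterator.empty,
      (a, b) => math.min(a, b)                             // merge messages
    )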

Re: Map and MapPartitions with partition-local variable

2016-11-17 Thread Rohit Verma
Using a map and a mapPartitions on the same df at the same time doesn't make much sense to me. Also, without complete info I am assuming that you have some partition strategy being defined/influenced by the map operation. In that case you can create a hashmap of map values for each partition, do

Re: Long-running job OOMs driver process

2016-11-17 Thread Alexis Seigneurin
Hi Irina, I would question the use of multiple threads in your application. Since Spark is going to run the processing of each DataFrame on all the cores of your cluster, the processes will be competing for resources. In fact, they would not only compete for CPU cores but also for memory. Spark

Re: How to propagate R_LIBS to sparkr executors

2016-11-17 Thread Felix Cheung
Have you tried spark.executorEnv.R_LIBS? spark.apache.org/docs/latest/configuration.html#runtime-environment _ From: Rodrick Brown > Sent: Wednesday, November 16, 2016 1:01 PM Subject: How to propagate R_LIBS to

Long-running job OOMs driver process

2016-11-17 Thread Irina Truong
We have an application that reads text files, converts them to dataframes, and saves them in Parquet format. The application runs fine when processing a few files, but we have several thousand produced every day. When running the job for all files, we have spark-submit killed on OOM: # #

Kafka 0.10 with Spark 2.0.2: auto.offset.reset=earliest will only read from a single partition on a multi-partition topic

2016-11-17 Thread Hster Geguri
Our team is trying to upgrade to Spark 2.0.2/Kafka 0.10.1.0 and we have been struggling with this showstopper problem. When we run our drivers with auto.offset.reset=latest, ingesting from a single Kafka topic with 10 partitions, the driver reads correctly from all 10 partitions. However, when we
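
For context, a minimal spark-streaming-kafka-0-10 direct-stream sketch (broker, group id and topic name are placeholders, not the poster's setup; an existing StreamingContext ssc is assumed):

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.kafka010._

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "test-group",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // one direct stream subscribed to all partitions of the topic
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("my-topic"), kafkaParams))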

Re: Spark 2.0.2 with Kafka source, Error please help!

2016-11-17 Thread shyla deshpande
Thanks Zhu, That was it. Now works great! On Thu, Nov 17, 2016 at 1:07 PM, Shixiong(Ryan) Zhu wrote: > The problem is "optional Gender gender = 3;". The generated class "Gender" > is a trait, and Spark cannot know how to create a trait so it's not > supported. You can

Re: Configure spark.kryoserializer.buffer.max at runtime does not take effect

2016-11-17 Thread Koert Kuipers
getOrCreate uses existing SparkSession if available, in which case the settings will be ignored On Wed, Nov 16, 2016 at 10:55 PM, bluishpenguin wrote: > Hi all, > I would like to configure the following setting during runtime as below: > > spark = (SparkSession >
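
A minimal sketch of the point above: core settings like the Kryo buffer only take effect if they are set before the first SparkSession/SparkContext is created, e.g. in the builder (or via --conf at launch); on an already-running session they have no effect.

    import org.apache.spark.sql.SparkSession

    // must run before any SparkSession/SparkContext exists
    val spark = SparkSession.builder()
      .appName("kryo-buffer-example")
      .config("spark.kryoserializer.buffer.max", "1g")
      .getOrCreate()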

Re: Spark Partitioning Strategy with Parquet

2016-11-17 Thread ayan guha
Hi, I think you can use the map-reduce paradigm here. Create a key using user ID and date, and the record as a value. Then you can express your operation (the "do something" part) as a function. If the function meets certain criteria, such as being associative and commutative (like, say, addition or multiplication), you can

Re: HiveContext.getOrCreate not accessible

2016-11-17 Thread Shixiong(Ryan) Zhu
`SQLContext.getOrCreate` will return the HiveContext you created. On Mon, Nov 14, 2016 at 11:17 PM, Praseetha wrote: > > Hi All, > > > I have a streaming app and when i try invoking the > HiveContext.getOrCreate, it errors out with the following stmt. 'object >

Re: Spark 2.0.2 with Kafka source, Error please help!

2016-11-17 Thread Shixiong(Ryan) Zhu
The problem is "optional Gender gender = 3;". The generated class "Gender" is a trait, and Spark cannot know how to create a trait so it's not supported. You can define your class which is supported by SQL Encoder, and convert this generated class to the new class in `parseLine`. On Wed, Nov 16,
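
A hedged sketch of that workaround (class, field and accessor names here are assumptions, not the poster's generated code): define a plain case class that the built-in SQL encoders understand and map the generated protobuf object onto it inside parseLine.

    // hypothetical target class supported by Spark's SQL encoders
    case class PersonRecord(name: String, gender: String)

    def parseLine(bytes: Array[Byte]): PersonRecord = {
      // Person is the generated protobuf class (assumed); accessor names depend on the generator
      val person = Person.parseFrom(bytes)
      PersonRecord(person.name, person.gender.toString)   // flatten the Gender trait to a String
    }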

Re: Spark 2.0.2, Structured Streaming with kafka source... Unable to parse the value to Object..

2016-11-17 Thread shyla deshpande
Hello everyone, The following code works ... def main(args : Array[String]) { val spark = SparkSession.builder. master("local") .appName("spark session example") .getOrCreate() import spark.implicits._ val ds1 = spark.readStream.format("kafka").

Map and MapPartitions with partition-local variable

2016-11-17 Thread Zsolt Tóth
Any comment on this one? On Nov 16, 2016 at 12:59 PM, "Zsolt Tóth" wrote: > Hi, > > I need to run a map() and a mapPartitions() on my input DF. As a > side-effect of the map(), a partition-local variable should be updated, > that is used in the mapPartitions()

Re: How do I convert json_encoded_blob_column into a data frame? (This may be a feature request)

2016-11-17 Thread Reynold Xin
Adding a new data type is an enormous undertaking and very invasive. I don't think it is worth it in this case given there are clear, simple workarounds. On Thu, Nov 17, 2016 at 12:24 PM, kant kodali wrote: > Can we have a JSONType for Spark SQL? > > On Wed, Nov 16, 2016 at

Re: How do I convert json_encoded_blob_column into a data frame? (This may be a feature request)

2016-11-17 Thread kant kodali
Can we have a JSONType for Spark SQL? On Wed, Nov 16, 2016 at 8:41 PM, Nathan Lande wrote: > If you are dealing with a bunch of different schemas in 1 field, figuring > out a strategy to deal with that will depend on your data and does not > really have anything to do

Spark Submit --> Unable to reach cluster manager to request executors

2016-11-17 Thread KhajaAsmath Mohammed
Hello Everyone, Could you please let me know if there is any optimal way to request executors while submitting a job in YARN mode? My job is running slowly, printing out the following statements. Is there a way to speed up the process? 16/11/17 14:03:49 WARN spark.SparkContext: Requesting executors is only

Re: Spark AVRO S3 read not working for partitioned data

2016-11-17 Thread Jon Gregg
Making a guess here: you need to add s3:ListBucket? http://stackoverflow.com/questions/35803808/spark-saveastextfile-to-s3-fails On Thu, Nov 17, 2016 at 2:11 PM, Jain, Nishit wrote: > When I read a specific file it works: > > val filePath=

Re: does column order matter in dataframe.repartition?

2016-11-17 Thread Sean Owen
It's not in general true that 100 different partitions keys go to 100 partitions -- it depends on the partitioner, but wouldn't be true in the case of a default HashPartitioner. But, yeah you'd expect a reasonably even distribution. What happens in all cases depends on the partitioner. I haven't

Re: How to load only the data of the last partition

2016-11-17 Thread Daniel Haviv
Hi Samy, If you're working with Hive you could create a partitioned table and update its partitions' locations to point to the last version, so when you query it using Spark, you'll always get the latest version. Daniel On Thu, Nov 17, 2016 at 9:05 PM, Samy Dindane wrote: > Hi, > >

Re: Spark Partitioning Strategy with Parquet

2016-11-17 Thread titli batali
That would help, but again, in a particular partition I would need to iterate over the customers having the first n letters of user id in that partition. I want to get rid of nested iterations. Thanks On Thu, Nov 17, 2016 at 10:28 PM, Xiaomeng Wan wrote: > You can partitioned

does column order matter in dataframe.repartition?

2016-11-17 Thread Cesar
I am using the following lines to re-partition a data frame by multiple columns: val partitionColumns = Seq("date", "company_id").map(x => new Column(x)) val numPartitions = 100 val dfRepartitioned = df.repartition(numPartitions, partitionColumns: _*) I understand that if the number of combinations of

Spark 2.0.2, Structured Streaming with kafka source... Unable to parse the value to Object..

2016-11-17 Thread shyla deshpande
val spark = SparkSession.builder. master("local") .appName("spark session example") .getOrCreate() import spark.implicits._ val dframe1 = spark.readStream.format("kafka"). option("kafka.bootstrap.servers","localhost:9092"). option("subscribe","student").load() *How do I deserialize
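
A common sketch for the deserialization step (assuming the Kafka payload is UTF-8 text): the key and value columns arrive as binary, so cast them to strings before parsing them into your own type.

    import spark.implicits._

    val ds = dframe1
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]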

Spark AVRO S3 read not working for partitioned data

2016-11-17 Thread Jain, Nishit
When I read a specific file it works: val filePath= "s3n://bucket_name/f1/f2/avro/dt=2016-10-19/hr=19/00" val df = spark.read.avro(filePath) But if I point to a folder to read date partitioned data it fails: val filePath="s3n://bucket_name/f1/f2/avro/dt=2016-10-19/" I get this error:

replace some partitions when writing dataframe

2016-11-17 Thread Koert Kuipers
I am looking into writing a dataframe to parquet using partitioning, so something like: df .write .mode(saveMode) .partitionBy(partitionColumn) .format("parquet") .save(path) I imagine I will have thousands of partitions. Generally my goal is not to recreate all partitions every time, but
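
A hedged sketch of one workaround used at the time (partition value and variable names are placeholders): recompute only the affected partition's data and overwrite that partition's directory directly, leaving the rest of the dataset untouched.

    // dfForOnePartition holds only the rows for one partition value;
    // when writing straight into a partition directory, the partition
    // column itself is usually dropped from the DataFrame first
    dfForOnePartition.write
      .mode("overwrite")
      .parquet(s"$path/$partitionColumn=2016-11-17")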

How to load only the data of the last partition

2016-11-17 Thread Samy Dindane
Hi, I have some data partitioned this way: /data/year=2016/month=9/version=0 /data/year=2016/month=10/version=0 /data/year=2016/month=10/version=1 /data/year=2016/month=10/version=2 /data/year=2016/month=10/version=3 /data/year=2016/month=11/version=0 /data/year=2016/month=11/version=1 When
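
A minimal sketch, assuming the layout above (the version values are placeholders): read the partitioned root and filter on the partition columns; Spark prunes the directories, so only the matching version is actually scanned.

    import spark.implicits._

    val lastVersion = spark.read.parquet("/data")
      .filter($"year" === 2016 && $"month" === 11 && $"version" === 1)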

Re: Kafka segmentation

2016-11-17 Thread Hoang Bao Thien
I am sorry I don't understand your idea. What do you mean exactly? On Fri, Nov 18, 2016 at 1:50 AM, Cody Koeninger wrote: > Ok, I don't think I'm clear on the issue then. Can you say what the > expected behavior is, and what the observed behavior is? > > On Thu, Nov 17,

Re: Kafka segmentation

2016-11-17 Thread Cody Koeninger
Ok, I don't think I'm clear on the issue then. Can you say what the expected behavior is, and what the observed behavior is? On Thu, Nov 17, 2016 at 10:48 AM, Hoang Bao Thien wrote: > Hi, > > Thanks for your comments. But in fact, I don't want to limit the size of >

Re: Kafka segmentation

2016-11-17 Thread Hoang Bao Thien
Hi, Thanks for your comments. But in fact, I don't want to limit the size of batches; they could be of any larger size, as they are now. Thien On Fri, Nov 18, 2016 at 1:17 AM, Cody Koeninger wrote: > If you want a consistent limit on the size of batches, use >

analysing ibm mq messages using spark streaming

2016-11-17 Thread Mich Talebzadeh
Hi, I guess the only way to do this is to read IBM MQ messages into Flume, ingest them into HDFS and read them from there. Alternatively, use Flume to ingest the data into HBase and then use Spark on HBase. I don't think there is an API like Spark Streaming with Kafka for IBM MQ? Thanks Dr Mich

Re: Kafka segmentation

2016-11-17 Thread Cody Koeninger
If you want a consistent limit on the size of batches, use spark.streaming.kafka.maxRatePerPartition (assuming you're using createDirectStream) http://spark.apache.org/docs/latest/configuration.html#spark-streaming On Thu, Nov 17, 2016 at 12:52 AM, Hoang Bao Thien wrote:
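
For reference, a minimal sketch of that setting (the value is a placeholder): it caps each Kafka partition at N records per second, so one batch covers at most N × batch-interval × number-of-partitions records.

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("kafka-rate-limited")
      .set("spark.streaming.kafka.maxRatePerPartition", "1000")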

Fill nan with last (good) value

2016-11-17 Thread geoHeil
How can I fill NaN values with the last (good) value? For me, it would be enough to fill them with the previous value via a window function. So far I could not get it to work, as my window function only returns NaN values. Here is code for a minimal example:
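
A minimal last-observation-carried-forward sketch (assuming the gaps are nulls and assuming id/ts grouping and ordering columns; real NaNs can first be turned into nulls, e.g. with nanvl): last(..., ignoreNulls = true) over a window running from the start of the partition up to the current row.

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.last

    val w = Window.partitionBy("id").orderBy("ts")
      .rowsBetween(Long.MinValue, 0)   // everything up to and including the current row

    val filled = df.withColumn("value_filled",
      last("value", ignoreNulls = true).over(w))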

Using mapWithState without a checkpoint

2016-11-17 Thread Daniel Haviv
Hi, Is it possible to use mapWithState without checkpointing at all ? I'd rather have the whole application fail, restart and reload an initialState RDD than pay for checkpointing every 10 batches. Thank you, Daniel

Re: Spark Streaming Data loss on failure to write BlockAdditionEvent failure to WAL

2016-11-17 Thread Dirceu Semighini Filho
Nice, thank you. I'll test this property to see if the errors stop. 2016-11-17 14:48 GMT-02:00 Arijit : > Hi Dirceu, > > > For the append issue we are setting "hdfs.append.support" (from Spark code > which reads HDFS configuration) to "true" in hdfs-site.xml and that seemed > to

Re: Grouping Set

2016-11-17 Thread Andrés Ivaldi
I realize that my data has null values, so those nulls are for the values, not for the calculated grouping set, but that is another problem: how can I detect which is which? Now I have this problem: my data is just a row like this [{1:"A", 2:null, 3:123}]; the grouping set (1) will give me A, null,
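
A minimal sketch of one way to tell the two kinds of null apart (column names are placeholders): the grouping() function returns 1 when a column was rolled up by the grouping set, and 0 when the null comes from the data itself.

    import org.apache.spark.sql.functions.{grouping, sum}
    import spark.implicits._

    val result = df.cube($"col1", $"col2")
      .agg(sum($"col3"), grouping($"col1").as("g1"), grouping($"col2").as("g2"))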

Re: Spark Partitioning Strategy with Parquet

2016-11-17 Thread Xiaomeng Wan
You can partition on the first n letters of userid. On 17 November 2016 at 08:25, titli batali wrote: > Hi, > > I have a use case, where we have 1000 csv files with a column user_Id, > having 8 million unique users. The data contains: userid,date,transaction, > where we
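
A minimal sketch of that suggestion (column names and output path are assumptions): derive a prefix column from the user id and write it out as a physical partition, so each prefix (and date) can later be read in isolation.

    import org.apache.spark.sql.functions.substring
    import spark.implicits._

    val withPrefix = df.withColumn("uid_prefix", substring($"user_id", 0, 2))

    withPrefix.write
      .partitionBy("uid_prefix", "date")
      .parquet("/data/transactions_by_prefix")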

Re: SparkILoop doesn't run

2016-11-17 Thread Holden Karau
Moving to the user list. So this might be a better question for the user list - but is there a reason you are trying to use SparkILoop for tests? On Thu, Nov 17, 2016 at 5:47 PM Mohit Jaggi wrote: > > > I am trying to use SparkILoop to write some tests (shown below) but the

Re: Spark Streaming Data loss on failure to write BlockAdditionEvent failure to WAL

2016-11-17 Thread Arijit
Hi Dirceu, For the append issue we are setting "hdfs.append.support" (from Spark code which reads HDFS configuration) to "true" in hdfs-site.xml and that seemed to have solved the issue. Of course we are using HDFS which does support append. I think the actual configuration Spark should check

Fill na with last value

2016-11-17 Thread Georg Heiler
How can I fill NaN values in Spark with the last value, or the last known good value? Here is a minimal example: http://stackoverflow.com/q/40592207/2587904 So far I tried a window function but unfortunately received only NaN values. Kind regards Georg

RE: Spark SQL join and subquery

2016-11-17 Thread Sood, Anjali
unsubscribe -Original Message- From: neil90 [mailto:neilp1...@icloud.com] Sent: Thursday, November 17, 2016 8:26 AM To: user@spark.apache.org Subject: Re: Spark SQL join and subquery What version of Spark are you using? I believe this was fixed in 2.0 -- View this message in context:

Re: Spark SQL join and subquery

2016-11-17 Thread neil90
What version of Spark are you using? I believe this was fixed in 2.0 -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-join-and-subquery-tp28093p28097.html Sent from the Apache Spark User List mailing list archive at Nabble.com.

Fwd: Spark Partitioning Strategy with Parquet

2016-11-17 Thread titli batali
Hi, I have a use case where we have 1000 csv files with a column user_Id, having 8 million unique users. The data contains: userid,date,transaction, over which we run some queries. We have a case where we need to iterate over each transaction on a particular date for each user. There are three nested

Re: Best practice for preprocessing feature with DataFrame

2016-11-17 Thread Stuart White
Sorry. Small typo. That last part should be: val modifiedRows = rows .select( substring('age, 0, 2) as "age", when('gender === 1, "male").otherwise(when('gender === 2, "female").otherwise("unknown")) as "gender" ) modifiedRows.show +---+---+ |age| gender| +---+---+ | 90|
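
For readability, the corrected snippet from this thread laid out in full (assuming spark.implicits._ is in scope for the symbol column syntax):

    import org.apache.spark.sql.functions.{substring, when}

    val rows = Seq(("90s", 1), ("80s", 2), ("80s", 3)).toDF("age", "gender")

    val modifiedRows = rows.select(
      substring('age, 0, 2) as "age",
      when('gender === 1, "male")
        .otherwise(when('gender === 2, "female").otherwise("unknown")) as "gender")

    modifiedRows.show()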

newAPIHadoopFile throws a JsonMappingException: Infinite recursion (StackOverflowError) error

2016-11-17 Thread David Robison
I am trying to create a new JavaPairRDD from data in an HDFS file. My code is: sparkContext = new JavaSparkContext("yarn-client", "SumFramesPerTimeUnit", sparkConf); JavaPairRDD inputRDD = sparkContext.newAPIHadoopFile(fileFilter, FixedLengthInputFormat.class,

Re: Best practice for preprocessing feature with DataFrame

2016-11-17 Thread Stuart White
import org.apache.spark.sql.functions._ val rows = Seq(("90s", 1), ("80s", 2), ("80s", 3)).toDF("age", "gender") rows.show +---+--+ |age|gender| +---+--+ |90s| 1| |80s| 2| |80s| 3| +---+--+ val modifiedRows .select( substring('age, 0, 2) as "age", when('gender

Re: Handling windows characters with Spark CSV on Linux

2016-11-17 Thread Hyukjin Kwon
Actually, CSV datasource supports encoding option[1] (although it does not support non-ascii compatible encoding types). [1] https://github.com/apache/spark/blob/44c8bfda793b7655e2bd1da5e9915a09ed9d42ce/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L364 On 17 Nov 2016 10:59
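
A minimal sketch of that option (file path and charset are placeholders):

    val df = spark.read
      .option("header", "true")
      .option("encoding", "windows-1252")   // "charset" is an alias
      .csv("/staging/provider_export.csv")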

ClassCastException when using SparkSQL Window function

2016-11-17 Thread Isabelle Phan
Hello, I have a simple session table, which tracks pages users visited with a sessionId. I would like to apply a window function by sessionId, but am hitting a type cast exception. I am using Spark 1.5.0. Here is sample code: scala> df.printSchema root |-- sessionid: string (nullable = true)

Re: Handling windows characters with Spark CSV on Linux

2016-11-17 Thread Mich Talebzadeh
Thanks Ayan. That only works for extra characters like ^ characters etc. Unfortunately it does not cure specific character sets. cheers Dr Mich Talebzadeh LinkedIn * https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

outlier detection using StreamingKMeans

2016-11-17 Thread Debasish Ghosh
Hello - I am trying to implement an outlier detection application on streaming data. I am a newbie to Spark and hence would like some advice on the points I am confused about. I am thinking of using StreamingKMeans - is this a good choice? I have one stream of data and I need an online algorithm.
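
A minimal StreamingKMeans sketch (k, the feature dimensionality and the input DStream are assumptions): the model is updated per batch, and the distance from each point to its nearest centre can then be thresholded to flag outliers.

    import org.apache.spark.mllib.clustering.StreamingKMeans

    // trainingData is an assumed DStream[org.apache.spark.mllib.linalg.Vector]
    val model = new StreamingKMeans()
      .setK(5)
      .setDecayFactor(1.0)
      .setRandomCenters(10, 0.0)   // 10-dimensional features, zero initial weight

    model.trainOn(trainingData)
    val assignments = model.predictOn(trainingData)   // cluster index per point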

Re: Handling windows characters with Spark CSV on Linux

2016-11-17 Thread ayan guha
There is a utility called dos2unix. You can give it a try. On 18 Nov 2016 00:20, "Jörn Franke" wrote: > > You can do the conversion of character set (is this the issue?) as part of your loading process in Spark. > As far as i know the spark CSV package is based on Hadoop

Re: Spark Streaming Data loss on failure to write BlockAdditionEvent failure to WAL

2016-11-17 Thread Dirceu Semighini Filho
Hi Arijit, Have you find a solution for this? I'm facing the same problem in Spark 1.6.1, but here the error happens only a few times, so our hdfs does support append. This is what I can see in the logs: 2016-11-17 13:43:20,012 ERROR [BatchedWriteAheadLog Writer] WriteAheadLogManager for Thread:

Re: Handling windows characters with Spark CSV on Linux

2016-11-17 Thread Jörn Franke
You can do the conversion of the character set (is this the issue?) as part of your loading process in Spark. As far as I know, the spark CSV package is based on Hadoop TextFileInputFormat. To the best of my knowledge, this format supports only UTF-8, so you have to do a conversion from the Windows encoding to UTF-8.

Handling windows characters with Spark CSV on Linux

2016-11-17 Thread Mich Talebzadeh
Hi, In the past, with the Databricks package for csv files, on occasion I had to do some cleaning at the Linux directory level before ingesting a CSV file into the HDFS staging directory for Spark to read it. I have a more generic issue that may have to be addressed. Assume that a provider uses FTP to push CSV

RE: Nested UDFs

2016-11-17 Thread Mendelson, Assaf
Then you probably want to create a normal function as opposed to a UDF. A UDF takes your function and applies it on each element in the column, one after the other. You can think of it as working on the result of a loop iterating over the column. pyspark.sql.functions.regexp_replace receives a column
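
The same idea expressed as a Scala sketch (the thread itself is PySpark; the pattern list and column names are placeholders): because regexp_replace is a Column expression, several replacements can simply be chained, here with a foldLeft, instead of being wrapped in a UDF.

    import org.apache.spark.sql.Column
    import org.apache.spark.sql.functions.regexp_replace

    val replacements = Seq("\\bfoo\\b" -> "bar", "[0-9]+" -> "#")   // hypothetical patterns

    def replaceAll(c: Column): Column =
      replacements.foldLeft(c) { case (acc, (pattern, repl)) =>
        regexp_replace(acc, pattern, repl)
      }

    val cleaned = df.select(replaceAll(df("data")).as("data"))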

Re: Nested UDFs

2016-11-17 Thread Perttu Ranta-aho
Hi, My example was a little bogus; my real use case is to do multiple regexp replacements, so something like: def my_f(data): for match, repl in regexp_list: data = regexp_replace(match, repl, data) return data I could achieve my goal with multiple .select(regexp_replace()) lines, but

HDPCD SPARK Certification Queries

2016-11-17 Thread Aakash Basu
Hi all, I want to know more about this examination - http://hortonworks.com/training/certification/exam-objectives/#hdpcdspark If anyone here has appeared for the examination, can you kindly help with: 1) the kind of questions that come up, 2) samples, 3) all the other details. Thanks,

Join Query

2016-11-17 Thread Aakash Basu
Hi, Conceptually I can understand the Spark joins below; when it comes to implementation, I don't find much information on Google. Please help me with code/pseudo code for the joins below using Java-Spark or Scala-Spark. *Replication Join:* Given two datasets, where one is small
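
A minimal sketch of a replication (broadcast / map-side) join in Scala-Spark (DataFrame and key names are placeholders): the small dataset is shipped to every executor, so the large one never needs to be shuffled.

    import org.apache.spark.sql.functions.broadcast

    val joined = largeDF.join(broadcast(smallDF), Seq("key"))

Note that Spark also broadcasts automatically when the small side is below spark.sql.autoBroadcastJoinThreshold; the explicit broadcast() hint just forces it.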

Re: Best practice for preprocessing feature with DataFrame

2016-11-17 Thread Yan Facai
Could you give me an example of how to use the Column functions? Thanks very much. On Thu, Nov 17, 2016 at 12:23 PM, Divya Gehlot wrote: > Hi, > > You can use the Column functions provided by Spark API > > https://spark.apache.org/docs/1.6.2/api/java/org/apache/ >

Another Interesting Question on SPARK SQL

2016-11-17 Thread kant kodali
Which parts in the diagram above are executed by DataSource connectors and which parts are executed by Tungsten? Or, to put it another way, in which phase of the diagram above does Tungsten leverage the DataSource connectors (such as, say, the Cassandra connector)? My understanding so far is that

why is method predict protected in PredictionModel

2016-11-17 Thread wobu
Hi, we were using Spark 1.3.1 for a long time and now we want to upgrade to the 2.0 release. So until today we used the mllib package and the RDD API. Now I'm trying to refactor our mllib NaiveBayesClassifier to the new "ml" API. *The Question:* why is the method "predict" in the class

Re: Kafka segmentation

2016-11-17 Thread Hoang Bao Thien
Hi, I send CSV and other text files to Kafka just to test Kafka + Spark Streaming using the direct stream. That's why I don't want Spark Streaming to read CSVs or text files directly. In addition, I don't want a giant batch of records like in the link you sent. The problem is that we should receive the

Re: [SQL/Catalyst] Janino Generated Code Debugging

2016-11-17 Thread Takeshi Yamamuro
Hi, not sure what you'd like to do though, but is this not enough? import org.apache.spark.sql.execution.debug._ sql("SELECT 1").debugCodegen() // maropu On Thu, Nov 17, 2016 at 3:59 AM, Aleksander Eskilson < aleksander...@gmail.com> wrote: > Hi there, > > I have some jobs generating Java code

Re: How does predicate push down really help?

2016-11-17 Thread kant kodali
Thanks for the effort and clear explanation. On Thu, Nov 17, 2016 at 12:07 AM, kant kodali wrote: > Yes thats how I understood it with your first email as well but the key > thing here sounds like some datasources may not have operators such as > filter and so on in which

Re: How does predicate push down really help?

2016-11-17 Thread kant kodali
Yes, that's how I understood it from your first email as well, but the key thing here sounds like some datasources may not have operators such as filter and so on, in which case Spark still needs to work and be able to apply the filter operation in memory after grabbing all the rows into memory. On