spark streaming executor number still increases

2017-09-12 Thread zhan8610189
I use a CDH Spark (1.5.0-hadoop2.6.0) cluster and wrote a Spark Streaming application, which I start with the following command: spark-submit --master spark://:7077 --conf spark.cores.max=4 --num-executors 4 --total-executor-cores 4 --executor-cores 4 --executor-memory 2g --class

Re: [SS]How to add a column with custom system time?

2017-09-12 Thread 张万新
It seems current_timestamp() cannot be used directly in the window function, because after some attempts I found that using *df.count.withColumn("pTime", current_timestamp).select(window($"pTime", "15 minutes"), $"count")* instead of *df.count.withColumn("window", window(current_timestamp(), "15
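
For reference, a minimal sketch of the first approach above (materialize the processing time into a column, then window over that column). The rate source is only a stand-in for the real Kafka input from the thread:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{current_timestamp, window}

    val spark = SparkSession.builder().appName("proc-time-window").getOrCreate()
    import spark.implicits._

    // Placeholder streaming source; in the thread the source is Kafka.
    val df = spark.readStream.format("rate").load()

    val counts = df
      .withColumn("pTime", current_timestamp()) // materialize processing time as a column
      .groupBy(window($"pTime", "15 minutes"))  // then window over that column
      .count()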

Re: [SS]How to add a column with custom system time?

2017-09-12 Thread 张万新
And I use .withColumn("window", window(current_timestamp(), "15 minutes")) after count. 张万新 wrote on Wed, Sep 13, 2017 at 11:32 AM: > *Yes, my code is shown below* > /** > * input > */ > val logs = spark > .readStream > .format("kafka") >

Re: [SS] Any way to optimize memory consumption of SS?

2017-09-12 Thread 张万新
*Yes, my code is shown below (I also posted my code in another mail)* /** * input */ val logs = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", BROKER_SERVER) .option("subscribe", TOPIC) .option("startingOffset", "latest") .load() /** *

Re: [SS]How to add a column with custom system time?

2017-09-12 Thread 张万新
*Yes, my code is shown below* /** * input */ val logs = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", BROKER_SERVER) .option("subscribe", TOPIC) .option("startingOffset", "latest") .load() /** * process */ val logValues =

Re: Chaining Spark Streaming Jobs

2017-09-12 Thread Sunita Arvind
Hi Michael, I am wondering what I am doing wrong. I get an error like: Exception in thread "main" java.lang.IllegalArgumentException: Schema must be specified when creating a streaming source DataFrame. If some files already exist in the directory, then depending on the file format you may be able
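
For what it's worth, a minimal sketch of giving a file-based streaming source an explicit schema, which is what that error asks for; the path and fields are placeholders:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.{LongType, StringType, StructType}

    val spark = SparkSession.builder().appName("file-stream").getOrCreate()

    // File-based streaming sources require the schema up front.
    val schema = new StructType()
      .add("id", LongType)
      .add("payload", StringType)

    val stream = spark.readStream
      .schema(schema)
      .parquet("/path/to/input")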

Re: Multiple Sources found for csv

2017-09-12 Thread jeff saremi
Sorry, I just found this, which answers my question: https://stackoverflow.com/questions/41726340/spark-2-0-csv-error
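
If I recall the linked answer correctly, the usual fix is to drop the external com.databricks:spark-csv dependency so only the built-in Spark 2.x source remains on the classpath. A hedged alternative sketch is to name the built-in source by its fully qualified class, which sidesteps the short-name clash (path is a placeholder):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("csv-read").getOrCreate()

    // Disambiguates the built-in CSV source when two "csv" providers are on the classpath.
    val df = spark.read
      .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
      .option("header", "true")
      .load("/path/to/file.csv")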

Multiple Sources found for csv

2017-09-12 Thread jeff saremi
I have this line which works in the Spark interactive console but fails in IntelliJ, using Spark 2.1.1 in both cases: Exception in thread "main" java.lang.RuntimeException: Multiple sources found for csv (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat,

Re: Continue reading dataframe from file despite errors

2017-09-12 Thread jeff saremi
Thanks Suresh, it worked nicely. From: Suresh Thalamati Sent: Tuesday, September 12, 2017 2:59:29 PM To: jeff saremi Cc: user@spark.apache.org Subject: Re: Continue reading dataframe from file despite errors Try the CSV

Re: Continue reading dataframe from file despite errors

2017-09-12 Thread Suresh Thalamati
Try the CSV option("mode", "dropmalformed"), that might skip the error records. > On Sep 12, 2017, at 2:33 PM, jeff saremi wrote: > > should have added some of the exception to be clear: > > 17/09/12 14:14:17 ERROR TaskSetManager: Task 0 in stage 15.0 failed 1
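
A minimal sketch of that suggestion, assuming a CSV input path:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("csv-dropmalformed").getOrCreate()

    // DROPMALFORMED skips corrupt records instead of failing the whole job.
    val df = spark.read
      .option("mode", "DROPMALFORMED")
      .option("header", "true")
      .csv("/path/to/input.csv")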

Configuration for unit testing and sql.shuffle.partitions

2017-09-12 Thread peay
Hello, I am running unit tests with Spark DataFrames, and I am looking for configuration tweaks that would make tests faster. Usually, I use a local[2] or local[4] master. Something that has been bothering me is that most of my stages end up using 200 partitions, independently of whether I
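
One tweak that usually helps here is shrinking spark.sql.shuffle.partitions, which defaults to 200 and is what produces those 200-partition stages. A sketch, with illustrative values only:

    import org.apache.spark.sql.SparkSession

    // A small number of shuffle partitions avoids launching 200 tiny tasks per stage in tests.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("unit-tests")
      .config("spark.sql.shuffle.partitions", "4")
      .getOrCreate()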

Re: Continue reading dataframe from file despite errors

2017-09-12 Thread jeff saremi
I should have added some of the exception to be clear: 17/09/12 14:14:17 ERROR TaskSetManager: Task 0 in stage 15.0 failed 1 times; aborting job org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 1 times, most recent failure: Lost task 0.0 in stage

Continue reading dataframe from file despite errors

2017-09-12 Thread jeff saremi
I'm using a statement like the following to load my dataframe from some text file. Upon encountering the first error, the whole thing throws an exception and processing stops. I'd like to continue loading even if that results in zero rows in my dataframe. How can I do that? Thanks

Re: Queries with streaming sources must be executed with writeStream.start()

2017-09-12 Thread kant kodali
I have about 100 fields in my dataset and some of them have "null" in them. Does to_json fail to convert if that is the case? Thanks! On Tue, Sep 12, 2017 at 12:32 PM, kant kodali wrote: > Hi Michael, > > Interestingly that doesn't seem to quite work for me for some reason.

Re: Queries with streaming sources must be executed with writeStream.start()

2017-09-12 Thread kant kodali
Hi Michael, Interestingly that doesn't seem to quite work for me for some reason. Here is what I have. Dataset: name | id | country - kant | 1 | usa, john | 2 | usa. And here is my code: Dataset ds = getKafkaStream(); // This dataset represents the one above
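
For context, a hedged sketch of the to_json approach being discussed, assuming ds is the Dataset above; as far as I recall, null fields are simply omitted from the generated JSON rather than breaking the conversion:

    import org.apache.spark.sql.functions.{col, struct, to_json}

    // Pack every column into a struct and serialize it to a string column named
    // "value", which is what the Kafka sink expects.
    val out = ds.select(to_json(struct(ds.columns.map(col): _*)).alias("value"))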

How can I Upgrade Spark 1.6 to 2.x in Cloudera QuickStart VM 5.7

2017-09-12 Thread Gaurav1809
Hi All, I am using Cloudera 5.7 QuickStart VM for learning purpose. It has Spark 1.6 I want to upgrade Spark to 2.x. How can I do it? I dong think it will be as easy as downloading 2.x and replace the older files. Please guide if anyone has done this in past. Steps would be highly helpful.

How do I create a JIRA issue and associate it with a PR that I created for a bug in master?

2017-09-12 Thread Mikhailau, Alex
How do I create a JIRA issue and associate it with a PR that I created for a bug in master? https://github.com/apache/spark/pull/19210

Re: [SS]How to add a column with custom system time?

2017-09-12 Thread Michael Armbrust
Can you show all the code? This works for me. On Tue, Sep 12, 2017 at 12:05 AM, 张万新 wrote: > The spark version is 2.2.0 > > Michael Armbrust wrote on Tue, Sep 12, 2017 at 12:32 PM: > >> Which version of spark? >> >> On Mon, Sep 11, 2017 at 8:27 PM, 张万新

Re: [SS] Any way to optimize memory consumption of SS?

2017-09-12 Thread Michael Armbrust
Can you show the full query you are running? On Tue, Sep 12, 2017 at 10:11 AM, 张万新 wrote: > Hi, > > I'm using structured streaming to count unique visits of our website. I > use spark on yarn mode with 4 executor instances and from 2 cores * 5g > memory to 4 cores * 10g

[SS] Any way to optimize memory consumption of SS?

2017-09-12 Thread 张万新
Hi, I'm using structured streaming to count unique visits of our website. I use spark on yarn mode with 4 executor instances, and went from 2 cores * 5g memory to 4 cores * 10g memory for each executor, but there are frequent full GCs, and once the count rises to more than about 4.5 million the
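
Not from the thread, but one memory-oriented sketch for this kind of workload: an approximate distinct count keeps a small sketch per window instead of the full set of visitor ids, which can shrink state considerably. The column names and watermark below are assumptions:

    import org.apache.spark.sql.functions.{approx_count_distinct, col, window}

    // logValues is assumed to be the parsed streaming DataFrame from the thread,
    // with an event-time column and a visitor id column.
    val uniques = logValues
      .withWatermark("eventTime", "1 hour")
      .groupBy(window(col("eventTime"), "15 minutes"))
      .agg(approx_count_distinct("visitorId").alias("uniqueVisits"))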

How to run "merge into" ACID transaction hive query using hive java api?

2017-09-12 Thread Hokam Singh Chauhan
Please share if anyone knows how to execute a "merge into" Hive query. Thanks, Hokam
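
I can't speak to a dedicated Java API for this, but a hedged sketch of one common route is to run the MERGE statement through the HiveServer2 JDBC driver; the URL, credentials, and table/column names are placeholders, and the target table must be an ACID (transactional) table on Hive 2.2+:

    import java.sql.DriverManager

    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val conn = DriverManager.getConnection("jdbc:hive2://hive-host:10000/default", "user", "")
    val stmt = conn.createStatement()
    // MERGE requires transactions enabled and a bucketed ORC target table.
    stmt.execute(
      """MERGE INTO target t USING updates u ON t.id = u.id
        |WHEN MATCHED THEN UPDATE SET value = u.value
        |WHEN NOT MATCHED THEN INSERT VALUES (u.id, u.value)""".stripMargin)
    conn.close()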

Re: Why do checkpoints work the way they do?

2017-09-12 Thread Hugo Reinwald
Thanks Tathagata for the clarification. +1 on documenting the limitations of checkpoints in structured streaming. Hugo On Mon, Sep 11, 2017 at 7:13 PM, Dmitry Naumenko wrote: > +1 for me for this question. If there are any constraints in restoring a > checkpoint for Structured

Re: How does spark work?

2017-09-12 Thread Jules Damji
Alternatively, watch the Spark Summit talks on memory management to get insight from a developer's perspective. https://spark-summit.org/2016/events/deep-dive-apache-spark-memory-management/ https://spark-summit.org/2017/events/a-developers-view-into-sparks-memory-model/ Cheers Jules Sent from

How to set Map values in spark/scala

2017-09-12 Thread Paras Bansal
Hello, I am new to Spark/Scala development. I am trying to create map values in Spark using Scala but nothing gets printed: def createMap() : Map[String, Int] = { var tMap: Map[String, Int] = Map(); val tDF = spark.sql("select a, b, c from temp"); for (x <- tDF) { val k = x.getAs[Long](0) + "|" +
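
A hedged sketch of why nothing gets printed and one way around it: the body of a for/foreach over a Dataset runs on the executors, so mutating a driver-side var there is never reflected back on the driver. Collecting first (assuming the result is small enough) sidesteps that; the column positions and types below mirror the snippet but are assumptions:

    // Collect the rows to the driver and build the map there.
    val tMap: Map[String, Int] =
      spark.sql("select a, b, c from temp")
        .collect()
        .map(row => (row.getAs[Long](0) + "|" + row.getAs[Long](1), row.getAs[Int](2)))
        .toMap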

Re: How does spark work?

2017-09-12 Thread nguyen duc Tuan
In general, RDD, which is the central concept of Spark, is just a definition of how to get data and process data. Each partition of an RDD defines how to get/process each partition of data. A series of transformations will transform every partition of data from the previous RDD. I give you a very easy
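
The example in that message is cut off above; as an illustrative sketch of the same point (local mode, toy data):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[2]").appName("rdd-demo").getOrCreate()
    val sc = spark.sparkContext

    // An RDD only describes how to obtain and transform each partition;
    // nothing runs until an action (collect) is invoked.
    val numbers = sc.parallelize(1 to 10, numSlices = 4) // 4 partitions
    val doubled = numbers.map(_ * 2)                     // transformation, applied partition by partition
    println(doubled.collect().mkString(", "))            // action triggers the computation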

RE: ClassNotFoundException while unmarshalling a remote RDD on Spark 1.5.1

2017-09-12 Thread PICARD Damien
Ok, it just seems to be an issue with the syntax of the spark-submit command. It should be: spark-submit --queue default \ --class com.my.Launcher \ --deploy-mode cluster \ --master yarn-cluster \ --driver-java-options "-Dfile.encoding=UTF-8" \ --jars

Re: Efficient Spark-Submit planning

2017-09-12 Thread Sonal Goyal
Overall the defaults are sensible, but you definitely have to look at your application and optimise a few of them. I mostly refer to the following links when the job is slow or failing or we have more hardware which we see we are not utilizing. http://spark.apache.org/docs/latest/tuning.html

Re: Spark ignores --master local[*]

2017-09-12 Thread Vikash Pareek
Your VM might not have more than 1 core available to run the Spark job. Check with the *nproc* command to see how many cores are available on the VM and the *top* command to see how many cores are free. - __Vikash Pareek -- Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

Re: How does spark work?

2017-09-12 Thread Vikash Pareek
Obviously, you can't store 900GB of data in 80GB of memory. There is a concept in Spark called disk spill: when your data size grows and can't fit into memory, it is spilled out to disk. Also, Spark doesn't use the whole memory for storing data; some fraction of memory is used for
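
The fraction being referred to is configurable; a sketch with the unified-memory knobs (the values shown are the defaults and are illustrative only):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .config("spark.memory.fraction", "0.6")        // share of the heap used for execution + storage
      .config("spark.memory.storageFraction", "0.5") // part of that region protected from eviction for storage
      .getOrCreate()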

Re: Multiple Kafka topics processing in Spark 2.2

2017-09-12 Thread kant kodali
@Dan shouldn't you be using Datasets/DataFrames? I heard it is recommended to use Datasets and DataFrames rather than DStreams, since DStreams are in maintenance mode. On Mon, Sep 11, 2017 at 7:41 AM, Cody Koeninger wrote: > If you want an "easy" but not particularly performant
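
For reference, a minimal Structured Streaming sketch subscribing to several topics at once; the broker and topic names are placeholders:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("multi-topic").getOrCreate()

    // A single Kafka source can subscribe to a comma-separated list of topics.
    val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")
      .option("subscribe", "topic1,topic2")
      .load()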

Re: [SS]How to add a column with custom system time?

2017-09-12 Thread 张万新
The Spark version is 2.2.0. Michael Armbrust wrote on Tue, Sep 12, 2017 at 12:32 PM: > Which version of spark? > > On Mon, Sep 11, 2017 at 8:27 PM, 张万新 wrote: > >> Thanks for the reply, but using this method I got an exception: >> >> "Exception in thread "main" >>

Spark Yarn Java Out Of Memory on Complex Query Execution Plan

2017-09-12 Thread Nimmi Cv
Exception in thread "main" java.lang.OutOfMemoryError at java.lang.AbstractStringBuilder.hugeCapacity(AbstractStringBuilder.java:161) at java.lang.AbstractStringBuilder.newCapacity(AbstractStringBuilder.java:155) at

Unable to save an RDd on S3 with SSE-KMS encryption

2017-09-12 Thread Vikash Pareek
I am trying to save an RDD on S3 with server-side encryption using a KMS key (SSE-KMS), but I am getting the following exception: *Exception in thread "main" com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: 695E32175EBA568A, AWS Error
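
A hedged sketch of one way this is typically configured with the s3a connector on Hadoop 2.8+, assuming spark and rdd come from the poster's context; the bucket, key ARN, and output path are placeholders:

    // Ask s3a to use SSE-KMS with a specific customer-managed key.
    val hadoopConf = spark.sparkContext.hadoopConfiguration
    hadoopConf.set("fs.s3a.server-side-encryption-algorithm", "SSE-KMS")
    hadoopConf.set("fs.s3a.server-side-encryption.key",
      "arn:aws:kms:us-east-1:111122223333:key/your-key-id")

    rdd.saveAsTextFile("s3a://your-bucket/output")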