Re: spark df.write.partitionBy run very slow

2019-03-14 Thread JF Chen
But now I have another question: how to determine which data node the spark task is writing to? It's really important for diving into the problem. Regards, Junfeng Chen On Thu, Mar 14, 2019 at 2:26 PM Shyam P wrote: > cool. > > On Tue, Mar 12, 2019 at 9:08 AM JF Chen wrote: > >
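
One way to see which node each write task runs on, not necessarily what the thread settled on, is a SparkListener that prints the host of every finished task; a minimal spark-shell sketch (sc is the shell's SparkContext, and the println format is only illustrative):

    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

    // Print the host (node) and duration of every finished task; watch the
    // output while the df.write.partitionBy(...) job runs.
    sc.addSparkListener(new SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        val info = taskEnd.taskInfo
        println(s"stage=${taskEnd.stageId} task=${info.taskId} " +
          s"host=${info.host} durationMs=${info.duration}")
      }
    })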

Re: spark df.write.partitionBy run very slow

2019-03-11 Thread JF Chen
import org.apache.spark.sql.functions.spark_partition_id > df.groupBy(spark_partition_id).count.show() > > > > Are you getting the same number of parquet files? > > Gradually increase the sample size. > > On Fri, 8 Mar 2019, 14:17 JF Chen, wrote: > >> I che
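
For reference, a slightly expanded, runnable form of the per-partition count check quoted above (the input path is a placeholder, and spark is the shell's SparkSession); a very uneven count column points at skew before the write stage:

    import org.apache.spark.sql.functions.spark_partition_id

    val df = spark.read.parquet("/path/to/sample")   // placeholder input

    // Rows per Spark partition; a handful of huge partitions among many small
    // ones means the write tasks will be similarly unbalanced.
    df.groupBy(spark_partition_id()).count().orderBy("count").show(200, truncate = false)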

Re: spark df.write.partitionBy run very slow

2019-03-08 Thread JF Chen
Will the number of appname partitions affect the writing efficiency? Regards, Junfeng Chen On Thu, Mar 7, 2019 at 4:21 PM JF Chen wrote: > Yes, I agree. > > From the spark UI I can confirm the data is not skewed. There is only about > 100MB for each task, where most tasks take sev

Re: spark df.write.partitionBy run very slow

2019-03-07 Thread JF Chen
ound it and try. > > "some tasks in write hdfs stage cost much more time than others" may mean the > data is skewed and needs to be distributed evenly across all partitions. > > ~Shyam > > On Wed, Mar 6, 2019 at 8:33 AM JF Chen wrote: > >> Hi Shyam >> Thanks

Re: "java.lang.AssertionError: assertion failed: Failed to get records for **** after polling for 180000" error

2019-03-06 Thread JF Chen
> - max.partition.fetch.bytes within the spark kafka consumer. If not set > for the consumer, then the global config at the broker level applies. > - spark.streaming.kafka.consumer.poll.ms > - spark.network.timeout (If the above is not set, then poll.ms is > default
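
As a rough sketch of where the settings listed above usually live (values are illustrative, not recommendations): the poll and network timeouts go on the Spark conf, while max.partition.fetch.bytes is a Kafka consumer property passed in kafkaParams; the broker address and group id are placeholders:

    import org.apache.spark.SparkConf
    import org.apache.kafka.common.serialization.StringDeserializer

    val conf = new SparkConf()
      .set("spark.streaming.kafka.consumer.poll.ms", "120000") // per-poll timeout
      .set("spark.network.timeout", "300s")                    // fallback when poll.ms is unset

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker1:9092",                   // placeholder broker
      "group.id" -> "mygroup",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "max.partition.fetch.bytes" -> (10485760: java.lang.Integer) // 10 MB per partition per fetch
    )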

"java.lang.AssertionError: assertion failed: Failed to get records for **** after polling for 180000" error

2019-03-05 Thread JF Chen
When my kafka executor reads data from kafka, sometimes it throws the error "java.lang.AssertionError: assertion failed: Failed to get records for **** after polling for 180000", which happens after 3 minutes of execution. The data waiting to be read is not that large, about 1GB. And other partitions

Re: spark df.write.partitionBy run very slow

2019-03-05 Thread JF Chen
trick we can apply here while partitionBy(column_a, column_b, > column_c) > Make sure you have ( column_a partitions) > ( column_b > partitions) > ( column_c partitions) . > > Try this. > > Regards, > Shyam > > On Mon, Mar 4, 2019 at 4:09 PM JF

spark df.write.partitionBy run very slow

2019-03-04 Thread JF Chen
I am trying to write data in a dataset to hdfs via df.write.partitionBy(column_a, column_b, column_c).parquet(output_path) However, it takes several minutes to write only hundreds of MB of data to hdfs. From this article
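
One common mitigation for this pattern, sketched here with the column names from the post and placeholder paths, is to repartition on the same columns before the write so that each partition directory is produced by a single task rather than every task writing a small file into it:

    import org.apache.spark.sql.functions.col

    val df = spark.read.parquet("/path/to/input")   // placeholder source
    val outputPath = "/path/to/output"              // placeholder destination

    // One shuffle up front, far fewer (and larger) parquet files per
    // (column_a, column_b, column_c) directory afterwards.
    df.repartition(col("column_a"), col("column_b"), col("column_c"))
      .write
      .partitionBy("column_a", "column_b", "column_c")
      .parquet(outputPath)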

Re: Back pressure not working on streaming

2019-01-01 Thread JF Chen
d kafka topic. > > > > Regards > Harsh > Happy New Year > > On Wed 2 Jan, 2019, 08:33 JF Chen >> I have set spark.streaming.backpressure.enabled to true, >> spark.streaming.backpressure.initialRate >> to 10. >> Once my application started, it re

Back pressure not working on streaming

2019-01-01 Thread JF Chen
I have set spark.streaming.backpressure.enabled to true and spark.streaming.backpressure.initialRate to 10. Once my application started, it received 32 million messages on the first batch. My application runs every 300 seconds, with 32 kafka partitions. So what is the max rate if I set the initial rate to
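
A hedged sketch of the knobs involved (numbers are illustrative): with the direct Kafka stream, spark.streaming.kafka.maxRatePerPartition is a hard per-partition-per-second cap that also bounds the first batch, so with 32 partitions and a 300-second interval the example below limits a batch to roughly 32 x 1000 x 300 = 9.6 million records.

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.streaming.backpressure.enabled", "true")
      .set("spark.streaming.backpressure.initialRate", "10")
      // Hard cap: records per Kafka partition per second, applied to every
      // batch, including the first one.
      .set("spark.streaming.kafka.maxRatePerPartition", "1000")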

How to set Spark Streaming batch start time?

2018-12-11 Thread JF Chen
Hi everyone I set 10 minutes as streaming interval, but it always runs at 0th minute, 10th minute, 20 minute in every hour. Can I set a start time, like start delay, making it runs at 5th minute, 15 minute, 25 minute every hour? Thanks! Regard, Junfeng Chen

Re: how to change temp directory when spark write data ?

2018-12-05 Thread JF Chen
PM Sandip Mehta wrote: > try the spark.local.dir property. > > > On Wed, Dec 5, 2018 at 1:42 PM JF Chen wrote: > >> I have two spark apps writing data to one directory. I notice they share >> one temp directory, and the app that finishes writing first will clear the temp >
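
A minimal sketch of the suggestion above, giving each application its own spark.local.dir (paths are placeholders; on YARN the node manager's local directories normally take precedence, and the property can equally be set via --conf or spark-defaults.conf):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("writer-app-1")
      .config("spark.local.dir", "/data/tmp/writer-app-1") // separate scratch space per app
      .getOrCreate()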

how to change temp directory when spark write data ?

2018-12-05 Thread JF Chen
I have two spark apps writing data to one directory. I notice they share one temp directory, and the app that finishes writing first will clear the temp directory, so the slower one may throw a "No lease on *** File does not exist" error. So how can I specify the temp directory? Thanks! Regards, Junfeng Chen

"failed to get records for spark-executor after polling for ***" error

2018-12-03 Thread JF Chen
Some kafka consumer tasks sometimes throw a "failed to get records for spark-executor after polling for ***" error. In detail, some tasks take a very long time and then throw this error. However, when the task restarts, it recovers very quickly. My spark version is 2.2.0. Regards, Junfeng Chen

spark unsupported conversion to Stringtype error

2018-11-27 Thread JF Chen
When I load some parquet files and output them with show() or csv or json, it always throws this exception: "unsupported conversion to: Stringtype". What does this mean, and how can I resolve it? Regards, Junfeng Chen
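
Without the files it is hard to say more, but a first diagnostic step is usually to look at the schema Spark inferred and, if one column stands out, try an explicit cast; the path and the column name below are placeholders:

    import org.apache.spark.sql.functions.col

    val df = spark.read.parquet("/path/to/problem/files")   // placeholder path

    // The schema shows the type Spark inferred for each column; the one that
    // cannot be rendered as a string should stand out.
    df.printSchema()

    // If a single column is the culprit, an explicit cast may sidestep the error.
    df.withColumn("suspect_col", col("suspect_col").cast("string")).show(5)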

Re: How to increase the parallelism of Spark Streaming application?

2018-11-08 Thread JF Chen
> next level of parallelism; if you are not having any data skew, then you > should get good performance. > > > Regards, > Shahbaz > > On Wed, Nov 7, 2018 at 12:58 PM JF Chen wrote: > >> I have a Spark Streaming application which reads data from kafka and sa

[no subject]

2018-11-08 Thread JF Chen
I am working on a spark streaming application, and I want it to read configuration from mongodb every hour, while the batch interval is 10 minutes. Is that practical? As far as I know, spark streaming batches are tied to the DStream, so how can I implement this function, which seems unrelated to the dstream data?
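
One workable pattern, sketched below, is to keep the configuration in a driver-side holder that refreshes at most once per hour and to check it inside foreachRDD on every 10-minute batch; loadConfigFromMongo is a hypothetical stand-in for the actual MongoDB client call:

    import java.util.concurrent.atomic.{AtomicLong, AtomicReference}

    object HourlyConfig {
      private val lastLoadMs = new AtomicLong(0L)
      private val cached = new AtomicReference[Map[String, String]](Map.empty)
      private val refreshIntervalMs = 60L * 60 * 1000 // one hour

      // loadConfigFromMongo is a hypothetical function fetching settings from MongoDB.
      def get(loadConfigFromMongo: () => Map[String, String]): Map[String, String] = {
        val now = System.currentTimeMillis()
        if (now - lastLoadMs.get() >= refreshIntervalMs) {
          cached.set(loadConfigFromMongo())
          lastLoadMs.set(now)
        }
        cached.get()
      }
    }

    // In the streaming job this runs on the driver once per batch (every 10
    // minutes), but the reload itself only happens once per hour:
    // dstream.foreachRDD { rdd =>
    //   val config = HourlyConfig.get(loadConfigFromMongo)
    //   // ...use config while processing rdd...
    // }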

Re: How to increase the parallelism of Spark Streaming application?

2018-11-08 Thread JF Chen
en you >should get good performance. > > > Regards, > Shahbaz > > On Wed, Nov 7, 2018 at 12:58 PM JF Chen wrote: > >> I have a Spark Streaming application which reads data from kafka and saves >> the transformation result to hdfs. >> My original par

Re: How to increase the parallelism of Spark Streaming application?

2018-11-08 Thread JF Chen
> > Best, > Michael > > > On Wed, Nov 7, 2018 at 8:28 AM JF Chen wrote: > >> I have a Spark Streaming application which reads data from kafka and saves >> the transformation result to hdfs. >> My original partition number of kafka topic is 8, and repartition t

How to increase the parallelism of Spark Streaming application?

2018-11-06 Thread JF Chen
I have a Spark Streaming application which reads data from kafka and saves the transformation result to hdfs. The original partition number of my kafka topic is 8, and I repartition the data to 100 to increase the parallelism of the spark job. Now I am wondering, if I increase the kafka partition number
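
A rough end-to-end sketch of the setup described above (broker, topic, group id, output path and the batch interval are placeholders): the read stage can have at most as many tasks as Kafka partitions, so repartition(100) only spreads the transformation work, at the price of a shuffle.

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

    val conf = new SparkConf().setAppName("parallelism-sketch")
    val ssc = new StreamingContext(conf, Seconds(600))   // placeholder interval

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker1:9092",             // placeholder broker
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "mygroup")

    // 8 topic partitions => at most 8 read tasks; repartition(100) fans the
    // transformation out over 100 tasks at the cost of a shuffle.
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("mytopic"), kafkaParams))

    stream.map(_.value)
      .repartition(100)
      .foreachRDD { rdd => rdd.saveAsTextFile(s"/data/out/${System.currentTimeMillis}") }

    ssc.start()
    ssc.awaitTermination()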

Re: How to deal with context dependent computing?

2018-08-26 Thread JF Chen
> Can you please show by means of an example what you are trying to achieve? > > Thanks, > Sonal > Nube Technologies <http://www.nubetech.co> > > <http://in.linkedin.com/in/sonalgoyal> > > > > On Thu, Aug 23, 2018 at 8:22 AM, JF Chen wrote: > >> Fo

How to deal with context dependent computing?

2018-08-22 Thread JF Chen
For example, I have some data with timestamps marked as category A and B, ordered by time. Now I want to calculate each duration from A to B. In a normal program, I can use a flag bit to record whether the previous record is A or B, and then calculate the duration. But in a Spark Dataframe, how to do
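
A minimal sketch of one way to do this with a window function: lag() carries the previous row's category and timestamp forward, playing the role of the flag bit from an imperative loop. Column names and sample values below are placeholders.

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, lag, unix_timestamp}
    import spark.implicits._

    val events = Seq(
      ("2018-08-22 10:00:00", "A"),
      ("2018-08-22 10:05:30", "B"),
      ("2018-08-22 11:00:00", "A"),
      ("2018-08-22 11:02:10", "B")
    ).toDF("ts", "category")
      .withColumn("epoch", unix_timestamp(col("ts")))

    // A single unpartitioned window pulls everything into one partition;
    // partition it by a session or device key if one exists.
    val w = Window.orderBy("epoch")

    val durations = events
      .withColumn("prev_category", lag("category", 1).over(w))
      .withColumn("prev_epoch", lag("epoch", 1).over(w))
      .filter(col("category") === "B" && col("prev_category") === "A")
      .withColumn("duration_seconds", col("epoch") - col("prev_epoch"))

    durations.select("prev_epoch", "epoch", "duration_seconds").show()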

Name error when writing data as orc

2018-05-28 Thread JF Chen
I am writing a dataset in orc format to hdfs, and I meet the following problem: Error: name expected at the position 1473 of 'string:boolean:string:string..zone:struct<$ref:string> ...' but '$' is found. Position 1473 is at the "$ref:string" place. Regards, Junfeng Chen

Re: How to skip nonexistent file when read files with spark?

2018-05-21 Thread JF Chen
the loop, just use the unioned df. This should not have any > additional performance overhead as declaring dataframes and union is not > expensive, unless you call any action within the loop. > > Best > Ayan > > On Tue, 22 May 2018 at 11:27 am, JF Chen <darou...@gmail.com> wrote: >

Re: How to skip nonexistent file when read files with spark?

2018-05-21 Thread JF Chen
I am not familiar with Python Hadoop libraries, but see if this helps - > http://crs4.github.io/pydoop/tutorial/hdfs_api.html > > > > Best, > > Jayesh > > > > *From: *JF Chen <darou...@gmail.com> > *Date: *Monday, May 21, 2018 at 10:20 PM

Re: How to skip nonexistent file when read files with spark?

2018-05-21 Thread JF Chen
> and other details by using the "FileSystem.globStatus" method from the > Hadoop API. > > > > *From: *JF Chen <darou...@gmail.com> > *Date: *Sunday, May 20, 2018 at 10:30 PM > *To: *user <user@spark.apache.org> > *Subject: *How to skip nonexistent fil
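
The thread mentions pydoop on the Python side; for reference, the same globStatus idea in Scala (patterns taken from the original question, spark being the shell's SparkSession, and the parquet reader at the end a stand-in for whatever format is actually being read):

    import org.apache.hadoop.fs.{FileSystem, Path}

    val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    val patterns = Seq("/data/*/12", "/data/*/13")

    // globStatus expands each wildcard; patterns that match nothing simply
    // contribute no paths instead of failing the whole read.
    val existing = patterns.flatMap { p =>
      Option(fs.globStatus(new Path(p))).toSeq.flatten.map(_.getPath.toString)
    }

    if (existing.nonEmpty) {
      val df = spark.read.parquet(existing: _*)   // stand-in for the real format
      df.show(5)
    }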

How to skip nonexistent file when read files with spark?

2018-05-20 Thread JF Chen
Hi everyone, I met a tricky problem recently. I am trying to read some file paths generated by another method. The file paths are represented by wildcards in a list, like ['/data/*/12', '/data/*/13']. But in practice, if a wildcard cannot match any existing path, it will throw an

Re: Snappy file compatible problem with spark

2018-05-17 Thread JF Chen
On Thu, May 17, 2018 at 5:47 PM, Victor Noagbodji < vnoagbo...@amplify-nation.com> wrote: > Hey, Sorry if I misunderstood. Are you feeding the compressed JSON file to > Spark directly? > > On May 17, 2018, at 4:59 AM, JF Chen <darou...@gmail.com> wrote: > > I made so

Snappy file compatible problem with spark

2018-05-17 Thread JF Chen
I made some snappy-compressed json files with the normal snappy codec (https://github.com/xerial/snappy-java), which it seems cannot be read by Spark correctly. So how can I make the existing snappy files recognized by spark? Are there any tools to convert them? Thanks! Regards, Junfeng Chen
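
If converting the files offline is acceptable, one option is to decompress them with snappy-java itself and hand Spark the plain JSON. The sketch below assumes the files were written with SnappyOutputStream (raw Snappy.compress output would need Snappy.uncompress instead); the paths are placeholders.

    import java.io.{FileInputStream, FileOutputStream}
    import org.xerial.snappy.SnappyInputStream

    def decompressToPlainJson(in: String, out: String): Unit = {
      val source = new SnappyInputStream(new FileInputStream(in))
      val sink = new FileOutputStream(out)
      try {
        val buffer = new Array[Byte](8192)
        // Copy decompressed bytes straight through to a plain file.
        Iterator.continually(source.read(buffer))
          .takeWhile(_ != -1)
          .foreach(n => sink.write(buffer, 0, n))
      } finally {
        source.close()
        sink.close()
      }
    }

    decompressToPlainJson("/path/in/data.json.snappy", "/path/out/data.json")
    // spark.read.json("/path/out/data.json") can then read the plain file.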

Spark streaming with kafka input stuck in (Re-)joining group because of group rebalancing

2018-05-15 Thread JF Chen
When I terminate a spark streaming application and restart it, it always gets stuck at this step: > > Revoking previously assigned partitions [] for group [mygroup] > (Re-)joining group [mygroup] If I use a new group id, even though it works fine, I may lose the data from the last time I read the

StreamingContext cannot be started

2018-04-26 Thread JF Chen
I create a Spark StreamingContext and SparkSession in my application. When I start the StreamingContext, the log will print "StreamingContext started". However, sometimes it will not print this log, and the batch job seems not to be launched. Regards, Junfeng Chen