Re: spark df.write.partitionBy run very slow

2019-03-14 Thread JF Chen
But now I have another question: how to determine which data node the spark
task is writing to? It's really important for digging into the problem.
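
A minimal sketch of one way to check this, using the Hadoop FileSystem API to list
which datanodes hold the blocks of the files a batch just wrote (the output
directory below is only an illustration, not a path from this thread):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())
val outDir = new Path("/data/output/appname=app1/year=2019/month=03/day=14/hour=02")

fs.listStatus(outDir).filter(_.isFile).foreach { status =>
  // ask the namenode which hosts hold each file's blocks
  val blocks = fs.getFileBlockLocations(status, 0, status.getLen)
  val hosts = blocks.flatMap(_.getHosts).distinct.mkString(", ")
  println(s"${status.getPath.getName} -> $hosts")
}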

Regard,
Junfeng Chen


On Thu, Mar 14, 2019 at 2:26 PM Shyam P  wrote:

> cool.
>
> On Tue, Mar 12, 2019 at 9:08 AM JF Chen  wrote:
>
>> Hi
>> Finally I found the reason...
>> It was caused by long GC pauses on some datanodes. After receiving the
>> data from the executors, a data node stuck in a long GC cannot report blocks to
>> the namenode, so the writing process takes a long time.
>> Now I have decommissioned the broken data nodes, and my spark runs
>> well.
>> I am trying to increase the heap size of the data nodes to check if it can
>> resolve the problem.
>>
>> Regard,
>> Junfeng Chen
>>
>>
>> On Fri, Mar 8, 2019 at 8:54 PM Shyam P  wrote:
>>
>>> Did you check this, how many partitions and what count of records it shows?
>>>
>>> //count by partition_id
>>> import org.apache.spark.sql.functions.spark_partition_id
>>> df.groupBy(spark_partition_id).count.show()
>>>
>>>
>>>
>>> Are you getting same number of parquet files ?
>>>
>>> You gradually increase the sample size.
>>>
>>> On Fri, 8 Mar 2019, 14:17 JF Chen,  wrote:
>>>
>>>> I check my partitionBy method again, it's partitionBy(appname, year,
>>>> month, day, hour), and the number of partitions of appname is much more
>>>> than partition of year, month, day, and hour. My spark streaming app runs
>>>> every 5 minutes, so year, month, day, and hour should be the same most of the
>>>> time.
>>>> So will the number of appname partitions affect the writing efficiency?
>>>>
>>>> Regard,
>>>> Junfeng Chen
>>>>
>>>>
>>>> On Thu, Mar 7, 2019 at 4:21 PM JF Chen  wrote:
>>>>
>>>>> Yes, I agree.
>>>>>
>>>>> From the spark UI I can ensure data is not skewed. There is only about
>>>>> 100MB for each task, where most of tasks takes several seconds to write 
>>>>> the
>>>>> data to hdfs, and some tasks takes minutes of time.
>>>>>
>>>>> Regard,
>>>>> Junfeng Chen
>>>>>
>>>>>
>>>>> On Wed, Mar 6, 2019 at 2:39 PM Shyam P 
>>>>> wrote:
>>>>>
>>>>>> Hi JF,
>>>>>> Yes, first we should know the actual number of partitions the dataframe has
>>>>>> and its counts of records. Accordingly we should try to have data spread evenly
>>>>>> across all partitions.
>>>>>> It is always better to have Num of partitions = N * Num of executors.
>>>>>>
>>>>>>
>>>>>>   "But the sequence of columns in  partitionBy  decides the
>>>>>> directory  hierarchy structure. I hope the sequence of columns not
>>>>>> change", this is correct.
>>>>>> Hence sometimes we should go with the bigger number first, then the lesser.
>>>>>> Try this, i.e. more parent directories and fewer child directories.
>>>>>> Tweak it around and try.
>>>>>>
>>>>>> "some tasks in write hdfs stage cost much more time than others" may
>>>>>> mean the data is skewed; we need to distribute it evenly across all partitions.
>>>>>>
>>>>>> ~Shyam
>>>>>>
>>>>>> On Wed, Mar 6, 2019 at 8:33 AM JF Chen  wrote:
>>>>>>
>>>>>>> Hi Shyam
>>>>>>> Thanks for your reply.
>>>>>>> You mean after knowing the partition number of column_a, column_b,
>>>>>>> column_c, the sequence of column in partitionBy should be same to the 
>>>>>>> order
>>>>>>> of partitions number of column a, b and c?
>>>>>>> But the sequence of columns in  partitionBy  decides the
>>>>>>> directory  hierarchy structure. I hope the sequence of columns not 
>>>>>>> change.
>>>>>>>
>>>>>>> And I found one more strange things, some tasks in write hdfs stage
>>>>>>> cost much more time than others, where the amount of writing data is
>>>>>>> similar. How to solve it?
>>>>>>>
>>>>>>> Regard,
>>>>>>> Junfeng Chen

Re: spark df.write.partitionBy run very slow

2019-03-11 Thread JF Chen
Hi
Finally I found the reason...
It was caused by long GC pauses on some datanodes. After receiving the data
from the executors, a data node stuck in a long GC cannot report blocks to
the namenode, so the writing process takes a long time.
Now I have decommissioned the broken data nodes, and my spark runs
well.
I am trying to increase the heap size of the data nodes to check if it can
resolve the problem.

Regard,
Junfeng Chen


On Fri, Mar 8, 2019 at 8:54 PM Shyam P  wrote:

> Did you check this, how many partitions and what count of records it shows?
>
> //count by partition_id
> import org.apache.spark.sql.functions.spark_partition_id
> df.groupBy(spark_partition_id).count.show()
>
>
>
> Are you getting same number of parquet files ?
>
> You gradually increase the sample size.
>
> On Fri, 8 Mar 2019, 14:17 JF Chen,  wrote:
>
>> I check my partitionBy method again, it's partitionBy(appname, year,
>> month, day, hour), and the number of partitions of appname is much more
>> than partition of year, month, day, and hour. My spark streaming app runs
>> every 5 minutes, so year, month, day, and hour should be the same most of the
>> time.
>> So will the number of appname partitions affect the writing efficiency?
>>
>> Regard,
>> Junfeng Chen
>>
>>
>> On Thu, Mar 7, 2019 at 4:21 PM JF Chen  wrote:
>>
>>> Yes, I agree.
>>>
>>> From the spark UI I can ensure data is not skewed. There is only about
>>> 100MB for each task, where most of tasks takes several seconds to write the
>>> data to hdfs, and some tasks takes minutes of time.
>>>
>>> Regard,
>>> Junfeng Chen
>>>
>>>
>>> On Wed, Mar 6, 2019 at 2:39 PM Shyam P  wrote:
>>>
>>>> Hi JF,
>>>> Yes, first we should know the actual number of partitions the dataframe has and
>>>> its counts of records. Accordingly we should try to have data spread evenly in all
>>>> partitions.
>>>> It is always better to have Num of partitions = N * Num of executors.
>>>>
>>>>
>>>>   "But the sequence of columns in  partitionBy  decides the
>>>> directory  hierarchy structure. I hope the sequence of columns not change"
>>>> , this is correct.
>>>> Hence sometimes we should go with the bigger number first, then the lesser.
>>>> Try this, i.e. more parent directories and fewer child directories. Tweak
>>>> it around and try.
>>>>
>>>> "some tasks in write hdfs stage cost much more time than others" may be
>>>> because the data is skewed; we need to distribute it evenly across all partitions.
>>>>
>>>> ~Shyam
>>>>
>>>> On Wed, Mar 6, 2019 at 8:33 AM JF Chen  wrote:
>>>>
>>>>> Hi Shyam
>>>>> Thanks for your reply.
>>>>> You mean after knowing the partition number of column_a, column_b,
>>>>> column_c, the sequence of column in partitionBy should be same to the 
>>>>> order
>>>>> of partitions number of column a, b and c?
>>>>> But the sequence of columns in  partitionBy  decides the
>>>>> directory  hierarchy structure. I hope the sequence of columns not change.
>>>>>
>>>>> And I found one more strange things, some tasks in write hdfs stage
>>>>> cost much more time than others, where the amount of writing data is
>>>>> similar. How to solve it?
>>>>>
>>>>> Regard,
>>>>> Junfeng Chen
>>>>>
>>>>>
>>>>> On Tue, Mar 5, 2019 at 3:05 PM Shyam P 
>>>>> wrote:
>>>>>
>>>>>> Hi JF ,
>>>>>>  Try to execute it before df.write
>>>>>>
>>>>>> //count by partition_id
>>>>>> import org.apache.spark.sql.functions.spark_partition_id
>>>>>> df.groupBy(spark_partition_id).count.show()
>>>>>>
>>>>>> You will come to know how data has been partitioned inside df.
>>>>>>
>>>>>> Small trick we can apply here while partitionBy(column_a, column_b,
>>>>>> column_c)
>>>>>> Make sure you have (column_a partitions) > (column_b
>>>>>> partitions) > (column_c partitions).
>>>>>>
>>>>>> Try this.
>>>>>>
>>>>>> Regards,
>>>>>> Shyam
>>>>>>
>>>>>> On Mon, Mar 4, 2019 at 4:09 PM JF Chen  wrote:
>>>>>>
>>>>>>> I am trying to write data in dataset to hdfs via df.write.
>>>>>>> partitionBy(column_a, column_b, column_c).parquet(output_path)
>>>>>>> However, it costs several minutes to write only hundreds of MB data
>>>>>>> to hdfs.
>>>>>>> From this article
>>>>>>> <https://stackoverflow.com/questions/45269658/spark-df-write-partitionby-run-very-slow>,
>>>>>>> adding repartition method before write should work. But if there is
>>>>>>> data skew, some tasks may cost much longer time than average, which 
>>>>>>> still
>>>>>>> cost much time.
>>>>>>> How to solve this problem? Thanks in advance !
>>>>>>>
>>>>>>>
>>>>>>> Regard,
>>>>>>> Junfeng Chen
>>>>>>>
>>>>>>


Re: spark df.write.partitionBy run very slow

2019-03-08 Thread JF Chen
I check my partitionBy method again, it's partitionBy(appname, year, month,
day, hour), and the number of partitions of appname is much more than
partition of year, month, day, and hour. My spark streaming app runs every
5 minutes, so year, month, day, and hour should be the same most of the time.
So will the number of appname partitions affect the writing efficiency?
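
A quick check along those lines (a sketch, assuming df is the dataframe being
written and using the column names above): it shows how many output directories
one micro-batch creates and how evenly the rows spread over them.

import org.apache.spark.sql.functions.desc

df.groupBy("appname", "year", "month", "day", "hour")
  .count()
  .orderBy(desc("count"))
  .show(100, truncate = false)   // many appname values means many directories and files per batch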

Regard,
Junfeng Chen


On Thu, Mar 7, 2019 at 4:21 PM JF Chen  wrote:

> Yes, I agree.
>
> From the spark UI I can ensure data is not skewed. There is only about
> 100MB for each task, where most of tasks takes several seconds to write the
> data to hdfs, and some tasks takes minutes of time.
>
> Regard,
> Junfeng Chen
>
>
> On Wed, Mar 6, 2019 at 2:39 PM Shyam P  wrote:
>
>> Hi JF,
>> Yes, first we should know the actual number of partitions the dataframe has and
>> its counts of records. Accordingly we should try to have data spread evenly in all
>> partitions.
>> It is always better to have Num of partitions = N * Num of executors.
>>
>>
>>   "But the sequence of columns in  partitionBy  decides the
>> directory  hierarchy structure. I hope the sequence of columns not change"
>> , this is correct.
>> Hence sometimes we should go with the bigger number first, then the lesser.
>> Try this, i.e. more parent directories and fewer child directories. Tweak
>> it around and try.
>>
>> "some tasks in write hdfs stage cost much more time than others" may be
>> because the data is skewed; we need to distribute it evenly across all partitions.
>>
>> ~Shyam
>>
>> On Wed, Mar 6, 2019 at 8:33 AM JF Chen  wrote:
>>
>>> Hi Shyam
>>> Thanks for your reply.
>>> You mean after knowing the partition number of column_a, column_b,
>>> column_c, the sequence of column in partitionBy should be same to the order
>>> of partitions number of column a, b and c?
>>> But the sequence of columns in  partitionBy  decides the
>>> directory  hierarchy structure. I hope the sequence of columns not change.
>>>
>>> And I found one more strange things, some tasks in write hdfs stage cost
>>> much more time than others, where the amount of writing data is similar.
>>> How to solve it?
>>>
>>> Regard,
>>> Junfeng Chen
>>>
>>>
>>> On Tue, Mar 5, 2019 at 3:05 PM Shyam P  wrote:
>>>
>>>> Hi JF ,
>>>>  Try to execute it before df.write
>>>>
>>>> //count by partition_id
>>>> import org.apache.spark.sql.functions.spark_partition_id
>>>> df.groupBy(spark_partition_id).count.show()
>>>>
>>>> You will come to know how data has been partitioned inside df.
>>>>
>>>> Small trick we can apply here while partitionBy(column_a, column_b,
>>>> column_c)
>>>> Make sure you have (column_a partitions) > (column_b
>>>> partitions) > (column_c partitions).
>>>>
>>>> Try this.
>>>>
>>>> Regards,
>>>> Shyam
>>>>
>>>> On Mon, Mar 4, 2019 at 4:09 PM JF Chen  wrote:
>>>>
>>>>> I am trying to write data in dataset to hdfs via 
>>>>> df.write.partitionBy(column_a,
>>>>> column_b, column_c).parquet(output_path)
>>>>> However, it costs several minutes to write only hundreds of MB data to
>>>>> hdfs.
>>>>> From this article
>>>>> <https://stackoverflow.com/questions/45269658/spark-df-write-partitionby-run-very-slow>,
>>>>> adding repartition method before write should work. But if there is
>>>>> data skew, some tasks may cost much longer time than average, which still
>>>>> cost much time.
>>>>> How to solve this problem? Thanks in advance !
>>>>>
>>>>>
>>>>> Regard,
>>>>> Junfeng Chen
>>>>>
>>>>


Re: spark df.write.partitionBy run very slow

2019-03-07 Thread JF Chen
Yes, I agree.

From the spark UI I can ensure the data is not skewed. There is only about
100MB for each task, where most tasks take several seconds to write the
data to hdfs, and some tasks take minutes.

Regard,
Junfeng Chen


On Wed, Mar 6, 2019 at 2:39 PM Shyam P  wrote:

> Hi JF,
> Yes, first we should know the actual number of partitions the dataframe has and its
> counts of records. Accordingly we should try to have data spread evenly in all
> partitions.
> It is always better to have Num of partitions = N * Num of executors.
>
>
>   "But the sequence of columns in  partitionBy  decides the
> directory  hierarchy structure. I hope the sequence of columns not change"
> , this is correct.
> Hence sometimes we should go with the bigger number first, then the lesser. Try
> this, i.e. more parent directories and fewer child directories. Tweak
> it around and try.
>
> "some tasks in write hdfs stage cost much more time than others" may be
> because the data is skewed; we need to distribute it evenly across all partitions.
>
> ~Shyam
>
> On Wed, Mar 6, 2019 at 8:33 AM JF Chen  wrote:
>
>> Hi Shyam
>> Thanks for your reply.
>> You mean after knowing the partition number of column_a, column_b,
>> column_c, the sequence of column in partitionBy should be same to the order
>> of partitions number of column a, b and c?
>> But the sequence of columns in  partitionBy  decides the
>> directory  hierarchy structure. I hope the sequence of columns not change.
>>
>> And I found one more strange things, some tasks in write hdfs stage cost
>> much more time than others, where the amount of writing data is similar.
>> How to solve it?
>>
>> Regard,
>> Junfeng Chen
>>
>>
>> On Tue, Mar 5, 2019 at 3:05 PM Shyam P  wrote:
>>
>>> Hi JF ,
>>>  Try to execute it before df.write
>>>
>>> //count by partition_id
>>> import org.apache.spark.sql.functions.spark_partition_id
>>> df.groupBy(spark_partition_id).count.show()
>>>
>>> You will come to know how data has been partitioned inside df.
>>>
>>> Small trick we can apply here while partitionBy(column_a, column_b,
>>> column_c)
>>> Make sure you have (column_a partitions) > (column_b
>>> partitions) > (column_c partitions).
>>>
>>> Try this.
>>>
>>> Regards,
>>> Shyam
>>>
>>> On Mon, Mar 4, 2019 at 4:09 PM JF Chen  wrote:
>>>
>>>> I am trying to write data in dataset to hdfs via 
>>>> df.write.partitionBy(column_a,
>>>> column_b, column_c).parquet(output_path)
>>>> However, it costs several minutes to write only hundreds of MB data to
>>>> hdfs.
>>>> From this article
>>>> <https://stackoverflow.com/questions/45269658/spark-df-write-partitionby-run-very-slow>,
>>>> adding repartition method before write should work. But if there is
>>>> data skew, some tasks may cost much longer time than average, which still
>>>> cost much time.
>>>> How to solve this problem? Thanks in advance !
>>>>
>>>>
>>>> Regard,
>>>> Junfeng Chen
>>>>
>>>


Re: "java.lang.AssertionError: assertion failed: Failed to get records for **** after polling for 180000" error

2019-03-06 Thread JF Chen
Hi
The max bytes setting should be enough, because when the tasks fail, they read
the data from kafka very fast, as normal.
The request.timeout.ms I set is 180 seconds.
I think it should be a timeout setting or a max bandwidth setting, because
it recovers and reads the same partition very fast after
the tasks are marked failed.
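
For reference, a sketch of where each of the knobs mentioned below lives (the
values are only examples, not recommendations; the poll timeout should simply
exceed the consumer's request.timeout.ms):

import org.apache.spark.SparkConf
import org.apache.kafka.common.serialization.StringDeserializer

val conf = new SparkConf()
  .set("spark.streaming.kafka.consumer.poll.ms", "240000")  // should exceed request.timeout.ms
  .set("spark.network.timeout", "300s")                     // fallback when poll.ms is unset

val kafkaParams = Map[String, Object](
  "bootstrap.servers"         -> "broker1:9092",            // hypothetical broker
  "key.deserializer"          -> classOf[StringDeserializer],
  "value.deserializer"        -> classOf[StringDeserializer],
  "group.id"                  -> "mygroup",
  "request.timeout.ms"        -> "180000",                  // the 180 seconds mentioned above
  "max.partition.fetch.bytes" -> "1048576"
)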

Regard,
Junfeng Chen


On Wed, Mar 6, 2019 at 4:01 PM Akshay Bhardwaj <
akshay.bhardwaj1...@gmail.com> wrote:

> Sorry message sent as incomplete.
>
> To better debug the issue, please check the below config properties:
>
>- At Kafka consumer properties
>   - max.partition.fetch.bytes within spark kafka consumer. If not set
>   for consumer then the global config at broker level.
>   - request.timeout.ms
>- At spark's configurations
>   - spark.streaming.kafka.consumer.poll.ms
>   - spark.network.timeout (If the above is not set, then poll.ms is
>   default to spark.network.timeout)
>
>
> Generally I have faced this issue if spark.streaming.kafka.consumer.poll.ms
> is less than request.timeout.ms
>
> Also, what is the average kafka record message size in bytes?
>
>
>
> Akshay Bhardwaj
> +91-97111-33849
>
>
> On Wed, Mar 6, 2019 at 1:26 PM Akshay Bhardwaj <
> akshay.bhardwaj1...@gmail.com> wrote:
>
>> Hi,
>>
>> To better debug the issue, please check the below config properties:
>>
>>- max.partition.fetch.bytes within spark kafka consumer. If not set
>>for consumer then the global config at broker level.
>>- spark.streaming.kafka.consumer.poll.ms
>>   - spark.network.timeout (If the above is not set, then poll.ms is
>>   default to spark.network.timeout)
>>-
>>-
>>
>> Akshay Bhardwaj
>> +91-97111-33849
>>
>>
>> On Wed, Mar 6, 2019 at 8:39 AM JF Chen  wrote:
>>
>>> When my kafka executor reads data from kafka, sometimes it throws the
>>> error "java.lang.AssertionError: assertion failed: Failed to get records
>>> for **** after polling for 180000", which happens after 3 minutes of executing.
>>> The data waiting to be read is not so huge, about 1GB. And other
>>> partitions read by other tasks are very fast; the error always occurs on
>>> some specific executor.
>>>
>>> Regard,
>>> Junfeng Chen
>>>
>>


"java.lang.AssertionError: assertion failed: Failed to get records for **** after polling for 180000" error

2019-03-05 Thread JF Chen
When my kafka executor reads data from kafka, sometimes it throws the error
"java.lang.AssertionError: assertion failed: Failed to get records for ****
after polling for 180000", which happens after 3 minutes of executing.
The data waiting to be read is not so huge, about 1GB. And other
partitions read by other tasks are very fast; the error always occurs on
some specific executor.

Regard,
Junfeng Chen


Re: spark df.write.partitionBy run very slow

2019-03-05 Thread JF Chen
Hi Shyam
Thanks for your reply.
You mean after knowing the partition counts of column_a, column_b and
column_c, the sequence of columns in partitionBy should match the order
of the partition counts of columns a, b and c?
But the sequence of columns in partitionBy decides the
directory hierarchy structure. I hope the sequence of columns does not change.

And I found one more strange thing: some tasks in the write-to-hdfs stage cost
much more time than others, even though the amount of data written is similar.
How to solve it?

Regard,
Junfeng Chen


On Tue, Mar 5, 2019 at 3:05 PM Shyam P  wrote:

> Hi JF ,
>  Try to execute it before df.write
>
> //count by partition_id
> import org.apache.spark.sql.functions.spark_partition_id
> df.groupBy(spark_partition_id).count.show()
>
> You will come to know how data has been partitioned inside df.
>
> Small trick we can apply here while partitionBy(column_a, column_b,
> column_c)
> Make sure you have (column_a partitions) > (column_b
> partitions) > (column_c partitions).
>
> Try this.
>
> Regards,
> Shyam
>
> On Mon, Mar 4, 2019 at 4:09 PM JF Chen  wrote:
>
>> I am trying to write data in dataset to hdfs via 
>> df.write.partitionBy(column_a,
>> column_b, column_c).parquet(output_path)
>> However, it costs several minutes to write only hundreds of MB data to
>> hdfs.
>> From this article
>> <https://stackoverflow.com/questions/45269658/spark-df-write-partitionby-run-very-slow>,
>> adding repartition method before write should work. But if there is data
>> skew, some tasks may cost much longer time than average, which still cost
>> much time.
>> How to solve this problem? Thanks in advance !
>>
>>
>> Regard,
>> Junfeng Chen
>>
>


spark df.write.partitionBy run very slow

2019-03-04 Thread JF Chen
I am trying to write data in a dataset to hdfs via df.write.partitionBy(column_a,
column_b, column_c).parquet(output_path).
However, it costs several minutes to write only hundreds of MB of data to
hdfs.
From this article
<https://stackoverflow.com/questions/45269658/spark-df-write-partitionby-run-very-slow>,
adding a repartition before the write should work. But if there is data
skew, some tasks may cost much longer than average, which still costs
much time.
How to solve this problem? Thanks in advance !
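
One way to combine the two ideas (a sketch only; column names, values and the
output path are placeholders for the real ones): repartition on the partitionBy
columns plus a small random salt, so a skewed partition value is spread over
several write tasks instead of a single slow one.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("partitionBy-sketch").getOrCreate()
import spark.implicits._

val df = Seq(("a1", "b1", "c1", "payload"))
  .toDF("column_a", "column_b", "column_c", "body")

df.withColumn("salt", (rand() * 8).cast("int"))
  .repartition(col("column_a"), col("column_b"), col("column_c"), col("salt"))
  .drop("salt")                        // the salt only shapes the shuffle, not the output layout
  .write
  .mode("append")
  .partitionBy("column_a", "column_b", "column_c")
  .parquet("/tmp/partitionby-sketch")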


Regard,
Junfeng Chen


Re: Back pressure not working on streaming

2019-01-01 Thread JF Chen
yes, 10 is a very low value for testing initial rate.
And from this article
https://www.linkedin.com/pulse/enable-back-pressure-make-your-spark-streaming-production-lan-jiang/,
it seems spark back pressure is not available for dstream?
So, max rate per partition is the only available back pressure solution for
kafka dstream input?
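
For the DStream/Kafka direct input the relevant knobs look like this (a sketch;
the numbers are examples only). With 32 partitions and a 300-second batch,
maxRatePerPartition = 1000 caps a batch at roughly 32 * 1000 * 300 = 9.6 million
records.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.backpressure.initialRate", "10")
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")  // hard cap per partition per second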

Regard,
Junfeng Chen


On Wed, Jan 2, 2019 at 11:49 AM HARSH TAKKAR  wrote:

> There is a separate property for max rate; by default it is not set, so if
> you want to limit the max rate you should provide that property a value.
>
> Initial rate =10 means it will pick only 10 records per receiver in the
> batch interval when you start the process.
>
> Depending  upon the consumption rate it will increase  the consumption of
> records for processing in each batch.
>
> However, I feel 10 is way too low a number for a 32-partition kafka topic.
>
>
>
> Regards
> Harsh
> Happy New Year
>
> On Wed 2 Jan, 2019, 08:33 JF Chen 
>> I have set  spark.streaming.backpressure.enabled to true,  
>> spark.streaming.backpressure.initialRate
>> to 10.
>> Once my application started, it received 32 million messages on first
>> batch.
>> My application runs every 300 seconds, with 32 kafka partitions. So what
>> is the max rate if I set the initial rate to 10?
>>
>> Thanks!
>>
>>
>> Regard,
>> Junfeng Chen
>>
>


Back pressure not working on streaming

2019-01-01 Thread JF Chen
I have set  spark.streaming.backpressure.enabled to true,
spark.streaming.backpressure.initialRate
to 10.
Once my application started, it received 32 million messages on first
batch.
My application runs every 300 seconds, with 32 kafka partitions. So what
is the max rate if I set the initial rate to 10?

Thanks!


Regard,
Junfeng Chen


How to set Spark Streaming batch start time?

2018-12-11 Thread JF Chen
Hi everyone
I set 10 minutes as the streaming interval, but it always runs at the 0th, 10th,
and 20th minute of every hour. Can I set a start time, like a start
delay, making it run at the 5th, 15th, and 25th minute of every hour?
Thanks!

Regard,
Junfeng Chen


Re: how to change temp directory when spark write data ?

2018-12-05 Thread JF Chen
Directory to use for "scratch" space in Spark, including map output files
and RDDs that get stored on disk.
It seems to be a directory on my local disk. But currently the temp directory is
on hdfs, under the path of df.write.parquet(path).

Regard,
Junfeng Chen


On Wed, Dec 5, 2018 at 6:28 PM Sandip Mehta 
wrote:

> Try the spark.local.dir property.
>
>
> On Wed, Dec 5, 2018 at 1:42 PM JF Chen  wrote:
>
>> I have two spark apps writing data to one directory. I notice they share
>> one temp directory, and the spark app that finishes writing first will clear the temp
>> directory, so the slower one may throw a "No lease on *** File does not
>> exist" error.
>> So how to specify the temp directory?
>> Thanks!
>>
>> Regard,
>> Junfeng Chen
>>
>


how to change temp directory when spark write data ?

2018-12-05 Thread JF Chen
I have two spark apps writing data to one directory. I notice they share
one temp directory, and the spark app that finishes writing first will clear the temp
directory, so the slower one may throw a "No lease on *** File does not
exist" error.
So how to specify the temp directory?
Thanks!

Regard,
Junfeng Chen


"failed to get records for spark-executor after polling for ***" error

2018-12-03 Thread JF Chen
Some kafka consumer tasks throw a "failed to get records for spark-executor
after polling for ***" error sometimes. In detail, some tasks take a very
long time and then throw this error. However, when the task restarts, it
recovers very soon.
My spark version is 2.2.0


Regard,
Junfeng Chen


spark unsupported conversion to Stringtype error

2018-11-27 Thread JF Chen
When I load some parquet files and output them with show() or csv or json,
it always throws this exception: "unsupported conversion to: Stringtype"
What does this mean?
And how to resolve this?



Regard,
Junfeng Chen


Re: How to increase the parallelism of Spark Streaming application?

2018-11-08 Thread JF Chen
Yes, now I have allocated 100 cores and 8 kafka partitions, and then
repartition to 100 to feed the 100 cores. In the following stage I have a map
action; will it also cause a slowdown?
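
For context, a sketch of the pipeline shape being discussed (broker, topic and
the transformation are placeholders): the read stage still runs with 8 tasks,
one per Kafka partition, and the explicit repartition is what gives every stage
after it, including the map, 100 tasks per batch.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val conf = new SparkConf().setAppName("parallelism-sketch")
val ssc = new StreamingContext(conf, Seconds(300))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "mygroup")

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("mytopic"), kafkaParams))

stream
  .map(_.value())       // read stage: 8 tasks, one per Kafka partition
  .repartition(100)     // one shuffle so the rest of the batch uses 100 tasks
  .map(_.toUpperCase)   // placeholder for the real transformation
  .saveAsTextFiles("/tmp/parallelism-sketch/out")

ssc.start()
ssc.awaitTermination()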

Regard,
Junfeng Chen


On Thu, Nov 8, 2018 at 12:34 AM Shahbaz  wrote:

> Hi ,
>
>- Do you have adequate CPU cores allocated to handle increased
>partitions ,generally if you have Kafka partitions >=(greater than or equal
>to) CPU Cores Total (Number of Executor Instances * Per Executor Core)
>,gives increased task parallelism for reader phase.
>- However if you have too many partitions but not enough cores ,it
>would eventually slow down the reader (Ex: 100 Partitions and only 20 Total
>Cores).
>- Additionally ,the next set of transformation will have there own
>partitions ,if its involving  shuffle ,sq.shuffle.partitions then defines
>next level of parallelism ,if you are not having any data skew,then you
>should get good performance.
>
>
> Regards,
> Shahbaz
>
> On Wed, Nov 7, 2018 at 12:58 PM JF Chen  wrote:
>
>> I have a Spark Streaming application which reads data from kafka and saves
>> the transformation result to hdfs.
>> My original partition number of kafka topic is 8, and repartition the
>> data to 100 to increase the parallelism of spark job.
>> Now I am wondering if I increase the kafka partition number to 100
>> instead of setting repartition to 100, will the performance be enhanced? (I
>> know repartition action cost a lot cpu resource)
>> If I set the kafka partition number to 100, does it have any negative
>> efficiency?
>> I just have one production environment so it's not convenient for me to
>> do the test
>>
>> Thanks!
>>
>> Regard,
>> Junfeng Chen
>>
>


[no subject]

2018-11-08 Thread JF Chen
I am working on a spark streaming application, and I want it to read
configuration from mongodb every hour, where the batch interval is 10
minutes.
Is it practicable? As far as I know spark streaming batches are driven by the
DStream, so how do I implement this function, which seems unrelated to the dstream
data?
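
One common pattern for this (a sketch, with loadConfigFromMongo() and process()
as hypothetical stand-ins for the real MongoDB read and the real per-record
work): keep the configuration on the driver and refresh it at most once per
hour inside foreachRDD, which runs on the driver for every batch.

import java.time.Instant
import org.apache.spark.streaming.dstream.DStream

// loadConfigFromMongo() and process() are hypothetical helpers, not real APIs
@volatile var config: Map[String, String] = loadConfigFromMongo()
@volatile var lastLoadSec: Long = Instant.now.getEpochSecond

def attachConfigRefresh(stream: DStream[String]): Unit =
  stream.foreachRDD { rdd =>
    val now = Instant.now.getEpochSecond
    if (now - lastLoadSec >= 3600) {   // at most one reload per hour
      config = loadConfigFromMongo()
      lastLoadSec = now
    }
    val cfg = config                   // snapshot shipped to executors with the closure
    rdd.foreach(record => process(record, cfg))
  }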


Regard,
Junfeng Chen


Re: How to increase the parallelism of Spark Streaming application?

2018-11-08 Thread JF Chen
Hi,
I have tested it on my production environment, and I found a strange thing.
After I set the kafka partitions to 100, some tasks are executed very fast,
but some are slow. The slow ones cost double the time of the fast ones (from the event
timeline). However, I have checked the consumer offsets, and the data amount
for each task should be similar, so there should be no imbalance problem.
Any one have some good idea?

Regard,
Junfeng Chen


On Thu, Nov 8, 2018 at 12:34 AM Shahbaz  wrote:

> Hi ,
>
>- Do you have adequate CPU cores allocated to handle increased
>partitions ,generally if you have Kafka partitions >=(greater than or equal
>to) CPU Cores Total (Number of Executor Instances * Per Executor Core)
>,gives increased task parallelism for reader phase.
>- However if you have too many partitions but not enough cores ,it
>would eventually slow down the reader (Ex: 100 Partitions and only 20 Total
>Cores).
>- Additionally ,the next set of transformation will have there own
>partitions ,if its involving  shuffle ,sq.shuffle.partitions then defines
>next level of parallelism ,if you are not having any data skew,then you
>should get good performance.
>
>
> Regards,
> Shahbaz
>
> On Wed, Nov 7, 2018 at 12:58 PM JF Chen  wrote:
>
>> I have a Spark Streaming application which reads data from kafka and saves
>> the transformation result to hdfs.
>> My original partition number of kafka topic is 8, and repartition the
>> data to 100 to increase the parallelism of spark job.
>> Now I am wondering if I increase the kafka partition number to 100
>> instead of setting repartition to 100, will the performance be enhanced? (I
>> know repartition action cost a lot cpu resource)
>> If I set the kafka partition number to 100, does it have any negative
>> efficiency?
>> I just have one production environment so it's not convenient for me to
>> do the test
>>
>> Thanks!
>>
>> Regard,
>> Junfeng Chen
>>
>


Re: How to increase the parallelism of Spark Streaming application?

2018-11-08 Thread JF Chen
Memory is not a big problem for me... So no other bad effects?

Regard,
Junfeng Chen


On Wed, Nov 7, 2018 at 4:51 PM Michael Shtelma  wrote:

> If you configure too many Kafka partitions, you can run into memory issues.
> This will increase the memory requirements for the spark job a lot.
>
> Best,
> Michael
>
>
> On Wed, Nov 7, 2018 at 8:28 AM JF Chen  wrote:
>
>> I have a Spark Streaming application which reads data from kafka and saves
>> the transformation result to hdfs.
>> My original partition number of kafka topic is 8, and repartition the
>> data to 100 to increase the parallelism of spark job.
>> Now I am wondering if I increase the kafka partition number to 100
>> instead of setting repartition to 100, will the performance be enhanced? (I
>> know repartition action cost a lot cpu resource)
>> If I set the kafka partition number to 100, does it have any negative
>> efficiency?
>> I just have one production environment so it's not convenient for me to
>> do the test
>>
>> Thanks!
>>
>> Regard,
>> Junfeng Chen
>>
>


How to increase the parallelism of Spark Streaming application?

2018-11-06 Thread JF Chen
I have a Spark Streaming application which reads data from kafka and saves
the transformation result to hdfs.
The original partition number of my kafka topic is 8, and I repartition the data
to 100 to increase the parallelism of the spark job.
Now I am wondering, if I increase the kafka partition number to 100 instead
of setting repartition to 100, will the performance be enhanced? (I know the
repartition action costs a lot of cpu resource.)
If I set the kafka partition number to 100, does it have any negative
effect?
I just have one production environment, so it's not convenient for me to do
the test.

Thanks!

Regard,
Junfeng Chen


Re: How to deal with context dependent computing?

2018-08-26 Thread JF Chen
Thanks Sonal.
For example, I have data as follows:
login 2018/8/27 10:00
logout 2018/8/27 10:05
login 2018/8/27 10:08
logout 2018/8/27 10:15
login 2018/8/27 11:08
logout 2018/8/27 11:32

Now I want to calculate the time between each login and logout. For
example, I should get 5 min, 7 min, 24 min from the above sample data.
I know I can calculate it with foreach, but then it seems all the data runs on the
spark driver node rather than on multiple executors.
So any good way to solve this problem? Thanks!
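
One way to keep this distributed (a sketch over the sample rows above) is a
window function: lag() pairs each logout with the preceding login, so no
driver-side foreach is needed. In practice the window would also be partitioned
by a user id.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("session-duration").getOrCreate()
import spark.implicits._

val events = Seq(
  ("login",  "2018/08/27 10:00"), ("logout", "2018/08/27 10:05"),
  ("login",  "2018/08/27 10:08"), ("logout", "2018/08/27 10:15"),
  ("login",  "2018/08/27 11:08"), ("logout", "2018/08/27 11:32")
).toDF("event", "time")
  .withColumn("ts", to_timestamp($"time", "yyyy/MM/dd HH:mm"))

val w = Window.orderBy("ts")   // add Window.partitionBy(userId) for real data

events
  .withColumn("prev_event", lag("event", 1).over(w))
  .withColumn("prev_ts", lag("ts", 1).over(w))
  .where($"event" === "logout" && $"prev_event" === "login")
  .select((($"ts".cast("long") - $"prev_ts".cast("long")) / 60).as("minutes"))
  .show()   // 5.0, 7.0, 24.0 for the sample above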

Regard,
Junfeng Chen


On Thu, Aug 23, 2018 at 6:15 PM Sonal Goyal  wrote:

> Hi Junfeng,
>
> Can you please show by means of an example what you are trying to achieve?
>
> Thanks,
> Sonal
> Nube Technologies <http://www.nubetech.co>
>
> <http://in.linkedin.com/in/sonalgoyal>
>
>
>
> On Thu, Aug 23, 2018 at 8:22 AM, JF Chen  wrote:
>
>> For example, I have some data with timstamp marked as category A and B,
>> and ordered by time. Now I want to calculate each duration from A to B. In
>> normal program, I can use the  flag bit to record the preview data if it is
>> A or B, and then calculate the duration. But in Spark Dataframe, how to do
>> it?
>>
>> Thanks!
>>
>> Regard,
>> Junfeng Chen
>>
>
>


How to deal with context dependent computing?

2018-08-22 Thread JF Chen
For example, I have some data with a timestamp, marked as category A or B, and
ordered by time. Now I want to calculate each duration from A to B. In a
normal program, I can use a flag bit to record whether the previous record was
A or B, and then calculate the duration. But in a Spark Dataframe, how to do
it?

Thanks!

Regard,
Junfeng Chen


Name error when writing data as orc

2018-05-28 Thread JF Chen
I am working on writing a dataset in orc format to hdfs, and I meet
the following problem:

Error: name expected at the position 1473 of
'string:boolean:string:string..zone:struct<$ref:string> ...' but '$'
is found.

where position 1473 is at the "$ref:string" part.
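
Based on that, the parser appears to reject the "$" in the nested field name
"$ref". One workaround (a sketch; "zone" and "$ref" come from the error text
above, the output path is illustrative) is to rebuild the offending struct with
a safe field name before writing, listing every field the struct should keep.

import org.apache.spark.sql.functions.{col, struct}

val cleaned = df.withColumn(
  "zone",
  struct(col("zone").getField("$ref").as("ref"))   // add the struct's other fields here too
)
cleaned.write.orc("/data/output_orc")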




Regard,
Junfeng Chen


Re: How to skip nonexistent file when read files with spark?

2018-05-21 Thread JF Chen
Thanks ayan,

Also I have tried this method; the most tricky thing is that the dataframe
union method requires the same schema structure, while in my files the
schema is variable.


Regard,
Junfeng Chen

On Tue, May 22, 2018 at 10:33 AM, ayan guha <guha.a...@gmail.com> wrote:

> A relatively naive solution will be:
>
> 0. Create a dummy blank dataframe
> 1. Loop through the list of paths.
> 2. Try to create the dataframe from the path. If success then union it
> cumulatively.
> 3. If error, just ignore it or handle as you wish.
>
> At the end of the loop, just use the unioned df. This should not have any
> additional performance overhead as declaring dataframes and union is not
> expensive, unless you call any action within the loop.
>
> Best
> Ayan
>
> On Tue, 22 May 2018 at 11:27 am, JF Chen <darou...@gmail.com> wrote:
>
>> Thanks, Thakrar,
>>
>> I have tried to check the existence of path before read it, but HDFSCli
>> python package seems not support wildcard.  "FileSystem.globStatus" is a
>> java api while I am using python via livy Do you know any python api
>> implementing the same function?
>>
>>
>> Regard,
>> Junfeng Chen
>>
>> On Mon, May 21, 2018 at 9:01 PM, Thakrar, Jayesh <
>> jthak...@conversantmedia.com> wrote:
>>
>>> Probably you can do some preprocessing/checking of the paths before you
>>> attempt to read it via Spark.
>>>
>>> Whether it is local or hdfs filesystem, you can try to check for
>>> existence and other details by using the "FileSystem.globStatus" method
>>> from the Hadoop API.
>>>
>>>
>>>
>>> *From: *JF Chen <darou...@gmail.com>
>>> *Date: *Sunday, May 20, 2018 at 10:30 PM
>>> *To: *user <user@spark.apache.org>
>>> *Subject: *How to skip nonexistent file when read files with spark?
>>>
>>>
>>>
>>> Hi Everyone
>>>
>>> I meet a tricky problem recently. I am trying to read some file paths
>>> generated by other method. The file paths are represented by wild card in
>>> list, like [ '/data/*/12', '/data/*/13']
>>>
>>> But in practice, if the wildcard cannot match any existed path, it will
>>> throw an exception:"pyspark.sql.utils.AnalysisException: 'Path does not
>>> exist: ...'", and the program stops after that.
>>>
>>> Actually I want spark can just ignore and skip these nonexistent  file
>>> path, and continues to run. I have tried python HDFSCli api to check the
>>> existence of path , but hdfs cli cannot support wildcard.
>>>
>>>
>>>
>>> Any good idea to solve my problem? Thanks~
>>>
>>>
>>>
>>> Regard,
>>> Junfeng Chen
>>>
>>
>> --
> Best Regards,
> Ayan Guha
>


Re: How to skip nonexistent file when read files with spark?

2018-05-21 Thread JF Chen
Thanks Thakrar~


Regard,
Junfeng Chen

On Tue, May 22, 2018 at 11:22 AM, Thakrar, Jayesh <
jthak...@conversantmedia.com> wrote:

> Junfeng,
>
>
>
> I would suggest preprocessing/validating the paths in plain Python (and
> not Spark) before you try to fetch data.
>
> I am not familiar with Python Hadoop libraries, but see if this helps -
> http://crs4.github.io/pydoop/tutorial/hdfs_api.html
>
>
>
> Best,
>
> Jayesh
>
>
>
> *From: *JF Chen <darou...@gmail.com>
> *Date: *Monday, May 21, 2018 at 10:20 PM
> *To: *ayan guha <guha.a...@gmail.com>
> *Cc: *"Thakrar, Jayesh" <jthak...@conversantmedia.com>, user <
> user@spark.apache.org>
> *Subject: *Re: How to skip nonexistent file when read files with spark?
>
>
>
> Thanks ayan,
>
>
>
> Also I have tried this method, the most tricky thing is that dataframe
> union method must be based on same structure schema, while on my files, the
> schema is variable.
>
>
>
>
> Regard,
> Junfeng Chen
>
>
>
> On Tue, May 22, 2018 at 10:33 AM, ayan guha <guha.a...@gmail.com> wrote:
>
> A relatively naive solution will be:
>
>
>
> 0. Create a dummy blank dataframe
>
> 1. Loop through the list of paths.
>
> 2. Try to create the dataframe from the path. If success then union it
> cumulatively.
>
> 3. If error, just ignore it or handle as you wish.
>
>
>
> At the end of the loop, just use the unioned df. This should not have any
> additional performance overhead as declaring dataframes and union is not
> expensive, unless you call any action within the loop.
>
>
>
> Best
>
> Ayan
>
>
>
> On Tue, 22 May 2018 at 11:27 am, JF Chen <darou...@gmail.com> wrote:
>
> Thanks, Thakrar,
>
>
>
> I have tried to check the existence of path before read it, but HDFSCli
> python package seems not support wildcard.  "FileSystem.globStatus" is a
> java api while I am using python via livy Do you know any python api
> implementing the same function?
>
>
>
>
> Regard,
> Junfeng Chen
>
>
>
> On Mon, May 21, 2018 at 9:01 PM, Thakrar, Jayesh <
> jthak...@conversantmedia.com> wrote:
>
> Probably you can do some preprocessing/checking of the paths before you
> attempt to read it via Spark.
>
> Whether it is local or hdfs filesystem, you can try to check for existence
> and other details by using the "FileSystem.globStatus" method from the
> Hadoop API.
>
>
>
> *From: *JF Chen <darou...@gmail.com>
> *Date: *Sunday, May 20, 2018 at 10:30 PM
> *To: *user <user@spark.apache.org>
> *Subject: *How to skip nonexistent file when read files with spark?
>
>
>
> Hi Everyone
>
> I meet a tricky problem recently. I am trying to read some file paths
> generated by other method. The file paths are represented by wild card in
> list, like [ '/data/*/12', '/data/*/13']
>
> But in practice, if the wildcard cannot match any existed path, it will
> throw an exception:"pyspark.sql.utils.AnalysisException: 'Path does not
> exist: ...'", and the program stops after that.
>
> Actually I want spark can just ignore and skip these nonexistent  file
> path, and continues to run. I have tried python HDFSCli api to check the
> existence of path , but hdfs cli cannot support wildcard.
>
>
>
> Any good idea to solve my problem? Thanks~
>
>
>
> Regard,
> Junfeng Chen
>
>
>
> --
>
> Best Regards,
> Ayan Guha
>
>
>


Re: How to skip nonexistent file when read files with spark?

2018-05-21 Thread JF Chen
Thanks, Thakrar,

I have tried to check the existence of the path before reading it, but the HDFSCli
python package seems not to support wildcards. "FileSystem.globStatus" is a
java api, while I am using python via livy. Do you know any python api
implementing the same function?
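
For reference, a Scala/JVM sketch of the globStatus approach (the same calls can
be reached from PySpark through the JVM gateway): expand each wildcard first and
only hand Spark the paths that actually matched. spark is assumed to be an
existing SparkSession.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())
val patterns = Seq("/data/*/12", "/data/*/13")

val existing = patterns.flatMap { p =>
  // globStatus may return null or an empty array when nothing matches
  Option(fs.globStatus(new Path(p))).toSeq.flatten.map(_.getPath.toString)
}

if (existing.nonEmpty) {
  spark.read.json(existing: _*).show()
} else {
  println("no paths matched, skipping this read")
}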


Regard,
Junfeng Chen

On Mon, May 21, 2018 at 9:01 PM, Thakrar, Jayesh <
jthak...@conversantmedia.com> wrote:

> Probably you can do some preprocessing/checking of the paths before you
> attempt to read it via Spark.
>
> Whether it is local or hdfs filesystem, you can try to check for existence
> and other details by using the "FileSystem.globStatus" method from the
> Hadoop API.
>
>
>
> *From: *JF Chen <darou...@gmail.com>
> *Date: *Sunday, May 20, 2018 at 10:30 PM
> *To: *user <user@spark.apache.org>
> *Subject: *How to skip nonexistent file when read files with spark?
>
>
>
> Hi Everyone
>
> I meet a tricky problem recently. I am trying to read some file paths
> generated by other method. The file paths are represented by wild card in
> list, like [ '/data/*/12', '/data/*/13']
>
> But in practice, if the wildcard cannot match any existed path, it will
> throw an exception:"pyspark.sql.utils.AnalysisException: 'Path does not
> exist: ...'", and the program stops after that.
>
> Actually I want spark can just ignore and skip these nonexistent  file
> path, and continues to run. I have tried python HDFSCli api to check the
> existence of path , but hdfs cli cannot support wildcard.
>
>
>
> Any good idea to solve my problem? Thanks~
>
>
>
> Regard,
> Junfeng Chen
>


How to skip nonexistent file when read files with spark?

2018-05-20 Thread JF Chen
Hi Everyone
I met a tricky problem recently. I am trying to read some file paths
generated by another method. The file paths are represented by wildcards in a
list, like [ '/data/*/12', '/data/*/13'].
But in practice, if a wildcard cannot match any existing path, it will
throw an exception: "pyspark.sql.utils.AnalysisException: 'Path does not
exist: ...'", and the program stops after that.
Actually I want spark to just ignore and skip these nonexistent file
paths and continue to run. I have tried the python HDFSCli api to check the
existence of a path, but hdfs cli cannot support wildcards.

Any good idea to solve my problem? Thanks~

Regard,
Junfeng Chen


Re: Snappy file compatible problem with spark

2018-05-17 Thread JF Chen
Yes. The JSON files compressed by Flume or Spark work well with Spark. But
the json files compressed by myself cannot be read by spark due to a codec
problem. It seems spark can only read files compressed by hadoop snappy (
https://code.google.com/archive/p/hadoop-snappy/).
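
If the files were written with snappy-java's SnappyOutputStream (an assumption;
Hadoop's SnappyCodec uses a different block framing, which is why Spark cannot
read them directly), a one-off converter can simply decompress them back to
plain JSON. Paths here are illustrative.

import java.io.{BufferedInputStream, FileInputStream, FileOutputStream}
import org.xerial.snappy.SnappyInputStream

val in  = new SnappyInputStream(new BufferedInputStream(new FileInputStream("/data/in/events.json.snappy")))
val out = new FileOutputStream("/data/out/events.json")
val buf = new Array[Byte](64 * 1024)

// copy decompressed bytes straight through to the plain-text output file
Iterator.continually(in.read(buf)).takeWhile(_ != -1).foreach(n => out.write(buf, 0, n))

in.close()
out.close()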


Regard,
Junfeng Chen

On Thu, May 17, 2018 at 5:47 PM, Victor Noagbodji <
vnoagbo...@amplify-nation.com> wrote:

> Hey, Sorry if I misunderstood. Are you feeding the compressed JSON file to
> Spark directly?
>
> On May 17, 2018, at 4:59 AM, JF Chen <darou...@gmail.com> wrote:
>
> I made some snappy compressed json file with normal snappy codec(
> https://github.com/xerial/snappy-java ) , which seems cannot be read by
> Spark correctly.
> So how to make existed snappy file recognized by spark? Any tools to
> convert them?
>
> Thanks!
>
> Regard,
> Junfeng Chen
>
>
>


Snappy file compatible problem with spark

2018-05-17 Thread JF Chen
I made some snappy-compressed json files with the normal snappy codec (
https://github.com/xerial/snappy-java ), which it seems cannot be read by
Spark correctly.
So how to make the existing snappy files recognized by spark? Any tools to
convert them?

Thanks!

Regard,
Junfeng Chen


Spark streaming with kafka input stuck in (Re-)joining group because of group rebalancing

2018-05-15 Thread JF Chen
When I terminate a spark streaming application and restart it, it always gets
stuck in this step:
>
> Revoking previously assigned partitions [] for group [mygroup]
> (Re-)joining group [mygroup]


If I use a new group id, even though it works fine, I may lose the data
from the last time I read with the previous group id.

So how to solve it?


Regard,
Junfeng Chen


StreamingContext cannot be started

2018-04-26 Thread JF Chen
I create a SparkStreamingContext and SparkSession in my application.

When I start the StreamingContext, the log will print "StreamingContext
started". However, sometimes it does not print this log, and the batch job
seems not to be launched.




Regard,
Junfeng Chen