Re: _spark_metadata path issue with S3 lifecycle policy

2023-04-13 Thread Yuri Oleynikov (‫יורי אולייניקוב‬‎)
Yeah, but can't you use the following?
1. For data files: my/path/part-
2. For partitioned data: my/path/partition=

Best regards

On 13 Apr 2023, at 12:58, Yuval Itzchakov wrote:

The problem is that when you specify two lifecycle policies for the same path,
the one with the shorter retention wins :(
https://docs.aws.amazon.com/AmazonS3/latest/userguide/lifecycle-configuration-examples.html#lifecycle-config-conceptual-ex4

"You might specify an S3 Lifecycle configuration in which you specify
overlapping prefixes, or actions. Generally, S3 Lifecycle optimizes for cost.
For example, if two expiration policies overlap, the shorter expiration policy
is honored so that data is not stored for longer than expected. Likewise, if
two transition policies overlap, S3 Lifecycle transitions your objects to the
lower-cost storage class."

On Thu, Apr 13, 2023, 12:29 "Yuri Oleynikov (‫יורי אולייניקוב‬‎)" <yur...@gmail.com> wrote:

My naïve assumption is that specifying a lifecycle policy for _spark_metadata
with longer retention will solve the issue

Best regards

> On 13 Apr 2023, at 11:52, Yuval Itzchakov <yuva...@gmail.com> wrote:
> 
> 
> Hi everyone,
> 
> I am using Spark's FileStreamSink to write files to S3. On the S3 bucket, I 
> have a lifecycle policy that deletes data older than X days so that the bucket 
> does not grow indefinitely. My problem starts with Spark jobs that don't 
> receive data frequently. In this case no new batches are created, which means 
> no new checkpoints are written to the output path and the _spark_metadata file 
> is never overwritten, so eventually the lifecycle policy deletes the file and 
> the job fails.
> 
> As far as I can tell from reading the code and looking at StackOverflow 
> answers, the _spark_metadata path is hardcoded to the base path of the output 
> directory created by the DataStreamWriter, which means I cannot store this 
> file under a separate prefix that is not covered by the lifecycle policy rule.
> 
> Has anyone run into a similar problem?
> 
> 
> 
> -- 
> Best Regards,
> Yuval Itzchakov.
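
To make the prefix-scoped rules suggested in the reply above concrete, here is a
minimal boto3 sketch; the bucket name, prefix and retention period are
hypothetical. Only the data prefix gets an expiration rule, so objects under
_spark_metadata are never matched by it:

    # Sketch: expire only the data files; leave my/path/_spark_metadata/ alone.
    # Bucket, prefix and retention below are placeholders.
    import boto3

    s3 = boto3.client("s3")
    s3.put_bucket_lifecycle_configuration(
        Bucket="my-bucket",
        LifecycleConfiguration={
            "Rules": [
                {
                    "ID": "expire-data-files-only",
                    "Status": "Enabled",
                    # Matches part files directly under my/path/, not the
                    # my/path/_spark_metadata/ prefix.
                    "Filter": {"Prefix": "my/path/part-"},
                    "Expiration": {"Days": 30},
                }
            ]
        },
    )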



Re: _spark_metadata path issue with S3 lifecycle policy

2023-04-13 Thread Yuri Oleynikov (‫יורי אולייניקוב‬‎)
My naïve assumption is that specifying a lifecycle policy for _spark_metadata 
with longer retention will solve the issue

Best regards

> On 13 Apr 2023, at 11:52, Yuval Itzchakov  wrote:
> 
> 
> Hi everyone,
> 
> I am using Spark's FileStreamSink to write files to S3. On the S3 bucket, I 
> have a lifecycle policy that deletes data older than X days so that the bucket 
> does not grow indefinitely. My problem starts with Spark jobs that don't 
> receive data frequently. In this case no new batches are created, which means 
> no new checkpoints are written to the output path and the _spark_metadata file 
> is never overwritten, so eventually the lifecycle policy deletes the file and 
> the job fails.
> 
> As far as I can tell from reading the code and looking at StackOverflow 
> answers, the _spark_metadata path is hardcoded to the base path of the output 
> directory created by the DataStreamWriter, which means I cannot store this 
> file under a separate prefix that is not covered by the lifecycle policy rule.
> 
> Has anyone run into a similar problem?
> 
> 
> 
> -- 
> Best Regards,
> Yuval Itzchakov.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Data ingestion

2022-08-17 Thread Yuri Oleynikov (‫יורי אולייניקוב‬‎)
If you are on AWS, you can use RDS + AWS DMS to save the data to S3, and then 
read the streaming data with Spark Structured Streaming from S3 into Hive, as 
sketched below.

Best regards
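
A rough PySpark sketch of the last hop of that pipeline, assuming DMS drops CSV
files under an S3 prefix; the schema, paths and table name below are
placeholders:

    # Sketch: stream DMS output files from S3 into a Hive table.
    # Assumes a Hive-enabled SparkSession; schema/paths/table are hypothetical.
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType, TimestampType

    spark = (SparkSession.builder
             .appName("dms-s3-to-hive")
             .enableHiveSupport()
             .getOrCreate())

    schema = StructType([
        StructField("id", StringType()),
        StructField("payload", StringType()),
        StructField("updated_at", TimestampType()),
    ])

    raw = (spark.readStream
           .schema(schema)                        # file sources need an explicit schema
           .csv("s3a://my-bucket/dms-output/"))   # hypothetical DMS target prefix

    def write_to_hive(batch_df, batch_id):
        # Append each micro-batch to the Hive table (created on first write).
        batch_df.write.mode("append").saveAsTable("mydb.events")

    (raw.writeStream
     .foreachBatch(write_to_hive)
     .option("checkpointLocation", "s3a://my-bucket/checkpoints/dms-to-hive/")
     .start())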

> On 17 Aug 2022, at 20:51, Akash Vellukai  wrote:
> 
> 
> Dear Sir, 
> 
> 
> How could we do data ingestion from MySQL to Hive with the help of Spark 
> Streaming, and not with Kafka?
> 
> Thanks and regards
> Akash

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: When should we cache / persist ? After or Before Actions?

2022-04-21 Thread Yuri Oleynikov (‫יורי אולייניקוב‬‎)
Hi Sean 

Persisting/caching is useful when you're going to reuse a dataframe, so in your 
case no persisting/caching is required. That covers the "when".

The "where" is usually the point closest to where the 
calculations/transformations are reused.

Btw, I'm not sure caching is useful when you have a HUGE dataframe; persisting 
with an explicit storage level may be more useful.

Best regards
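
On the HUGE dataframe remark: a small sketch of choosing an explicit storage
level with persist() instead of plain cache(), for a dataframe that really is
reused by more than one action; whether any level helps depends on the
workload, and the table name and path are placeholders:

    # Sketch: persist() with an explicit storage level, e.g. spill to disk
    # (MEMORY_AND_DISK) or skip memory entirely (DISK_ONLY) for very large
    # dataframes that are reused by several actions.
    from pyspark import StorageLevel
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.sql("select * from some_huge_table")   # hypothetical source

    df.persist(StorageLevel.MEMORY_AND_DISK)          # or StorageLevel.DISK_ONLY

    df.count()                                        # first action materializes it
    df.write.mode("overwrite").parquet("/tmp/out")    # second action reuses it

    df.unpersist()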

> On 21 Apr 2022, at 16:24, Sean Owen  wrote:
> 
> 
> You persist before actions, not after, if you want the action's outputs to be 
> persistent.
> If anything swap line 2 and 3. However, there's no point in the count() here, 
> and because there is already only one action following to write, no caching 
> is useful in that example.
> 
>> On Thu, Apr 21, 2022 at 2:26 AM Sid  wrote:
>> Hi Folks,
>> 
>> I am working on Spark Dataframe API where I am doing following thing:
>> 
>> 1) df = spark.sql("some sql on huge dataset").persist()
>> 2) df1 = df.count()
>> 3) df.repartition().write.mode().parquet("")
>> 
>> 
>> AFAIK, persist should be used after the count statement, if it is needed at 
>> all, since Spark is lazily evaluated: if I call any action it will recompute 
>> the above code, hence there is no use in persisting it before the action.
>> 
>> Therefore, something like the below should give better performance.
>> 1) df= spark.sql("some sql on huge dataset")
>> 2) df1 = df.count()
>> 3) df.persist()
>> 4) df.repartition().write.mode().parquet("")
>> 
>> So please help me understand how it should be done exactly, and why, if I 
>> am not correct.
>> 
>> Thanks,
>> Sid
>> 


Unsubscribe

2021-09-08 Thread Yuri Oleynikov (‫יורי אולייניקוב‬‎)
Unsubscribe 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Unsubscribe

2021-09-03 Thread Yuri Oleynikov (‫יורי אולייניקוב‬‎)
Unsubscribe

Unsubscribe

2021-09-03 Thread Yuri Oleynikov (‫יורי אולייניקוב‬‎)



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Bechmarks on Spark running on Yarn versus Spark on K8s

2021-07-05 Thread Yuri Oleynikov (‫יורי אולייניקוב‬‎)
Not a big expert on Spark, but I don't really understand what you are going to 
compare, and how. Reading/writing to and from HDFS? How is that related to YARN 
and k8s? These are resource managers (YARN = Yet Another Resource Negotiator): 
they decide what and how much to allocate, and when (CPU, RAM).
Local disk spilling? That depends on disk throughput.
So what exactly are you going to measure?




Best regards

> On 5 Jul 2021, at 20:43, Mich Talebzadeh  wrote:
> 
> 
> 
> I was curious to know if there are benchmarks around on comparison between 
> Spark on Yarn compared to Kubernetes.
> 
> This question arose because traditionally in Google Cloud we have been using 
> Spark on Dataproc clusters. Dataproc  provides Spark, Hadoop plus others 
> (optional install) for data and analytic processing. It is PaaS
> 
> Now they have GKE clusters as well and also introduced Apache Spark with 
> Cloud Dataproc on Kubernetes which allows one to submit Spark jobs to k8s 
> using Dataproc stub as a platform to submit the job as below from cloud 
> console or local
> 
> gcloud dataproc jobs submit pyspark --cluster="dataproc-for-gke" 
> gs://bucket/testme.py --region="europe-west2" --py-files gs://bucket/DSBQ.zip
> Job [e5fc19b62cf744f0b13f3e6d9cc66c19] submitted.
> Waiting for job output...
> 
> At the moment it is a struggle to see what merits using k8s instead of 
> dataproc bar notebooks etc. Actually there is not much literature around with 
> PySpark on k8s.
> 
> For me Spark on bare metal is the preferred option as I cannot see how one 
> can pigeon hole Spark into a container and make it performant but I may be 
> totally wrong. 
> 
> Thanks
> 
>view my Linkedin profile
> 
>  
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  


Re: Stream which needs to be “joined” with another Stream of “Reference” data.

2021-05-03 Thread Yuri Oleynikov (‫יורי אולייניקוב‬‎)
Always nice to learn something new about jdbc.
Thanks, Mich **thumbsup**


> On 3 May 2021, at 20:54, Mich Talebzadeh  wrote:
> 
> 
> i would have assumed that reference data like device_id are pretty static so 
> a snapshot will do.
> 
> JDBC connection is lazy so it will not materialise until the join uses it. 
> Then data will be collected from the underlying RDBMS table for COMMITED 
> transactions
> 
> However, this is something that I discussed in another thread
> 
> Spark Streaming with Files
> 
> There is an option that one can trigger once 
> 
>   result = streamingDataFrame.select( \
>  col("parsed_value.rowkey").alias("rowkey") \
>, col("parsed_value.ticker").alias("ticker") \
>, col("parsed_value.timeissued").alias("timeissued") \
>, col("parsed_value.price").alias("price")). \
>  writeStream. \
>  outputMode('append'). \
>  option("truncate", "false"). \
>  foreachBatch(sendToSink). \
>  queryName('trailFiles'). \
>  trigger(once = True). \
>  option('checkpointLocation', checkpoint_path). \
>  start(data_path)
> 
> This means that the streaming job will run for all data connected and 
> terminate. In that case JDBC connection will be refreshed according to your 
> batch interval that restarts the streaming process for unprocessed data and 
> critically your JDBC snapshot will be updated as read
> 
> This can be done through airflow etc. You won't lose data as the checkpoint 
> will mark processed records.
> 
> That might be an option.
> 
> HTH
> 
> 
>view my Linkedin profile
> 
>  
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> 
> ‪On Mon, 3 May 2021 at 18:27, ‫"Yuri Oleynikov (‫יורי אולייניקוב‬‎)"‬‎ 
>  wrote:‬
>> You can do the enrichment with a stream (events) to static (device table) 
>> join when the device table is a slowly changing dimension (let's say it 
>> changes once a day) and it's in Delta format; then for every micro batch of 
>> the stream-static join the device table will be rescanned and up-to-date 
>> device data will be loaded.
>> 
>> If the device table is not a slowly changing dimension (e.g. it changes once 
>> an hour), then you'd probably need a stream-stream join, but I'm not sure if 
>> RDBMS (aka jdbc) in Spark supports streaming mode.
>> So I'd rather sync jdbc with parquet/delta periodically in order to emulate 
>> a streaming source
>> 
>> 
>>>> On 3 May 2021, at 20:02, Eric Beabes  wrote:
>>>> 
>>> 
>>> 1) Device_id might be different for messages in a batch.
>>> 2) It's a Streaming application. The IOT messages are getting read in a 
>>> Structured Streaming job in a "Stream". The Dataframe would need to be 
>>> updated every hour. Have you done something similar in the past? Do you 
>>> have an example to share?
>>> 
>>>> On Mon, May 3, 2021 at 9:52 AM Mich Talebzadeh  
>>>> wrote:
>>>> Can you please clarify:
>>>> 
>>>> The IOT messages in one batch have the same device_id or every row has 
>>>> different device_id?
>>>> The RDBMS table can be read through JDBC in Spark and a dataframe can be 
>>>> created on. Does that work for you? You do not really need to stream the 
>>>> reference table. 
>>>> 
>>>> HTH
>>>> 
>>>>  
>>>> 
>>>>view my Linkedin profile
>>>> 
>>>>  
>>>> Disclaimer: Use it at your own risk. Any and all responsibility for any 
>>>> loss, damage or destruction of data or any other property which may arise 
>>>> from relying on this email's technical content is explicitly disclaimed. 
>>>> The author will in no case be liable for any monetary damages arising from 
>>>> such loss, damage or destruction.
>>>>  
>>>> 
>>>> 
>>>>> On Mon, 3 May 2021 at 17:37, Eric Beabes  wrote:
>>>>> I would like to develop a Spark Structured Streaming job that reads 
>>>>> mes

Re: Stream which needs to be “joined” with another Stream of “Reference” data.

2021-05-03 Thread Yuri Oleynikov (‫יורי אולייניקוב‬‎)
You can do the enrichment with a stream (events) to static (device table) join 
when the device table is a slowly changing dimension (let's say it changes once 
a day) and it's in Delta format; then for every micro batch of the stream-static 
join the device table will be rescanned and up-to-date device data will be 
loaded.

If the device table is not a slowly changing dimension (e.g. it changes once an 
hour), then you'd probably need a stream-stream join, but I'm not sure if RDBMS 
(aka jdbc) in Spark supports streaming mode.
So I'd rather sync jdbc with parquet/delta periodically in order to emulate a 
streaming source
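
A minimal PySpark sketch of the stream-static enrichment described above; the
broker, topic, paths and column names are hypothetical, and reading the static
side as Delta is what makes each micro-batch see the latest snapshot of the
device table:

    # Sketch: enrich an IoT event stream with a slowly changing device table
    # via a stream-static join. All names below are placeholders.
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, from_json
    from pyspark.sql.types import StructType, StructField, StringType, DoubleType

    spark = SparkSession.builder.getOrCreate()

    event_schema = StructType([
        StructField("device_id", StringType()),
        StructField("temperature", DoubleType()),
    ])

    events = (spark.readStream
              .format("kafka")
              .option("kafka.bootstrap.servers", "broker:9092")
              .option("subscribe", "iot-events")
              .load()
              .select(from_json(col("value").cast("string"), event_schema).alias("e"))
              .select("e.*"))

    # Static side: a Delta (or parquet) copy of the DEVICE table, refreshed
    # periodically from the RDBMS by a separate job.
    devices = spark.read.format("delta").load("s3a://bucket/ref/devices")

    enriched = events.join(devices, on="device_id", how="left")

    (enriched.writeStream
     .option("checkpointLocation", "s3a://bucket/chk/enriched")
     .format("parquet")
     .option("path", "s3a://bucket/out/enriched")
     .start())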


> On 3 May 2021, at 20:02, Eric Beabes  wrote:
> 
> 
> 1) Device_id might be different for messages in a batch.
> 2) It's a Streaming application. The IOT messages are getting read in a 
> Structured Streaming job in a "Stream". The Dataframe would need to be 
> updated every hour. Have you done something similar in the past? Do you have 
> an example to share?
> 
>> On Mon, May 3, 2021 at 9:52 AM Mich Talebzadeh  
>> wrote:
>> Can you please clarify:
>> 
>> The IOT messages in one batch have the same device_id or every row has 
>> different device_id?
>> The RDBMS table can be read through JDBC in Spark and a dataframe can be 
>> created on. Does that work for you? You do not really need to stream the 
>> reference table. 
>> 
>> HTH
>> 
>>  
>> 
>>view my Linkedin profile
>> 
>>  
>> Disclaimer: Use it at your own risk. Any and all responsibility for any 
>> loss, damage or destruction of data or any other property which may arise 
>> from relying on this email's technical content is explicitly disclaimed. The 
>> author will in no case be liable for any monetary damages arising from such 
>> loss, damage or destruction.
>>  
>> 
>> 
>>> On Mon, 3 May 2021 at 17:37, Eric Beabes  wrote:
>>> I would like to develop a Spark Structured Streaming job that reads 
>>> messages in a Stream which needs to be “joined” with another Stream of 
>>> “Reference” data.
>>> 
>>> For example, let’s say I’m reading messages from Kafka coming in from (lots 
>>> of) IOT devices. This message has a ‘device_id’. We have a DEVICE table on 
>>> a relational database. What I need to do is “join” the ‘device_id’ in the 
>>> message with the ‘device_id’ on the table to enrich the incoming message. 
>>> Somewhere I read that, this can be done by joining two streams. I guess, we 
>>> can create a “Stream” that reads the DEVICE table once every hour or so. 
>>> 
>>> Questions:
>>> 1) Is this the right way to solve this use case? 
>>> 2) Should we use a Stateful Stream for reading DEVICE table with State 
>>> timeout set to an hour?
>>> 3) What would happen while the DEVICE state is getting updated from the 
>>> table on the relational database?
>>> 
>>> Guidance would be greatly appreciated. Thanks.


Re: Is it enable to use Multiple UGIs in One Spark Context?

2021-03-25 Thread Yuri Oleynikov (‫יורי אולייניקוב‬‎)
Assuming that all tables have the same schema, you can make one global table 
partitioned by some column, then apply specific user/group/other (UGO) 
permissions/ACLs per partition subdirectory, as sketched below.
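
A rough sketch of that idea, assuming the owner can be encoded as a partition
column and that HDFS ACLs are applied afterwards; paths, users and the column
name are hypothetical:

    # Sketch: one global table partitioned by an "owner" column, then grant
    # each owner rwx on their own partition subdirectory via HDFS ACLs.
    import subprocess
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.table("staging.all_tables_union")       # hypothetical source

    df.write.partitionBy("owner").mode("overwrite").parquet("/data/global_table")

    for owner in [r["owner"] for r in df.select("owner").distinct().collect()]:
        part_dir = "/data/global_table/owner={}".format(owner)
        subprocess.run(
            ["hdfs", "dfs", "-setfacl", "-R", "-m",
             "user:{}:rwx".format(owner), part_dir],
            check=True)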


> On 25 Mar 2021, at 15:13, Kwangsun Noh  wrote:
> 
> 
> Hi, Spark users.
> 
> Currently I have to make multiple tables in hdfs using spark api.
> The tables need to made by each other users.
> 
> For example, table01 is owned by user01, table02 is owned by user02 like 
> below.
> 
> path | owner:group   | permission
> /data/table01/  | user01:spark   |770
> /data/table01/_SUCESS | user01:spark   |770  
> /data/table01/part_x | user01:spark   |770  
> /data/table01/part_x | user01:spark   |770
> ...
> /data/table02/  | user02:spark   |770
> ...
> /data/table03/  | user03:spark   |770
> ...
> 
> 
> 
> Actually I used the UGI to make them, and the directories were made as I 
> expected. But the files (part_x) were made by the user that launched the Spark 
> application.
> 
> Is it possible to do what i want ?


Re: Rdd - zip with index

2021-03-23 Thread Yuri Oleynikov (‫יורי אולייניקוב‬‎)
I'm not a Spark core developer and don't want to confuse you, but it seems 
logical to me that just reading from a single file (no matter what format the 
file is in) gives no parallelism unless you repartition by some column right 
after the csv load; but you're telling me you've already tried repartition with 
no luck...


> On 24 Mar 2021, at 03:47, KhajaAsmath Mohammed  
> wrote:
> 
> So spark by default doesn’t split the large 10gb file when loaded? 
> 
> Sent from my iPhone
> 
>> On Mar 23, 2021, at 8:44 PM, Yuri Oleynikov (‫יורי אולייניקוב‬‎) 
>>  wrote:
>> 
>> Hi, Mohammed
>> I think the reason that only one executor is running and there is a single 
>> partition is that you have a single file that might be read/loaded into 
>> memory.
>> 
>> In order to achieve better parallelism I'd suggest splitting the csv file.
>> 
>> Another question: why are you using an RDD at all?
>> Just spark.read.format("csv").option("header", 
>> True).load(...).select(...).write.format("avro").save(...)
>> 
>> 
>>>> On 24 Mar 2021, at 03:19, KhajaAsmath Mohammed  
>>>> wrote:
>>> 
>>> Hi,
>>> 
>>> I have a 10 GB file that should be loaded into a Spark dataframe. The file 
>>> is csv with a header, and we were using rdd.zipWithIndex to get the column 
>>> names and convert to avro accordingly.
>>> 
>>> I am assuming this is why it takes a long time: only one executor runs and 
>>> it never achieves parallelism. Is there an easy way to achieve parallelism 
>>> after filtering out the header?
>>> 
>>> I am also interested in a solution that can remove the header from the file 
>>> so that I can provide my own schema. This way I can split the files.
>>> 
>>> Rdd.partitions is always 1 for this, even after repartitioning the dataframe 
>>> after zipWithIndex. Any help on this topic please.
>>> 
>>> Thanks,
>>> Asmath
>>> 
>>> Sent from my iPhone
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Rdd - zip with index

2021-03-23 Thread Yuri Oleynikov (‫יורי אולייניקוב‬‎)
Hi, Mohammed
I think the reason that only one executor is running and there is a single 
partition is that you have a single file that might be read/loaded into memory.

In order to achieve better parallelism I'd suggest splitting the csv file.

Another question: why are you using an RDD at all?
Just spark.read.format("csv").option("header", 
True).load(...).select(...).write.format("avro").save(...)
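
Spelled out as a runnable PySpark sketch (paths and schema are placeholders;
writing Avro needs the external spark-avro package on the classpath, e.g. via
--packages):

    # Sketch: read the CSV with header=True and an explicit schema (no RDD /
    # zipWithIndex needed), then write Avro.
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType, DoubleType

    spark = SparkSession.builder.getOrCreate()

    schema = StructType([                    # hypothetical columns
        StructField("id", StringType()),
        StructField("value", DoubleType()),
    ])

    df = (spark.read
          .option("header", True)            # header row supplies the column names
          .schema(schema)                    # explicit schema avoids an inference pass
          .csv("/data/input/big_file.csv"))

    # Repartition to spread the work, then write Avro; tune 200 to the cluster.
    df.repartition(200).write.format("avro").save("/data/output/avro")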


> On 24 Mar 2021, at 03:19, KhajaAsmath Mohammed  
> wrote:
> 
> Hi,
> 
> I have a 10 GB file that should be loaded into a Spark dataframe. The file is 
> csv with a header, and we were using rdd.zipWithIndex to get the column names 
> and convert to avro accordingly.
> 
> I am assuming this is why it takes a long time: only one executor runs and it 
> never achieves parallelism. Is there an easy way to achieve parallelism after 
> filtering out the header?
> 
> I am also interested in a solution that can remove the header from the file 
> so that I can provide my own schema. This way I can split the files.
> 
> Rdd.partitions is always 1 for this, even after repartitioning the dataframe 
> after zipWithIndex. Any help on this topic please.
> 
> Thanks,
> Asmath
> 
> Sent from my iPhone
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: configuring .sparkStaging with group rwx

2021-02-25 Thread Yuri Oleynikov (‫יורי אולייניקוב‬‎)
Spark-submit --conf spark.hadoop.fs.permissions.umask-mode=007
You may also set sticky bit on staging dir 

Sent from my iPhone

> On 26 Feb 2021, at 03:29, Bulldog20630405  wrote:
> 
> 
> 
> we have a spark cluster running on with multiple users...
> when running with the user owning the cluster jobs run fine... however, when 
> trying to run pyspark with a different user it fails because the 
> .sparkStaging/application_* is written with 700 so the user cannot write to 
> that directory
> 
> how to configure spark/yarn cluster so .sparkStaging is written as 770 
> instead of 700 so the users with shared group can execute?
> 
> 


Re: Dynamic Spark metrics creation

2021-01-17 Thread Yuri Oleynikov (‫יורי אולייניקוב‬‎)
Hey Jacek, I'll clarify myself a bit:
As a bottom line, I need the following metrics reported by structured streaming:
Country-USA: 7
Country-Poland: 23
Country-Brazil: 56

The country names are included in the incoming events and are unknown at 
application startup.

Thus registering an accumulator and binding it to a metric source on the driver 
side at application startup is impossible (unless you register all possible 
country names, which wastes Spark memory, pollutes the metrics namespace with 
99% of the metrics having a zero value, and wastes network bandwidth).
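
One possible way to keep a single accumulator while the key set stays dynamic
is a custom AccumulatorParam that merges per-country counts as a dict; a hedged
PySpark sketch (the column name is hypothetical, and the driver-side push into
an actual metrics source is left out):

    # Sketch: a dict-merging accumulator, so country names need not be known
    # up front; executors add {country: n} and the driver reads merged totals.
    from pyspark.accumulators import AccumulatorParam
    from pyspark.sql import SparkSession

    class DictSumParam(AccumulatorParam):
        def zero(self, value):
            return {}

        def addInPlace(self, a, b):
            for k, v in b.items():
                a[k] = a.get(k, 0) + v
            return a

    spark = SparkSession.builder.getOrCreate()
    country_counts = spark.sparkContext.accumulator({}, DictSumParam())

    def count_countries(batch_df, batch_id):
        # foreach() runs on executors; accumulator updates flow to the driver.
        batch_df.select("country").rdd.foreach(
            lambda row: country_counts.add({row["country"]: 1}))

    # events = spark.readStream...            (stream definition omitted)
    # events.writeStream.foreachBatch(count_countries).start()
    # On the driver, country_counts.value then holds e.g. {"USA": 7, "Poland": 23}.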


Sent from my iPhone

> On 17 Jan 2021, at 15:56, Jacek Laskowski wrote:
> 
> 
> Hey Yurii,
> 
> > which is unavailable from executors.
> 
> Register it on the driver and use accumulators on executors to update the 
> values (on the driver)?
> 
> Pozdrawiam,
> Jacek Laskowski
> 
> https://about.me/JacekLaskowski
> "The Internals Of" Online Books
> Follow me on https://twitter.com/jaceklaskowski
> 
> 
> 
> ‪On Sat, Jan 16, 2021 at 2:21 PM ‫Yuri Oleynikov (יורי אולייניקוב‬‎ 
>  wrote:‬
>> Hi all, 
>> I have a spark application with Arbitrary Stateful Aggregation implemented 
>> with FlatMapGroupsWithStateFunction.
>> 
>> I want to make some statistics about incoming events inside 
>> FlatMapGroupsWithStateFunction.
>> The statistics are made from some event property which on the one hand has 
>> dynamic values but on the other hand - small finite set (thought unknown) of 
>> values (e.g. country name).
>> 
>> So I thought to register dynamic metrics inside  
>> FlatMapGroupsWithStateFunction but as far as I understand, this requires 
>> accessing MetricsSystem via SparkEnv.get() which is unavailable from 
>> executors.
>> 
>> Any thoughts/suggestions? 
>> 
>> With best regards,
>> Yurii
>> 


Re: Caching

2020-12-07 Thread Yuri Oleynikov (‫יורי אולייניקוב‬‎)
Are you using the same csv twice?

Sent from my iPhone

> On 7 Dec 2020, at 18:32, Amit Sharma wrote:
> 
> 
> Hi All, I am using caching in my code. I have a DF like
> val  DF1 = read csv.
> val DF2 = DF1.groupBy().agg().select(.)
> 
> Val DF3 =  read csv .join(DF1).join(DF2)
>   DF3 .save.
> 
> If I do not cache DF2 or Df1 it is taking longer time  . But i am doing 1 
> action only why do I need to cache.
> 
> Thanks
> Amit
> 
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Structured streaming - Kakfa - slowness with query 0

2020-10-21 Thread Yuri Oleynikov (‫יורי אולייניקוב‬‎)
I think maxOffsetsPerTrigger in the Spark + Kafka integration docs would meet 
your requirement.
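
For reference, a small sketch of where that option goes (broker and topic names
are placeholders); maxOffsetsPerTrigger caps how many Kafka offsets each
micro-batch reads, so a large backlog is spread over many batches:

    # Sketch: limit how much of the Kafka backlog is consumed per micro-batch.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    stream = (spark.readStream
              .format("kafka")
              .option("kafka.bootstrap.servers", "broker:9092")
              .option("subscribe", "my-topic")
              .option("startingOffsets", "earliest")   # catching up on old data
              .option("maxOffsetsPerTrigger", 10000)   # cap per batch; tune to taste
              .load())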

Sent from my iPhone

> On 21 Oct 2020, at 12:36, KhajaAsmath Mohammed wrote:
> 
> Thanks. Do we have an option to limit the number of records? Like process 
> only 1 or the property we pass? This way we can handle the amount of data 
> for the batches that we need.
> 
> Sent from my iPhone
> 
>>> On Oct 21, 2020, at 12:11 AM, lec ssmi  wrote:
>>> 
>> 
>> Structured streaming's bottom layer also uses a micro-batch mechanism. It 
>> seems that the first batch is slower than the later ones; I also often 
>> encounter this problem. It feels related to the division of batches.
>> On the other hand, Spark's batch size is usually bigger than Flume's 
>> transaction batch size.
>> 
>> 
>> On Wed, 21 Oct 2020 at 12:19 PM, KhajaAsmath Mohammed wrote:
>>> Yes. Changing back to latest worked but I still see the slowness compared 
>>> to flume. 
>>> 
>>> Sent from my iPhone
>>> 
> On Oct 20, 2020, at 10:21 PM, lec ssmi  wrote:
> 
 
 Do you start your application by catching up on early Kafka data?
 
 On Wed, 21 Oct 2020 at 2:19 AM, Lalwani, Jayesh wrote:
> Are you getting any output? Streaming jobs typically run forever, and 
> keep processing data as it comes in the input. If a streaming job is 
> working well, it will typically generate output at a certain cadence
> 
>  
> 
> From: KhajaAsmath Mohammed 
> Date: Tuesday, October 20, 2020 at 1:23 PM
> To: "user @spark" 
> Subject: [EXTERNAL] Spark Structured streaming - Kakfa - slowness with 
> query 0
> 
>  
> 
> CAUTION: This email originated from outside of the organization. Do not 
> click links or open attachments unless you can confirm the sender and 
> know the content is safe.
> 
>  
> 
> Hi,
> 
>  
> 
> I have started using spark structured streaming for reading data from 
> kaka and the job is very slow. Number of output rows keeps increasing in 
> query 0 and the job is running forever. any suggestions for this please? 
> 
>  
> 
> 
>  
> 
> Thanks,
> 
> Asmath


Re: Scala vs Python for ETL with Spark

2020-10-17 Thread Yuri Oleynikov (‫יורי אולייניקוב‬‎)
It seems that this thread has turned into a holy war that has nothing to do 
with the original question. If so, that's super disappointing.

Sent from my iPhone

> On 17 Oct 2020, at 15:53, Molotch wrote:
> 
> I would say the pros and cons of Python vs Scala is both down to Spark, the
> languages in themselves and what kind of data engineer you will get when you
> try to hire for the different solutions. 
> 
> With Pyspark you get less functionality and increased complexity with the
> py4j java interop compared to vanilla Spark. Why would you want that? Maybe
> you want the Python ML tools and have a clear use case, then go for it. If
> not, avoid the increased complexity and reduced functionality of Pyspark.
> 
> Python vs Scala? Idiomatic Python is a lesson in bad programming
> habits/ideas, there's no other way to put it. Do you really want programmers
> enjoying coding in such a language hacking away at your system?
> 
> Scala might be far from perfect with the plethora of ways to express
> yourself. But Python < 3.5 is not fit for anything except simple scripting
> IMO.
> 
> Doing exploratory data analysis in a Jupyter notebook, Pyspark seems like a
> fine idea. Coding an entire ETL library including state management, the
> whole kitchen including the sink: Scala every day of the week.
> 
> 
> 
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Hive on Spark in Kubernetes.

2020-10-07 Thread Yuri Oleynikov (‫יורי אולייניקוב‬‎)
Thank you very much!


Sent from my iPhone

> On 7 Oct 2020, at 17:38, mykidong wrote:
> 
> Hi all,
> 
> I have recently written a blog about hive on spark in kubernetes
> environment:
> - https://itnext.io/hive-on-spark-in-kubernetes-115c8e9fa5c1
> 
> In this blog, you can find how to run hive on kubernetes using spark thrift
> server compatible with hive server2.
> 
> Cheers,
> 
> - Kidong.
> 
> 
> 
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Arbitrary stateful aggregation: updating state without setting timeout

2020-10-06 Thread Yuri Oleynikov (‫יורי אולייניקוב‬‎)
Hi Jungtaek,
Thank you very much for the clarification.

> On 5 Oct 2020, at 15:17, Jungtaek Lim wrote:
> 
> 
> Hi,
> 
> That's not explained in the SS guide doc but explained in the scala API doc.
> http://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/streaming/GroupState.html
> 
> The statement being quoted from the scala API doc answers your question.
> 
>> The timeout is reset every time the function is called on a group, that is, 
>> when the group has new data, or the group has timed out. So the user has to 
>> set the timeout duration every time the function is called, otherwise there 
>> will not be any timeout set.
> 
> Simply saying, you'd want to always set timeout unless you remove state for 
> the group (key).
> 
> Hope this helps.
> 
> Thanks,
> Jungtaek Lim (HeartSaVioR)
> 
> ‪On Mon, Oct 5, 2020 at 6:16 PM ‫Yuri Oleynikov (יורי אולייניקוב‬‎ 
>  wrote:‬
>> Hi all, I have the following question:
>> What happens to the state (in terms of expiration) if I'm updating the state 
>> without setting a timeout?
>> 
>> E.g. in FlatMapGroupsWithStateFunction
>> first batch:
>> state.update(myObj)
>> state.setTimeoutDuration(timeout)
>> second batch:
>> state.update(myObj)
>> third batch (no data for a long time):
>>  has the state timed out after the initial timeout expired, or not?
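
To make the quoted rule concrete, a hedged sketch using PySpark's
applyInPandasWithState (Spark 3.4+); the schemas and names are hypothetical,
and the only point is that setTimeoutDuration is re-set on every invocation
that keeps the state:

    # Sketch: re-arm the timeout on every call that updates the state, per the
    # GroupState contract quoted above. Requires Spark 3.4+.
    import pandas as pd
    from pyspark.sql.streaming.state import GroupState, GroupStateTimeout

    def track(key, pdf_iter, state: GroupState):
        if state.hasTimedOut:
            state.remove()                     # no data arrived in time: drop state
            return
        count = state.get[0] if state.exists else 0
        for pdf in pdf_iter:
            count += len(pdf)
        state.update((count,))
        # Must be called every time, otherwise no timeout is set for this group.
        state.setTimeoutDuration(60 * 60 * 1000)   # 1 hour, in milliseconds
        yield pd.DataFrame({"id": [key[0]], "count": [count]})

    # events.groupBy("id").applyInPandasWithState(
    #     track, "id string, count long", "count long",
    #     "update", GroupStateTimeout.ProcessingTimeTimeout)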