Re: How to generate unique incrementing identifier in a structured streaming dataframe

2021-07-13 Thread Jungtaek Lim
Theoretically, the composed value of batchId + monotonically_increasing_id() would achieve the goal. The major downside is that you'll need to deal with "deduplication" of output based on batchId, as monotonically_increasing_id() is non-deterministic. You need to ensure there's NO overlap on output
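A minimal PySpark sketch of that composition, assuming it runs inside a foreachBatch function (see the sketch further down the thread); the column name and the string-joined form are illustrative, not from the original mail:

    from pyspark.sql import functions as F

    # batch_id is deterministic across retries of a micro-batch, while
    # monotonically_increasing_id() is not: a retried batch may assign
    # different low parts, so the sink must replace (not append to) any
    # output already written for the same batch_id.
    def with_unique_id(batch_df, batch_id):
        return batch_df.withColumn(
            "uid",
            F.concat_ws(
                "-",
                F.lit(batch_id).cast("string"),
                F.monotonically_increasing_id().cast("string"),
            ),
        )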

Re: How to generate unique incrementing identifier in a structured streaming dataframe

2021-07-13 Thread Mich Talebzadeh
Sorry, a correction regarding creating an incrementing ID in PySpark:

>>> df = spark.range(1, 5)
>>> from pyspark.sql.window import Window as W
>>> from pyspark.sql import functions as F
>>> df = df.withColumn("idx", F.monotonically_increasing_id())
>>> Wspec = W.orderBy("idx")
>>> df.withColumn("idx",
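The truncated line presumably completes with row_number() over the window; a full version of the pattern, under that assumption:

>>> df = df.withColumn("idx", F.row_number().over(Wspec))
>>> df.show()

Note that an orderBy window with no partitionBy pulls all rows into a single partition, so this yields dense 1..N IDs at the cost of parallelism.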

Re: How to generate unique incrementing identifier in a structured streaming dataframe

2021-07-13 Thread Sebastian Piu
If you want them to survive across jobs, you can use snowflake IDs or similar ideas, depending on your use case (a sketch follows below).

On Tue, 13 Jul 2021, 9:33 pm Mich Talebzadeh wrote:
> Meaning a monotonically incrementing ID, as in an Oracle sequence, for each
> record read from Kafka, adding that to your
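A minimal sketch of the snowflake idea (not from this thread): pack a millisecond timestamp, a worker id, and a per-worker sequence into one 64-bit long. The field widths and epoch below are illustrative, and backwards clock jumps are not handled:

    import threading
    import time

    class SnowflakeId:
        # Illustrative layout: 41-bit ms timestamp | 10-bit worker | 12-bit sequence
        EPOCH_MS = 1_600_000_000_000  # arbitrary custom epoch

        def __init__(self, worker_id):
            assert 0 <= worker_id < 1024
            self.worker_id = worker_id
            self.last_ms = -1
            self.seq = 0
            self.lock = threading.Lock()

        def next_id(self):
            with self.lock:
                now = int(time.time() * 1000)
                if now == self.last_ms:
                    self.seq = (self.seq + 1) & 0xFFF
                    if self.seq == 0:  # 4096 ids used this ms; spin to the next ms
                        while now <= self.last_ms:
                            now = int(time.time() * 1000)
                else:
                    self.seq = 0
                self.last_ms = now
                return ((now - self.EPOCH_MS) << 22) | (self.worker_id << 12) | self.seq

In Spark, such a generator could run inside mapPartitions with the partition index as worker_id, so IDs stay unique across jobs without central coordination.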

Re: How to generate unique incrementing identifier in a structured streaming dataframe

2021-07-13 Thread Mich Talebzadeh
Meaning a monotonically incrementing ID, as in an Oracle sequence, for each record read from Kafka, and adding that to your dataframe? If you do Structured Streaming in micro-batch mode, you will get what is known as a BatchId:

result = streamingDataFrame.select( \
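That BatchId is surfaced through the foreachBatch sink; a minimal sketch, with an illustrative sink body and output path:

    def handle_batch(batch_df, batch_id):
        # batch_id increases by one per micro-batch and is re-used when a failed
        # batch is retried, which makes it safe to key (or overwrite) output on.
        batch_df.write.mode("overwrite").parquet(f"s3://bucket/out/batch={batch_id}")  # illustrative path

    streamingDataFrame.writeStream.foreachBatch(handle_batch).start()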

How to generate unique incrementing identifier in a structured streaming dataframe

2021-07-13 Thread Felix Kizhakkel Jose
Hello, I am using Spark Structured Streaming to sink data from Kafka to AWS S3. I am wondering if it's possible for me to introduce a uniquely incrementing identifier for each record, as we do in an RDBMS (an incrementing long id)? This would greatly help with range pruning when reading based on this ID.

Question about query local dirs when fetching HostLocalBlocks

2021-07-13 Thread 徐涛
Hi experts, while reading the Spark code in version 3.0.0, I noticed that when the external shuffle service is enabled, ShuffleBlockFetcherIterator -> fetchHostLocalBlocks contains some logic where, if there is no record in the cache, it needs to use hostLocalDirManager.getHostLocalDirs to

Re: Unsubscribe

2021-07-13 Thread Howard Yang
Unsubscribe

Eric Wang wrote on Mon, Jul 12, 2021 at 7:31 AM:
> Unsubscribe
>
> On Sun, Jul 11, 2021 at 9:59 PM Rishi Raj Tandon <tandon.rishi...@gmail.com> wrote:
>> Unsubscribe