Re: How to generate unique incrementing identifier in a structured streaming dataframe

2021-07-13 Thread Jungtaek Lim
Theoretically, a value composed of batchId +
monotonically_increasing_id() would achieve the goal. The major downside is
that you'll need to deal with "deduplication" of output based on batchId,
as monotonically_increasing_id() is non-deterministic. You need to ensure
there's NO overlap in output across multiple reattempts of the same batch
ID.
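
For illustration, a minimal PySpark sketch of that composition inside a
foreachBatch sink might look like the following; the output path, the column
names and the partition-by-batch layout are assumptions, and the batch ID and
the per-batch monotonic ID are kept as separate columns because the two cannot
safely be packed into a single 64-bit long:

    from pyspark.sql import functions as F

    def sendToSink(df, batchId):
        # monotonically_increasing_id() is unique within this micro-batch,
        # but sparse and non-deterministic across reattempts of the same batch.
        composed = (df
            .withColumn("intra_batch_id", F.monotonically_increasing_id())
            .withColumn("batch_id", F.lit(batchId)))

        # Partitioning the output by batch_id confines any duplicates from a
        # reattempted batch to one well-defined partition, which can then be
        # deduplicated or overwritten. The S3 path is an assumption.
        (composed.write
            .mode("append")
            .partitionBy("batch_id")
            .parquet("s3://my-bucket/events/"))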

Btw, even assuming you have dealt with the auto-incrementing ID on write, how
do you read the files and apply range pruning by that ID? Is the approach
scalable and efficient? You probably can't avoid reading unnecessary files
unless you build explicit metadata about the files, e.g. a map from file name
to ID range, and also craft a custom reader to leverage that information.
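
As an illustration of that kind of metadata, a rough PySpark sketch could
record the min/max id per output file and consult it before reading. The paths
are assumptions, and building the ranges by rescanning the data as shown here
is only for demonstration; in practice you would collect them as each batch is
written:

    from pyspark.sql import functions as F

    # Build one row per file with the id range it contains.
    events = spark.read.parquet("s3://my-bucket/events/")
    ranges = (events
        .withColumn("file", F.input_file_name())
        .groupBy("file")
        .agg(F.min("id").alias("min_id"), F.max("id").alias("max_id")))
    ranges.write.mode("overwrite").parquet("s3://my-bucket/events_id_ranges/")

    # At read time, keep only the files whose range overlaps the wanted ids.
    lo, hi = 1000, 2000
    wanted = (spark.read.parquet("s3://my-bucket/events_id_ranges/")
        .where((F.col("max_id") >= lo) & (F.col("min_id") <= hi))
        .select("file"))
    paths = [r.file for r in wanted.collect()]
    result = spark.read.parquet(*paths).where(F.col("id").between(lo, hi))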


On Wed, Jul 14, 2021 at 6:00 AM Sebastian Piu 
wrote:

> If you want them to survive across jobs you can use snowflake IDs or
> similar ideas depending on your use case
>
> On Tue, 13 Jul 2021, 9:33 pm Mich Talebzadeh, 
> wrote:
>
>> Meaning a monotonically incrementing ID, as in an Oracle sequence, for
>> each record read from Kafka, and adding that to your dataframe?
>>
>> If you do Structured Streaming in micro-batch mode, you will get what is
>> known as the batchId
>>
>>     result = streamingDataFrame.select( \
>>               col("parsed_value.rowkey").alias("rowkey") \
>>             , col("parsed_value.ticker").alias("ticker") \
>>             , col("parsed_value.timeissued").alias("timeissued") \
>>             , col("parsed_value.price").alias("price")). \
>>             writeStream. \
>>             outputMode('append'). \
>>             option("truncate", "false"). \
>>             foreachBatch(sendToSink). \
>>             trigger(processingTime='30 seconds'). \
>>             option('checkpointLocation', checkpoint_path). \
>>             queryName(config['MDVariables']['topic']). \
>>
>> That function sendToSink receives two arguments, df and batchId:
>>
>> def sendToSink(df, batchId):
>>     if len(df.take(1)) > 0:
>>         print(f"""md batchId is {batchId}""")
>>         df.show(100, False)
>>         df.persist()
>>         # write to BigQuery batch table
>>         s.writeTableToBQ(df, "append",
>>             config['MDVariables']['targetDataset'], config['MDVariables']['targetTable'])
>>         df.unpersist()
>>         print(f"""wrote to DB""")
>>     else:
>>         print("DataFrame md is empty")
>>
>> That value batchId can be used for each Batch.
>>
>>
>> Otherwise you can do this
>>
>>
>> from pyspark.sql.functions import monotonically_increasing_id
>> startval = 1
>> df = df.withColumn('id', monotonically_increasing_id() + startval)
>>
>> HTH
>>
>>
>>
>>
>>
>>
>> On Tue, 13 Jul 2021 at 19:53, Felix Kizhakkel Jose <
>> felixkizhakkelj...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I am using Spark Structured Streaming to sink data from Kafka to AWS S3.
>>> I am wondering if it's possible for me to introduce a uniquely incrementing
>>> identifier for each record, as we do in an RDBMS (an incrementing long id)?
>>> This would greatly help with range pruning on this ID while reading.
>>>
>>> Any thoughts? I have looked at monotonically_increasing_id, but it seems
>>> it's not deterministic, and it won't ensure new records get the next id
>>> after the latest id already present in the storage (S3).
>>>
>>> Regards,
>>> Felix K Jose
>>>
>>


Re: How to generate unique incrementing identifier in a structured streaming dataframe

2021-07-13 Thread Mich Talebzadeh
Sorry, a correction regarding creating an incrementing ID in PySpark:

>>> df = spark.range(1,5)
>>> from pyspark.sql.window import Window as W
>>> from pyspark.sql import functions as F
>>> df = df.withColumn("idx", F.monotonically_increasing_id())
>>> windowSpec = W.orderBy("idx")
>>> df.withColumn("idx", F.row_number().over(windowSpec)).show()
2021-07-13 22:04:15,001 WARN window.WindowExec: No Partition Defined for
Window operation! Moving all data to a single partition, this can cause
serious performance degradation.
+---+---+
| id|idx|
+---+---+
|  1|  1|
|  2|  2|
|  3|  3|
|  4|  4|
+---+---+
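
To make those ids continue from the latest id already written to storage
(the original requirement), a rough sketch along these lines could read the
current maximum and offset the new row numbers by it. The S3 path, the column
name and the availability of a `spark` session are assumptions, and scanning
storage for the max on every batch is clearly not cheap:

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window as W

    def with_incrementing_id(df, existing_path="s3://my-bucket/events/"):
        # Highest id already written; fall back to 0 when storage is empty.
        existing = spark.read.parquet(existing_path)
        start = existing.agg(F.max("id")).first()[0] or 0

        # Dense, gap-free numbering for this batch, shifted past the stored
        # max. The un-partitioned window pulls everything onto one partition,
        # as the warning above notes.
        w = W.orderBy(F.monotonically_increasing_id())
        return df.withColumn("id", F.row_number().over(w) + F.lit(start))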

HTH






On Tue, 13 Jul 2021 at 21:32, Mich Talebzadeh 
wrote:

> Meaning a monotonically incrementing ID, as in an Oracle sequence, for each
> record read from Kafka, and adding that to your dataframe?
>
> If you do Structured Streaming in micro-batch mode, you will get what is
> known as the batchId
>
>     result = streamingDataFrame.select( \
>               col("parsed_value.rowkey").alias("rowkey") \
>             , col("parsed_value.ticker").alias("ticker") \
>             , col("parsed_value.timeissued").alias("timeissued") \
>             , col("parsed_value.price").alias("price")). \
>             writeStream. \
>             outputMode('append'). \
>             option("truncate", "false"). \
>             foreachBatch(sendToSink). \
>             trigger(processingTime='30 seconds'). \
>             option('checkpointLocation', checkpoint_path). \
>             queryName(config['MDVariables']['topic']). \
>
> That function sendToSink receives two arguments, df and batchId:
>
> def sendToSink(df, batchId):
>     if len(df.take(1)) > 0:
>         print(f"""md batchId is {batchId}""")
>         df.show(100, False)
>         df.persist()
>         # write to BigQuery batch table
>         s.writeTableToBQ(df, "append",
>             config['MDVariables']['targetDataset'], config['MDVariables']['targetTable'])
>         df.unpersist()
>         print(f"""wrote to DB""")
>     else:
>         print("DataFrame md is empty")
>
> That value batchId can be used for each Batch.
>
>
> Otherwise you can do this
>
>
> from pyspark.sql.functions import monotonically_increasing_id
> startval = 1
> df = df.withColumn('id', monotonically_increasing_id() + startval)
>
> HTH
>
>
>
>
>
>
> On Tue, 13 Jul 2021 at 19:53, Felix Kizhakkel Jose <
> felixkizhakkelj...@gmail.com> wrote:
>
>> Hello,
>>
>> I am using Spark Structured Streaming to sink data from Kafka to AWS S3.
>> I am wondering if it's possible for me to introduce a uniquely incrementing
>> identifier for each record, as we do in an RDBMS (an incrementing long id)?
>> This would greatly help with range pruning on this ID while reading.
>>
>> Any thoughts? I have looked at monotonically_increasing_id, but it seems
>> it's not deterministic, and it won't ensure new records get the next id
>> after the latest id already present in the storage (S3).
>>
>> Regards,
>> Felix K Jose
>>
>


Re: How to generate unique incrementing identifier in a structured streaming dataframe

2021-07-13 Thread Sebastian Piu
If you want them to survive across jobs, you can use snowflake IDs or
similar ideas, depending on your use case.
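
For reference, a rough Python sketch of a snowflake-style generator: 41 bits
of milliseconds since a custom epoch, 10 bits of worker id, 12 bits of
per-millisecond sequence. The epoch, bit widths and class name are
assumptions, and it omits production concerns such as clock-rollback handling:

    import time
    import threading

    class SnowflakeId:
        EPOCH_MS = 1_609_459_200_000  # 2021-01-01, an arbitrary custom epoch

        def __init__(self, worker_id: int):
            assert 0 <= worker_id < 1024          # 10 bits
            self.worker_id = worker_id
            self.sequence = 0
            self.last_ms = -1
            self.lock = threading.Lock()

        def next_id(self) -> int:
            with self.lock:
                now = int(time.time() * 1000)
                if now == self.last_ms:
                    self.sequence = (self.sequence + 1) & 0xFFF   # 12 bits
                    if self.sequence == 0:        # exhausted, wait for next ms
                        while now <= self.last_ms:
                            now = int(time.time() * 1000)
                else:
                    self.sequence = 0
                self.last_ms = now
                return ((now - self.EPOCH_MS) << 22) | (self.worker_id << 12) | self.sequence

In a Spark job this could be wrapped in a UDF or used inside mapPartitions,
with something like the task's partition id as the worker id so executors
never collide; because the ids are time-based, they keep increasing across
job restarts.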

On Tue, 13 Jul 2021, 9:33 pm Mich Talebzadeh, 
wrote:

> Meaning a monotonically incrementing ID, as in an Oracle sequence, for each
> record read from Kafka, and adding that to your dataframe?
>
> If you do Structured Streaming in micro-batch mode, you will get what is
> known as the batchId
>
>     result = streamingDataFrame.select( \
>               col("parsed_value.rowkey").alias("rowkey") \
>             , col("parsed_value.ticker").alias("ticker") \
>             , col("parsed_value.timeissued").alias("timeissued") \
>             , col("parsed_value.price").alias("price")). \
>             writeStream. \
>             outputMode('append'). \
>             option("truncate", "false"). \
>             foreachBatch(sendToSink). \
>             trigger(processingTime='30 seconds'). \
>             option('checkpointLocation', checkpoint_path). \
>             queryName(config['MDVariables']['topic']). \
>
> That function sendToSink receives two arguments, df and batchId:
>
> def sendToSink(df, batchId):
>     if len(df.take(1)) > 0:
>         print(f"""md batchId is {batchId}""")
>         df.show(100, False)
>         df.persist()
>         # write to BigQuery batch table
>         s.writeTableToBQ(df, "append",
>             config['MDVariables']['targetDataset'], config['MDVariables']['targetTable'])
>         df.unpersist()
>         print(f"""wrote to DB""")
>     else:
>         print("DataFrame md is empty")
>
> That value batchId can be used for each Batch.
>
>
> Otherwise you can do this
>
>
> from pyspark.sql.functions import monotonically_increasing_id
> startval = 1
> df = df.withColumn('id', monotonically_increasing_id() + startval)
>
> HTH
>
>
>
>
>
>
> On Tue, 13 Jul 2021 at 19:53, Felix Kizhakkel Jose <
> felixkizhakkelj...@gmail.com> wrote:
>
>> Hello,
>>
>> I am using Spark Structured Streaming to sink data from Kafka to AWS S3.
>> I am wondering if it's possible for me to introduce a uniquely incrementing
>> identifier for each record, as we do in an RDBMS (an incrementing long id)?
>> This would greatly help with range pruning on this ID while reading.
>>
>> Any thoughts? I have looked at monotonically_increasing_id, but it seems
>> it's not deterministic, and it won't ensure new records get the next id
>> after the latest id already present in the storage (S3).
>>
>> Regards,
>> Felix K Jose
>>
>


Re: How to generate unique incrementing identifier in a structured streaming dataframe

2021-07-13 Thread Mich Talebzadeh
Meaning a monotonically incrementing ID, as in an Oracle sequence, for each
record read from Kafka, and adding that to your dataframe?

If you do Structured Streaming in micro-batch mode, you will get what is
known as the batchId

    result = streamingDataFrame.select( \
              col("parsed_value.rowkey").alias("rowkey") \
            , col("parsed_value.ticker").alias("ticker") \
            , col("parsed_value.timeissued").alias("timeissued") \
            , col("parsed_value.price").alias("price")). \
            writeStream. \
            outputMode('append'). \
            option("truncate", "false"). \
            foreachBatch(sendToSink). \
            trigger(processingTime='30 seconds'). \
            option('checkpointLocation', checkpoint_path). \
            queryName(config['MDVariables']['topic']). \

That function sendToSink receives two arguments, df and batchId:

def sendToSink(df, batchId):
    if len(df.take(1)) > 0:
        print(f"""md batchId is {batchId}""")
        df.show(100, False)
        df.persist()
        # write to BigQuery batch table
        s.writeTableToBQ(df, "append",
            config['MDVariables']['targetDataset'], config['MDVariables']['targetTable'])
        df.unpersist()
        print(f"""wrote to DB""")
    else:
        print("DataFrame md is empty")

That value batchId can be used for each Batch.


Otherwise you can do this


from pyspark.sql.functions import monotonically_increasing_id
startval = 1
df = df.withColumn('id', monotonically_increasing_id() + startval)
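
Note that monotonically_increasing_id() produces values that are unique and
increasing but not dense: the partition id sits in the upper 31 bits and the
within-partition row number in the lower 33 bits, so the result is not
startval, startval+1, startval+2, ... A quick sketch of what to expect,
assuming a running `spark` session:

    from pyspark.sql import functions as F

    startval = 1
    df = spark.range(0, 6).repartition(3)
    # Rows in the first partition get ids starting at startval; rows in the
    # next partition jump to roughly startval + 2**33, and so on.
    df.withColumn('id', F.monotonically_increasing_id() + startval).show(truncate=False)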

HTH





On Tue, 13 Jul 2021 at 19:53, Felix Kizhakkel Jose <
felixkizhakkelj...@gmail.com> wrote:

> Hello,
>
> I am using Spark Structured Streaming to sink data from Kafka to AWS S3. I
> am wondering if it's possible for me to introduce a uniquely incrementing
> identifier for each record, as we do in an RDBMS (an incrementing long id)?
> This would greatly help with range pruning on this ID while reading.
>
> Any thoughts? I have looked at monotonically_increasing_id, but it seems
> it's not deterministic, and it won't ensure new records get the next id
> after the latest id already present in the storage (S3).
>
> Regards,
> Felix K Jose
>


How to generate unique incrementing identifier in a structured streaming dataframe

2021-07-13 Thread Felix Kizhakkel Jose
Hello,

I am using Spark Structured Streaming to sink data from Kafka to AWS S3. I
am wondering if it's possible for me to introduce a uniquely incrementing
identifier for each record, as we do in an RDBMS (an incrementing long id)?
This would greatly help with range pruning on this ID while reading.

Any thoughts? I have looked at monotonically_increasing_id, but it seems
it's not deterministic, and it won't ensure new records get the next id
after the latest id already present in the storage (S3).

Regards,
Felix K Jose


Question about query local dirs when fetching HostLocalBlocks

2021-07-13 Thread 徐涛
Hi Experts,
  When I`m reading spark code in version 3.0.0, when external shuffle 
service is enabled:
  ShuffleBlockFetcherIterator -> 
fetchHostLocalBlocks ( there is some logic, when there is no record 
in cache, then it need to use hostLocalDirManager.getHostLocalDirs to send 
message to external shuffle service) 


  My question is :
  1. As the executors are in the same host, the local dir should be sure to 
be same. Why it does so much job here( do some cache and RPC), just to fetch 
local dirs?
  2. Should there be a config to let user just do the same logic as 
LocalBlocks?


  Thanks a lot.


Best,
Tao

Re: Unsubscribe

2021-07-13 Thread Howard Yang
Unsubscribe

Eric Wang  于2021年7月12日周一 上午7:31写道:

> Unsubscribe
>
> On Sun, Jul 11, 2021 at 9:59 PM Rishi Raj Tandon <
> tandon.rishi...@gmail.com> wrote:
>
>> Unsubscribe
>>
>