Re: How to generate unique incrementing identifier in a structured streaming dataframe

2021-07-15 Thread Felix Kizhakkel Jose
Thank you so much for the insights.
@Mich Talebzadeh, I really appreciate your
detailed examples.
@Jungtaek Lim I see your point. I am thinking of keeping a mapping table
from UUID to incremental ID and leveraging range pruning etc. on a large
dataset.
@sebastian I have to check how to do something like Snowflake IDs. Do you
have any examples or directions?
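A minimal sketch of the mapping-table idea, assuming a single writer that owns the table (concurrent writers would need extra coordination). The class name UuidIdMapping and the in-memory dict are hypothetical stand-ins for a durable lookup table: unseen UUIDs get the next ID after the current maximum, and UUIDs already in the table keep their ID, so replaying a batch does not mint new IDs.

```python
from typing import Dict, Iterable, List


class UuidIdMapping:
    """Hypothetical UUID -> incremental long mapping table (in-memory sketch).

    In a real pipeline this table would live in durable storage and be
    updated once per micro-batch; here it is a plain dict.
    """

    def __init__(self, existing: Dict[str, int]) -> None:
        self.ids = dict(existing)
        # continue numbering after the highest ID already stored
        self.next_id = max(self.ids.values(), default=0) + 1

    def assign(self, uuids: Iterable[str]) -> List[int]:
        result = []
        for u in uuids:
            if u not in self.ids:  # only unseen UUIDs consume a new ID
                self.ids[u] = self.next_id
                self.next_id += 1
            result.append(self.ids[u])
        return result
```

Because already-known UUIDs are looked up rather than re-numbered, a retried batch maps to the same IDs it got the first time.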

Let me ask you another way: how are you handling non-incrementing
UUIDs? Parquet range statistics store only min and max, and with random
UUIDs those bounds typically span almost the whole value range in every
file, so they don't help decide whether the value you are searching for is
present in a file; you end up scanning the entire file.
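To illustrate the min/max point, here is a plain-Python simulation (not Parquet itself): each "file" keeps only the minimum and maximum of its IDs, and a lookup can skip a file only when the value lies outside that range. The file count, file size, and the use of random 128-bit integers in place of real UUIDs are assumptions made for the example.

```python
import random
from typing import List, Tuple


def file_stats(files: List[List[int]]) -> List[Tuple[int, int]]:
    # per-file (min, max), analogous to Parquet column statistics
    return [(min(f), max(f)) for f in files]


def candidate_files(stats: List[Tuple[int, int]], value: int) -> int:
    # a file can only be skipped if the value lies outside its [min, max]
    return sum(1 for lo, hi in stats if lo <= value <= hi)


rng = random.Random(42)
n_files, per_file = 10, 1000

# Sequential IDs: each file covers a disjoint, contiguous range
seq_files = [list(range(i * per_file, (i + 1) * per_file)) for i in range(n_files)]
# Random 128-bit values standing in for UUIDs, scattered over every file
uuid_files = [[rng.getrandbits(128) for _ in range(per_file)] for _ in range(n_files)]

seq_candidates = candidate_files(file_stats(seq_files), 4321)
uuid_candidates = candidate_files(file_stats(uuid_files), uuid_files[3][17])
print(seq_candidates, uuid_candidates)
```

With sequential IDs exactly one file remains a candidate, while with random values nearly every file's [min, max] range covers the searched value.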

Please share your experiences or ideas on how you handled this situation.

Regards,
Felix K Jose

On Tue, Jul 13, 2021 at 7:59 PM Jungtaek Lim 
wrote:

> Theoretically, composing batchId with monotonically_increasing_id()
> would achieve the goal. The major downside is that you'll need to
> deduplicate output based on batchId, as monotonically_increasing_id() is
> non-deterministic. You need to ensure there's NO overlap in output across
> multiple re-attempts of the same batch ID.
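A plain-Python sketch of the composite-key idea and the deduplication concern, under stated assumptions: the per-batch position stands in for monotonically_increasing_id(), and the hypothetical BatchKeyedSink models a sink that replaces a batch's output when the same batchId is re-attempted instead of appending to it. How to get that replace-on-retry behaviour from a real sink depends on the sink and is not shown here.

```python
from typing import Dict, List, Tuple

Row = Tuple[Tuple[int, int], str]  # ((batch_id, position), payload)


def assign_ids(batch_id: int, records: List[str]) -> List[Row]:
    # position stands in for monotonically_increasing_id(): unique within
    # the batch, but NOT guaranteed to be the same on a re-attempt
    return [((batch_id, pos), rec) for pos, rec in enumerate(records)]


class BatchKeyedSink:
    """Hypothetical sink keyed by batch ID: a re-attempt replaces, never appends."""

    def __init__(self) -> None:
        self.by_batch: Dict[int, List[Row]] = {}

    def write(self, batch_id: int, rows: List[Row]) -> None:
        self.by_batch[batch_id] = rows  # overwrite this batch's output

    def all_rows(self) -> List[Row]:
        return [r for b in sorted(self.by_batch) for r in self.by_batch[b]]


sink = BatchKeyedSink()
sink.write(0, assign_ids(0, ["a", "b"]))
sink.write(1, assign_ids(1, ["c", "d"]))
# batch 1 is retried and its records arrive in a different order
sink.write(1, assign_ids(1, ["d", "c"]))
```

The (batchId, position) pairs sort lexicographically, so they increase across batches; the retry changes which record got which position, but because the whole batch is replaced, no record appears twice.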
>
> Btw, even assuming you dealt with auto-increasing IDs on write, how do
> you read files and apply range pruning by auto-increasing ID? Is the
> approach scalable and efficient? You probably couldn't avoid reading
> unnecessary files unless you build explicit metadata about the files,
> such as a map from file name to ID range, and also craft a custom reader
> to leverage that information.
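A sketch of the kind of explicit metadata described here, assuming the writer records each file's minimum and maximum ID in a manifest when it writes the file. The manifest layout, the function names and the part-file names are hypothetical; a custom reader would consult something like files_for_range before opening any files.

```python
from typing import Dict, List, Tuple

# Hypothetical manifest: file name -> (min_id, max_id) recorded at write time
Manifest = Dict[str, Tuple[int, int]]


def record_file(manifest: Manifest, name: str, ids: List[int]) -> None:
    # called by the writer once per output file
    manifest[name] = (min(ids), max(ids))


def files_for_range(manifest: Manifest, lo: int, hi: int) -> List[str]:
    # keep only files whose [min_id, max_id] overlaps the requested [lo, hi]
    return sorted(name for name, (fmin, fmax) in manifest.items()
                  if fmin <= hi and lo <= fmax)


manifest: Manifest = {}
record_file(manifest, "part-0000", list(range(0, 100)))
record_file(manifest, "part-0001", list(range(100, 200)))
record_file(manifest, "part-0002", list(range(200, 300)))
```

A query for IDs 150 to 210 would then open only part-0001 and part-0002.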
>
>
> On Wed, Jul 14, 2021 at 6:00 AM Sebastian Piu 
> wrote:
>
>> If you want them to survive across jobs, you can use Snowflake IDs or
>> similar ideas, depending on your use case.
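A minimal Snowflake-style generator, assuming the commonly described layout: a millisecond timestamp relative to a custom epoch, a 10-bit worker ID and a 12-bit per-millisecond sequence. The epoch constant, the class name, and how worker IDs would be assigned to Spark executors are assumptions; every concurrently running generator needs a distinct worker ID. The IDs are roughly time-ordered rather than gap-free, which may be enough for file-level min/max pruning if files are written in approximately time order.

```python
import threading
import time
from typing import Callable, Optional

EPOCH_MS = 1_609_459_200_000  # 2021-01-01T00:00:00Z, an arbitrary custom epoch (assumption)
WORKER_BITS = 10
SEQUENCE_BITS = 12
MAX_WORKER = (1 << WORKER_BITS) - 1
MAX_SEQUENCE = (1 << SEQUENCE_BITS) - 1


class SnowflakeGenerator:
    """Sketch of a Snowflake-style ID generator (hypothetical, not a library API)."""

    def __init__(self, worker_id: int,
                 clock: Optional[Callable[[], int]] = None) -> None:
        if not 0 <= worker_id <= MAX_WORKER:
            raise ValueError("worker_id out of range")
        self.worker_id = worker_id
        # clock returns milliseconds since the Unix epoch; injectable for tests
        self.clock = clock or (lambda: time.time_ns() // 1_000_000)
        self.last_ms = -1
        self.sequence = 0
        self.lock = threading.Lock()

    def next_id(self) -> int:
        with self.lock:
            now = self.clock()
            if now < self.last_ms:
                raise RuntimeError("clock moved backwards")
            if now == self.last_ms:
                self.sequence = (self.sequence + 1) & MAX_SEQUENCE
                if self.sequence == 0:
                    # sequence exhausted for this millisecond: wait for the next one
                    while now <= self.last_ms:
                        now = self.clock()
            else:
                self.sequence = 0
            self.last_ms = now
            return (((now - EPOCH_MS) << (WORKER_BITS + SEQUENCE_BITS))
                    | (self.worker_id << SEQUENCE_BITS)
                    | self.sequence)
```

Injecting the clock makes the generator deterministic to test; in production the default wall clock is used, and a clock that moves backwards raises instead of risking duplicate IDs.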
>>
>> On Tue, 13 Jul 2021, 9:33 pm Mich Talebzadeh, 
>> wrote:
>>
>>> Do you mean a monotonically incrementing ID, as with an Oracle sequence,
>>> for each record read from Kafka, added to your dataframe?
>>>
>>> If you use Structured Streaming in micro-batch mode, you get what is
>>> known as the batchId:
>>>
>>> from pyspark.sql.functions import col
>>>
>>> result = streamingDataFrame.select(
>>>         col("parsed_value.rowkey").alias("rowkey"),
>>>         col("parsed_value.ticker").alias("ticker"),
>>>         col("parsed_value.timeissued").alias("timeissued"),
>>>         col("parsed_value.price").alias("price")) \
>>>     .writeStream \
>>>     .outputMode('append') \
>>>     .option("truncate", "false") \
>>>     .foreachBatch(sendToSink) \
>>>     .trigger(processingTime='30 seconds') \
>>>     .option('checkpointLocation', checkpoint_path) \
>>>     .queryName(config['MDVariables']['topic']) \
>>>     .start()
>>>
>>> The function sendToSink receives two arguments, df and batchId:
>>>
>>> def sendToSink(df, batchId):
>>>     if len(df.take(1)) > 0:
>>>         print(f"md batchId is {batchId}")
>>>         df.show(100, False)
>>>         df.persist()
>>>         # write to BigQuery batch table
>>>         # (s is a helper object defined elsewhere, not shown)
>>>         s.writeTableToBQ(df, "append",
>>>                          config['MDVariables']['targetDataset'],
>>>                          config['MDVariables']['targetTable'])
>>>         df.unpersist()
>>>         print("wrote to DB")
>>>     else:
>>>         print("DataFrame md is empty")
>>>
>>> That batchId value can be used to identify each batch.
>>>
>>>
>>> Otherwise, you can do this:
>>>
>>> from pyspark.sql.functions import monotonically_increasing_id
>>>
>>> startval = 1
>>> df = df.withColumn('id', monotonically_increasing_id() + startval)
>>>
>>> HTH
>>>
>>>
>>> On Tue, 13 Jul 2021 at 19:53, Felix Kizhakkel Jose <
>>> felixkizhakkelj...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I am using Spark Structured Streaming to sink data from Kafka to AWS
>>>> S3. I am wondering if it's possible to introduce a uniquely
>>>> incrementing identifier for each record, as we do in an RDBMS
>>>> (incrementing long id)?
>>>> This would greatly help range pruning based on this ID while reading.
>>>>
>>>> Any thoughts? I have looked at monotonically_increasing_id, but it
>>>> seems it's not deterministic, and it won't ensure that new records get
>>>> the next ID after the latest ID already present in the storage (S3).
>>>>
>>>> Regards,
>>>> Felix K Jose
>>>>
>>>


How to generate unique incrementing identifier in a structured streaming dataframe

2021-07-13 Thread Felix Kizhakkel Jose
Hello,

I am using Spark Structured Streaming to sink data from Kafka to AWS S3. I
am wondering if it's possible to introduce a uniquely incrementing
identifier for each record, as we do in an RDBMS (incrementing long id)?
This would greatly help range pruning based on this ID while reading.

Any thoughts? I have looked at monotonically_increasing_id, but it seems
it's not deterministic, and it won't ensure that new records get the next
ID after the latest ID already present in the storage (S3).
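Both concerns follow from how monotonically_increasing_id() builds its values: per the PySpark documentation, the current implementation puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The plain-Python emulation below is a sketch, not Spark code, and the partition sizes are made up; it shows why the values are unique but jump by 2^33 between partitions, and why nothing in them relates to IDs already stored in S3.

```python
from typing import List


def emulate_monotonic_ids(partition_sizes: List[int]) -> List[int]:
    # (partition_id << 33) + record index within that partition
    ids = []
    for partition_id, size in enumerate(partition_sizes):
        ids.extend((partition_id << 33) + i for i in range(size))
    return ids


ids = emulate_monotonic_ids([3, 2])
print(ids)  # [0, 1, 2, 8589934592, 8589934593]
```

Since the value depends on partition IDs, the same record can get a different ID if the partitioning differs between runs or retries, and every job starts again from 0 regardless of what was written before.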

Regards,
Felix K Jose


In built Optimizer on Spark

2021-03-21 Thread Felix Kizhakkel Jose
Hello,

Is there any built-in optimizer in Spark, as in Flink, to avoid manual
configuration tuning and achieve better performance for a structured
streaming pipeline?
Or is there any work happening to achieve this?

Regards,
Felix K Jose


Re: How to modify a field in a nested struct using pyspark

2021-01-29 Thread Felix Kizhakkel Jose
Thank you so much for the quick response and great help.

@jeff, I will use the library if the 3.1 release gets delayed. Thank
you so much.

On Fri, Jan 29, 2021 at 1:23 PM Jeff Evans 
wrote:

> If you need to do this in 2.x, this library does the trick:
> https://github.com/fqaiser94/mse
>
> On Fri, Jan 29, 2021 at 12:15 PM Adam Binford  wrote:
>
>> I think they're voting on the next release candidate starting sometime
>> next week. So hopefully barring any other major hurdles within the next few
>> weeks.
>>
>> On Fri, Jan 29, 2021, 1:01 PM Felix Kizhakkel Jose <
>> felixkizhakkelj...@gmail.com> wrote:
>>
>>> Wow, that's really great to know. Thank you so much Adam. Do you know
>>> when the 3.1 release is scheduled?
>>>
>>> Regards,
>>> Felix K Jose
>>>
>>> On Fri, Jan 29, 2021 at 12:35 PM Adam Binford  wrote:
>>>
>>>> As of 3.0, the only way to do it is something that will recreate the
>>>> whole struct:
>>>> df.withColumn('timingPeriod',
>>>> f.struct(f.col('timingPeriod.start').cast('timestamp').alias('start'),
>>>> f.col('timingPeriod.end').cast('timestamp').alias('end')))
>>>>
>>>> There's a new method coming in 3.1 on the column class called withField
>>>> which was designed for this purpose. I backported it to my personal 3.0
>>>> build because of how useful it is. It works something like:
>>>> df.withColumn('timingPeriod', f.col('timingPeriod')
>>>>     .withField('start', f.col('timingPeriod.start').cast('timestamp'))
>>>>     .withField('end', f.col('timingPeriod.end').cast('timestamp')))
>>>>
>>>> And it works on multiple levels of nesting which is nice.
>>>>
>>>> On Fri, Jan 29, 2021 at 11:32 AM Felix Kizhakkel Jose <
>>>> felixkizhakkelj...@gmail.com> wrote:
>>>>
>>>>> Hello All,
>>>>>
>>>>> I am using pyspark structured streaming and I am getting timestamp
>>>>> fields as plain long (milliseconds), so I have to convert these fields
>>>>> to a timestamp type.
>>>>>
>>>>> A sample JSON object:
>>>>>
>>>>> {
>>>>>   "id": {
>>>>>     "value": "f40b2e22-4003-4d90-afd3-557bc013b05e",
>>>>>     "type": "UUID",
>>>>>     "system": "Test"
>>>>>   },
>>>>>   "status": "Active",
>>>>>   "timingPeriod": {
>>>>>     "startDateTime": 1611859271516,
>>>>>     "endDateTime": null
>>>>>   },
>>>>>   "eventDateTime": 1611859272122,
>>>>>   "isPrimary": true
>>>>> }
>>>>>
>>>>> Here I want to convert "eventDateTime", "startDateTime" and
>>>>> "endDateTime" to timestamp types.
>>>>>
>>>>> So I have done the following:
>>>>>
>>>>> import pyspark.sql.functions as f
>>>>>
>>>>> def transform_date_col(date_col):
>>>>>     return f.when(f.col(date_col).isNotNull(), f.col(date_col) / 1000)
>>>>>
>>>>> df.withColumn(
>>>>>     "eventDateTime",
>>>>>     transform_date_col("eventDateTime").cast("timestamp")).withColumn(
>>>>>     "timingPeriod.start",
>>>>>     transform_date_col("timingPeriod.start").cast("timestamp")).withColumn(
>>>>>     "timingPeriod.end",
>>>>>     transform_date_col("timingPeriod.end").cast("timestamp"))
>>>>>
>>>>> The timingPeriod fields are no longer a struct; instead they become
>>>>> two separate top-level columns named "timingPeriod.start" and
>>>>> "timingPeriod.end".
>>>>>
>>>>> How can I get them back as a struct?
>>>>> Is there a generic way I can modify one or more properties of
>>>>> nested structs?
>>>>>
>>>>> I have hundreds of entities where long values need to be converted to
>>>>> timestamps, so a generic implementation would help my data ingestion
>>>>> pipeline a lot.
>>>>>
>>>>> Regards,
>>>>> Felix K Jose
>>>>>
>>>>>
>>>>
>>>> --
>>>> Adam Binford
>>>>
>>>


Re: How to modify a field in a nested struct using pyspark

2021-01-29 Thread Felix Kizhakkel Jose
Wow, that's really great to know. Thank you so much Adam. Do you know when
the 3.1 release is scheduled?

Regards,
Felix K Jose

On Fri, Jan 29, 2021 at 12:35 PM Adam Binford  wrote:

> As of 3.0, the only way to do it is something that will recreate the whole
> struct:
> df.withColumn('timingPeriod',
> f.struct(f.col('timingPeriod.start').cast('timestamp').alias('start'),
> f.col('timingPeriod.end').cast('timestamp').alias('end')))
>
> There's a new method coming in 3.1 on the column class called withField
> which was designed for this purpose. I backported it to my personal 3.0
> build because of how useful it is. It works something like:
> df.withColumn('timingPeriod', f.col('timingPeriod')
>     .withField('start', f.col('timingPeriod.start').cast('timestamp'))
>     .withField('end', f.col('timingPeriod.end').cast('timestamp')))
>
> And it works on multiple levels of nesting which is nice.
>
> On Fri, Jan 29, 2021 at 11:32 AM Felix Kizhakkel Jose <
> felixkizhakkelj...@gmail.com> wrote:
>
>> Hello All,
>>
>> I am using pyspark structured streaming and I am getting timestamp fields
>> as plain long (milliseconds), so I have to convert these fields to a
>> timestamp type.
>>
>> A sample JSON object:
>>
>> {
>>   "id": {
>>     "value": "f40b2e22-4003-4d90-afd3-557bc013b05e",
>>     "type": "UUID",
>>     "system": "Test"
>>   },
>>   "status": "Active",
>>   "timingPeriod": {
>>     "startDateTime": 1611859271516,
>>     "endDateTime": null
>>   },
>>   "eventDateTime": 1611859272122,
>>   "isPrimary": true
>> }
>>
>> Here I want to convert "eventDateTime", "startDateTime" and
>> "endDateTime" to timestamp types.
>>
>> So I have done the following:
>>
>> import pyspark.sql.functions as f
>>
>> def transform_date_col(date_col):
>>     return f.when(f.col(date_col).isNotNull(), f.col(date_col) / 1000)
>>
>> df.withColumn(
>>     "eventDateTime",
>>     transform_date_col("eventDateTime").cast("timestamp")).withColumn(
>>     "timingPeriod.start",
>>     transform_date_col("timingPeriod.start").cast("timestamp")).withColumn(
>>     "timingPeriod.end",
>>     transform_date_col("timingPeriod.end").cast("timestamp"))
>>
>> The timingPeriod fields are no longer a struct; instead they become two
>> separate top-level columns named "timingPeriod.start" and
>> "timingPeriod.end".
>>
>> How can I get them back as a struct?
>> Is there a generic way I can modify one or more properties of
>> nested structs?
>>
>> I have hundreds of entities where long values need to be converted to
>> timestamps, so a generic implementation would help my data ingestion
>> pipeline a lot.
>>
>> Regards,
>> Felix K Jose
>>
>>
>
> --
> Adam Binford
>


How to modify a field in a nested struct using pyspark

2021-01-29 Thread Felix Kizhakkel Jose
Hello All,

I am using pyspark structured streaming and I am getting timestamp fields
as plain long (milliseconds), so I have to convert these fields to a
timestamp type.

A sample JSON object:

{
  "id": {
    "value": "f40b2e22-4003-4d90-afd3-557bc013b05e",
    "type": "UUID",
    "system": "Test"
  },
  "status": "Active",
  "timingPeriod": {
    "startDateTime": 1611859271516,
    "endDateTime": null
  },
  "eventDateTime": 1611859272122,
  "isPrimary": true
}

Here I want to convert "eventDateTime", "startDateTime" and
"endDateTime" to timestamp types.

So I have done the following:

import pyspark.sql.functions as f

def transform_date_col(date_col):
    return f.when(f.col(date_col).isNotNull(), f.col(date_col) / 1000)

df.withColumn(
    "eventDateTime",
    transform_date_col("eventDateTime").cast("timestamp")).withColumn(
    "timingPeriod.start",
    transform_date_col("timingPeriod.start").cast("timestamp")).withColumn(
    "timingPeriod.end",
    transform_date_col("timingPeriod.end").cast("timestamp"))

The timingPeriod fields are no longer a struct; instead they become two
separate top-level columns named "timingPeriod.start" and
"timingPeriod.end".

How can I get them back as a struct?
Is there a generic way I can modify one or more properties of nested
structs?

I have hundreds of entities where long values need to be converted to
timestamps, so a generic implementation would help my data ingestion
pipeline a lot.
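The generic part of the question is a recursive walk: visit every field, convert the ones whose names are in a given set, and rebuild nested structs instead of flattening them. Below is a plain-Python sketch of that walk over the sample record; it is not PySpark, and in Spark the same recursion would have to rebuild each struct column (for example with f.struct, or with Column.withField from 3.1). Matching fields by leaf name alone is an assumption; a real helper may need full paths. Dividing by 1000 in the original matters because Spark's numeric-to-timestamp cast interprets the number as seconds; the sketch sidesteps that by building the value from milliseconds directly.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Optional, Set

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def millis_to_datetime(ms: Optional[int]) -> Optional[datetime]:
    # keep nulls as nulls, mirroring the isNotNull() guard in transform_date_col
    return None if ms is None else EPOCH + timedelta(milliseconds=ms)


def convert_millis_fields(value: Any, names: Set[str]) -> Any:
    """Return a copy of a nested record with every field in `names` converted.

    Nested dicts (structs) are rebuilt rather than flattened, so the
    original shape of the record is preserved.
    """
    if isinstance(value, dict):
        return {
            key: millis_to_datetime(child) if key in names
            else convert_millis_fields(child, names)
            for key, child in value.items()
        }
    if isinstance(value, list):
        return [convert_millis_fields(item, names) for item in value]
    return value
```

Passing the set {"eventDateTime", "startDateTime", "endDateTime"} converts all three fields in one call while "timingPeriod" stays a nested object, which is the shape the question asks to preserve.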

Regards,
Felix K Jose

