Spark structured streaming with periodical persist and unpersist

2021-02-11 Thread act_coder
I am currently building a Spark Structured Streaming application where I am
doing a batch-stream join, and the source for the batch data gets updated
periodically.

So, I am planning to do a persist/unpersist of that batch data periodically.

Below is the sample code I am using to persist and unpersist the batch
data.

Flow: read the batch data -> persist the batch data -> every hour, unpersist
the data, re-read the batch data, and persist it again.

But I am not seeing the batch data getting refreshed every hour.

Code:

var batchDF = handler.readBatchDF(sparkSession)
batchDF.persist(StorageLevel.MEMORY_AND_DISK)
var refreshedTime: Instant = Instant.now()

if (Duration.between(refreshedTime, Instant.now()).getSeconds > refreshTime) {
  refreshedTime = Instant.now()
  batchDF.unpersist(false)
  batchDF = handler.readBatchDF(sparkSession)
    .persist(StorageLevel.MEMORY_AND_DISK)
}
Is there a better way to achieve this scenario in Spark Structured
Streaming jobs?
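
One likely issue with the snippet above is that the refresh check runs only once, when the query is defined, rather than on every micro-batch. Below is a minimal sketch of moving the refresh inside foreachBatch so the check runs for each micro-batch; handler.readBatchDF, sparkSession, refreshTime, and the streaming dataframe are carried over from the question, while streamingDF, the join key, and the output sink are placeholders.

import java.time.{Duration, Instant}
import org.apache.spark.sql.DataFrame
import org.apache.spark.storage.StorageLevel

// Cache the batch side once up front, as in the original snippet.
var batchDF: DataFrame = handler.readBatchDF(sparkSession)
batchDF.persist(StorageLevel.MEMORY_AND_DISK)
var refreshedTime: Instant = Instant.now()

streamingDF.writeStream
  .foreachBatch { (microBatch: DataFrame, batchId: Long) =>
    // This check now runs on every micro-batch, not just once at startup.
    if (Duration.between(refreshedTime, Instant.now()).getSeconds > refreshTime) {
      batchDF.unpersist(false)
      batchDF = handler.readBatchDF(sparkSession).persist(StorageLevel.MEMORY_AND_DISK)
      refreshedTime = Instant.now()
    }
    microBatch
      .join(batchDF, Seq("join_key"))   // "join_key" is a placeholder column
      .write
      .mode("append")
      .parquet("/tmp/joined-output")    // placeholder sink
  }
  .start()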



Find difference between two dataframes in spark structured streaming

2020-12-16 Thread act_coder
I am creating a Spark Structured Streaming job where I need to find the
difference between two dataframes.

Dataframe 1 :

[1, item1, value1]
[2, item2, value2]
[3, item3, value3]
[4, item4, value4]
[5, item5, value5]

Dataframe 2:

[4, item4, value4]
[5, item5, value5]

New dataframe with the difference D1 - D2:

[1, item1, value1]
[2, item2, value2]
[3, item3, value3]

I tried using except() and a left anti join, but neither is supported in
Spark Structured Streaming.

Is there a way we can achieve this in Structured Streaming?
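
For reference, a minimal sketch of computing D1 - D2 when the first dataframe is the streaming side and the second is a static (batch) dataframe: a left outer join on the key followed by a filter for rows with no match. The rate source, the column names, and the console sink are placeholders; if both sides were streams, this would instead need a stream-stream outer join with watermarks.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, concat, lit}

val spark = SparkSession.builder().appName("stream-diff-sketch").getOrCreate()
import spark.implicits._

// Placeholder streaming side with columns (id, item, value).
val df1 = spark.readStream
  .format("rate").option("rowsPerSecond", "1").load()
  .select(
    col("value").as("id"),
    concat(lit("item"), col("value").cast("string")).as("item"),
    concat(lit("value"), col("value").cast("string")).as("value"))

// Static side: the rows to subtract.
val df2 = Seq((4L, "item4", "value4"), (5L, "item5", "value5"))
  .toDF("id", "item", "value")

// Stream-static left outer join, then keep only rows with no match on the static side.
val difference = df1.alias("a")
  .join(df2.alias("b"), col("a.id") === col("b.id"), "left_outer")
  .filter(col("b.id").isNull)
  .select(col("a.id"), col("a.item"), col("a.value"))

difference.writeStream.format("console").outputMode("append").start()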



converting dataframe from one format to another in spark structured streaming

2020-11-26 Thread act_coder
I am creating a Spark Structured Streaming application, and I have a streaming
dataframe which has the data below in it.

{
"name":"sensor1",
"time":"2020-11-27T01:01:00",
"sensorvalue":11.0,
"tag1":"tagvalue"
}
I would like to convert that dataframe into the format below.

{
"name":"sensor1",
"value-array":
[
{
"time":"2020-11-27T01:01:00",
"sensorvalue":11.0,
"tag1":"tagvalue"
}
]
}
I tried using the mapPartitions()/map() methods, where I get a Row object, and
I tried creating another dataframe in the expected format.

I am able to get the values from the Row object using row(0) or row(1). But is
it possible to attach a POJO/schema to that Row object (using RowEncoder()),
so that instead of row(0) we can use row.getName()?

val mapDF = incomingDF.map(row => {
  val name = row(0).toString
  // val name = row.getName
})
I also tried the collect_list() function, but since it is an aggregate
function, I couldn't use "append" as the output mode in my streaming
application.
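
For reference, a minimal sketch of doing the nesting without any aggregation: since every input row maps to exactly one output row, struct() and array() can build the nested shape directly, so "append" output mode still works. A typed Dataset via a case class is also shown as an alternative to row(0)/row(1); the SensorReading case class and the assumption that spark is the active SparkSession are illustrative.

import org.apache.spark.sql.functions.{array, col, struct}

// Row-wise nesting with no aggregation: wrap the three fields in a struct and
// put that struct inside a single-element array.
val nestedDF = incomingDF.select(
  col("name"),
  array(struct(col("time"), col("sensorvalue"), col("tag1"))).as("value-array"))

// Alternative to row(0)/row(1): a typed Dataset, so fields are accessed by name.
case class SensorReading(name: String, time: String, sensorvalue: Double, tag1: String)

import spark.implicits._                 // assumes `spark` is the active SparkSession
val typedDS = incomingDF.as[SensorReading]
val names   = typedDS.map(_.name)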



Single spark streaming job to read incoming events with dynamic schema

2020-11-16 Thread act_coder
I am trying to create a Spark Structured Streaming job that reads from a
Kafka topic, and the events coming from that topic will have different
schemas (there is no standard schema for the incoming events).

Sample incoming events:

event1: {timestamp:2018-09-28T15:50:57.2420418+00:00, value: 11}
event2: {timestamp:2018-09-28T15:50:57.2420418+00:00, value: 11,
location:abc}
event3: {order_id:1, ordervalue: 11}
How can I create a Structured Streaming job that reads the above events
without having to stop the Spark job to make schema changes?

Also, we may need to provide a schema when using spark.readStream(). I thought
of reading a small subset of the incoming data and deriving the schema from
it, but that might not work here since the incoming data is disparate and each
incoming event may have a different schema.
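
For reference, a minimal sketch of one way to handle events with no fixed schema: keep the Kafka value as a raw JSON string and parse it into a generic map, so new fields do not require a schema change or a restart. This works for flat JSON events like the samples above (numeric values come back as strings); the broker address and topic are placeholders.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{MapType, StringType}

val spark = SparkSession.builder().appName("dynamic-schema-sketch").getOrCreate()

// Keep the Kafka value as a raw JSON string instead of forcing a fixed schema.
val raw = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")   // placeholder
  .option("subscribe", "events")                      // placeholder topic
  .load()
  .selectExpr("CAST(value AS STRING) AS json")

// Parse each event into a generic Map[String, String]; unknown or new fields
// simply show up as extra map entries, so no job restart is needed.
val parsed = raw.select(
  col("json"),
  from_json(col("json"), MapType(StringType, StringType)).as("fields"))

// Downstream, pick fields by key, e.g. fields['timestamp'], fields['order_id'].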



Re: Using two WriteStreams in same spark structured streaming job

2020-11-04 Thread act_coder
If I use the foreach function, then I may need to use a custom Kafka stream
writer, right?

And I might not be able to use the default writeStream.format("kafka") method?



Using two WriteStreams in same spark structured streaming job

2020-11-04 Thread act_coder
I have a scenario where I would like to save the same streaming dataframe to
two different streaming sinks.

I have created a streaming dataframe which I need to send to both a Kafka
topic and Delta Lake.

I thought of using foreachBatch, but it looks like it doesn't support multiple
streaming sinks.

Also, I tried using sparkSession.streams.awaitAnyTermination() with multiple
write streams, but the second stream is not getting processed.

Is there a way to achieve this?
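
For reference, a minimal sketch of writing the same micro-batch to both sinks from inside foreachBatch: each sink is written with the ordinary batch writers, so foreachBatch itself remains the single streaming sink. streamingDF is the streaming dataframe from the question; the broker address, topic, payload column, Delta path, and checkpoint location are placeholders.

import org.apache.spark.sql.DataFrame

streamingDF.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // Cache so the micro-batch is computed once and written twice.
    batchDF.persist()

    // Sink 1: Kafka (the Kafka writer expects a string/binary "value" column).
    batchDF.selectExpr("CAST(payload AS STRING) AS value")   // "payload" is a placeholder column
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")      // placeholder
      .option("topic", "output-topic")                       // placeholder
      .save()

    // Sink 2: Delta Lake.
    batchDF.write
      .format("delta")
      .mode("append")
      .save("/path/to/delta/table")                          // placeholder path

    batchDF.unpersist()
  }
  .option("checkpointLocation", "/path/to/checkpoint")        // placeholder
  .start()
  .awaitTermination()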



How to use groupByKey() in spark structured streaming without aggregates

2020-10-27 Thread act_coder
Is there a way to use the groupByKey() function in Spark Structured Streaming
without aggregates?

I have a scenario like below, where we would like to group the items based
on a key without applying any aggregates.

Sample incoming data: (the sample rows are not included in the archived
message)

I would like to apply groupByKey on the field "device_id", so that I get
output like below (the expected output is also not included in the archived
message).

I have also tried using collect_list() in the aggregate expression of
groupByKey, but that takes more time to process the datasets.

Also, since we are aggregating, we can only use either 'Complete' or 'Update'
output mode, but 'Append' mode looks more suitable for our use case.

I have also looked at the groupByKey(numPartitions) and reduceByKey()
functions available in the direct DStream API, which give results in the form
(String, Iterable[(String, Int)]) without doing any aggregates.

Is there something similar available in Structured Streaming?
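
For reference, a minimal sketch of grouping without a SQL aggregate by combining groupByKey with flatMapGroupsWithState, which is allowed with 'Append' output mode: readings are buffered in state per device_id and emitted as one grouped record when a processing-time timeout fires. The Reading and DeviceGroup case classes, the socket source, the one-minute flush interval, and the console sink are illustrative assumptions, not taken from the original post.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class Reading(deviceId: String, item: String, value: Int)
case class DeviceGroup(deviceId: String, readings: Seq[Reading])

val spark = SparkSession.builder().appName("groupByKey-no-agg-sketch").getOrCreate()
import spark.implicits._

// Placeholder source: CSV lines "deviceId,item,value" over a socket.
val readings = spark.readStream
  .format("socket").option("host", "localhost").option("port", "9999").load()
  .as[String]
  .map { line =>
    val Array(d, i, v) = line.split(",")
    Reading(d, i, v.trim.toInt)
  }

// Buffer each device's readings in state; emit the whole group once the
// processing-time timeout fires, then clear the state.
def flush(deviceId: String,
          rows: Iterator[Reading],
          state: GroupState[Seq[Reading]]): Iterator[DeviceGroup] = {
  if (state.hasTimedOut) {
    val grouped = DeviceGroup(deviceId, state.get)
    state.remove()
    Iterator.single(grouped)
  } else {
    state.update(state.getOption.getOrElse(Seq.empty) ++ rows.toSeq)
    state.setTimeoutDuration("1 minute")        // illustrative flush interval
    Iterator.empty
  }
}

val grouped = readings
  .groupByKey(_.deviceId)
  .flatMapGroupsWithState(OutputMode.Append, GroupStateTimeout.ProcessingTimeTimeout)(flush)

grouped.writeStream
  .outputMode("append")
  .format("console")
  .option("checkpointLocation", "/tmp/group-no-agg-checkpoint")
  .start()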



Re: How to disable 'spark.security.credentials.${service}.enabled' in Structured streaming while connecting to a kafka cluster

2020-01-12 Thread act_coder
Hi Gabor,

Thanks for the reply.

Yes, I have created a JIRA and added the details there:
https://issues.apache.org/jira/browse/SPARK-30495

Please let me know if that is the right place to create it.
If not, where should I be creating it ?

I would also like to know whether it is mandatory to get a delegation token
from the Kafka cluster. Can we disable that functionality by setting
'spark.security.credentials.kafka.enabled' to false?

Regards,
AC




How to disable 'spark.security.credentials.${service}.enabled' in Structured streaming while connecting to a kafka cluster

2020-01-08 Thread act_coder
I am trying to read data from a secured Kafka cluster using Spark Structured
Streaming. I am using the library
"spark-sql-kafka-0-10_2.12":"3.0.0-preview" to read the data, since it has the
feature of specifying our own consumer group id (instead of Spark setting its
own group id).

Dependency used in the code:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
  <version>3.0.0-preview</version>
</dependency>

I am getting the below error - even after specifying the required JAAS
configuration in spark options.

Caused by: java.lang.IllegalArgumentException: requirement failed:
Delegation token must exist for this connector.
  at scala.Predef$.require(Predef.scala:281)
  at org.apache.spark.kafka010.KafkaTokenUtil$.isConnectorUsingCurrentToken(KafkaTokenUtil.scala:299)
  at org.apache.spark.sql.kafka010.KafkaDataConsumer.getOrRetrieveConsumer(KafkaDataConsumer.scala:533)
  at org.apache.spark.sql.kafka010.KafkaDataConsumer.$anonfun$get$1(KafkaDataConsumer.scala:275)


The following document specifies that we can disable the feature of obtaining
a delegation token:
https://spark.apache.org/docs/3.0.0-preview/structured-streaming-kafka-integration.html

I tried setting the property spark.security.credentials.kafka.enabled to
false in the Spark config, but it is still failing with the same error.
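
For reference, a minimal sketch of passing the SASL/JAAS settings directly as "kafka."-prefixed options on the reader (these are handed to the underlying Kafka consumer), with the delegation-token credential provider disabled on the session. The broker, topic, SASL mechanism, credentials, and group id are placeholders, and whether this avoids the delegation-token check depends on the Spark build in use.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("secured-kafka-sketch")
  // the setting under discussion: opt out of Spark's Kafka delegation-token handling
  .config("spark.security.credentials.kafka.enabled", "false")
  .getOrCreate()

// Placeholder credentials for a SCRAM login; supply the real JAAS entry here.
val jaas = "org.apache.kafka.common.security.scram.ScramLoginModule required " +
  "username=\"my-user\" password=\"my-password\";"

// Options prefixed with "kafka." are passed straight to the Kafka consumer.
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9093")  // placeholder
  .option("subscribe", "secure-topic")               // placeholder
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
  .option("kafka.sasl.jaas.config", jaas)
  .option("kafka.group.id", "my-custom-group")       // the custom group id mentioned above
  .load()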



