Re: Extract value from streaming Dataframe to a variable

2020-01-21 Thread Nick Dawes
Thanks for your reply.

I'm using Spark 2.3.2. It looks like the foreach operation is only supported
for Java and Scala. Is there any alternative for Python?
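
For reference, a minimal PySpark sketch of the foreachBatch suggestion quoted
below. foreachBatch is only exposed to Python from Spark 2.4 onward, so it
would require an upgrade; the REST call itself is left as an assumption.

    def verify_agent(batch_df, batch_id):
        # batch_df is a regular (non-streaming) Dataframe, so collect() is allowed here
        rows = batch_df.select(batch_df.agentName.getItem(0).alias("agentName")).collect()
        for row in rows:
            agent_name = row["agentName"]
            # call the REST API here for verification / decision making (assumed)

    query = kinesisDF.writeStream.foreachBatch(verify_agent).start()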

On Mon, Jan 20, 2020, 5:09 PM Jungtaek Lim wrote:

> Hi,
>
> you can try out foreachBatch to apply a batch query operation to each
> output of a micro-batch:
>
> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch
>
> Hope this helps.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
>
> On Mon, Jan 20, 2020 at 8:43 PM Nick Dawes  wrote:
>
>> Streaming experts, any clues how to achieve this?
>>
>> After extracting a few variables, I need to run them through a REST API
>> for verification and decision making.
>>
>> Thanks for your help.
>>
>> Nick
>>
>> On Fri, Jan 17, 2020, 6:27 PM Nick Dawes  wrote:
>>
>>> I need to extract a value from a PySpark structured streaming Dataframe
>>> to a string variable to check something.
>>>
>>> I tried this code.
>>>
>>> agentName = kinesisDF.select(kinesisDF.agentName.getItem(0).alias("agentName")).collect()[0][0]
>>>
>>> This works on a non-streaming Dataframe only. In a streaming Dataframe,
>>> collect is not supported.
>>>
>>> Any workaround for this?
>>>
>>> Nick
>>>
>>>
>>>


Re: Extract value from streaming Dataframe to a variable

2020-01-20 Thread Nick Dawes
Streaming experts, any clues how to achieve this?

After extracting a few variables, I need to run them through a REST API for
verification and decision making.

Thanks for your help.

Nick

On Fri, Jan 17, 2020, 6:27 PM Nick Dawes  wrote:

> I need to extract a value from a PySpark structured streaming Dataframe to
> a string variable to check something.
>
> I tried this code.
>
> agentName = kinesisDF.select(kinesisDF.agentName.getItem(0).alias("agentName")).collect()[0][0]
>
> This works on a non-streaming Dataframe only. In a streaming Dataframe,
> collect is not supported.
>
> Any workaround for this?
>
> Nick
>
>
>


Extract value from streaming Dataframe to a variable

2020-01-17 Thread Nick Dawes
I need to extract a value from a PySpark structured streaming Dataframe to
a string variable to check something.

I tried this code.

agentName = kinesisDF.select(kinesisDF.agentName.getItem(0).alias("agentName")).collect()[0][0]

This works on a non-streaming Dataframe only. In a streaming Dataframe,
collect is not supported.

Any workaround for this?

Nick


Re: Structured Streaming Dataframe Size

2019-08-28 Thread Nick Dawes
Thank you, TD. A couple of follow-up questions, please.

1) "It only keeps around the minimal intermediate state data"

How do you define "minimal" here? Is there a configuration property to
control the retention time or size of the streaming Dataframe's state?
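
For reference, the usual knob for bounding that state in windowed
aggregations is a watermark rather than a single size/time property; a
minimal sketch (eventsDF and the column names here are assumptions):

    from pyspark.sql.functions import window

    # Assumption: eventsDF is a streaming Dataframe with an event_time timestamp column
    AggStreamingDF = (eventsDF
        .withWatermark("event_time", "1 hour")      # state older than the watermark is dropped
        .groupBy(window("event_time", "1 hour"))
        .count())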

2) I'm not writing anything out to any database or S3. My requirement is to
find a real-time count over a 1-hour window, and I would like to get this
count from a BI tool. So can I register it as a temp view and access it from
the BI tool?

I tried something like this in my streaming application:

AggStreamingDF.createOrReplaceGlobalTempView("streaming_table")

Then, in the BI tool, I queried like this:

select * from streaming_table

Error:  Queries with streaming sources must be executed with
writeStream.start()

Any suggestions to make this work?
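
For reference, a minimal sketch of one possible workaround using the
in-memory sink, which keeps the latest aggregation results in a driver-side
table that can be queried by name (note the memory sink is intended for
debugging and small result sets):

    query = (AggStreamingDF.writeStream
        .outputMode("complete")
        .format("memory")              # results stored as an in-memory table on the driver
        .queryName("streaming_table")  # table name used when querying
        .start())

    # then: spark.sql("select * from streaming_table")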

Thank you very much for your help!


On Tue, Aug 27, 2019, 6:42 PM Tathagata Das wrote:

>
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#basic-concepts
>
> *Note that Structured Streaming does not materialize the entire table*.
>> It reads the latest available data from the streaming data source,
>> processes it incrementally to update the result, and then discards the
>> source data. It only keeps around the minimal intermediate *state* data
>> as required to update the result (e.g. intermediate counts in the earlier
>> example).
>>
>
>
> On Tue, Aug 27, 2019 at 1:21 PM Nick Dawes  wrote:
>
>> I have a quick newbie question.
>>
>> Spark Structured Streaming creates an unbounded dataframe that keeps
>> appending rows to it.
>>
>> So what's the max size of data it can hold? What if the size becomes
>> bigger than the JVM heap? Will it spill to disk? I'm using S3 as storage, so
>> will it write temp data to S3 or to the local file system of the cluster?
>>
>> Nick
>>
>


Structured Streaming Dataframe Size

2019-08-27 Thread Nick Dawes
I have a quick newbie question.

Spark Structured Streaming creates an unbounded dataframe that keeps
appending rows to it.

So what's the max size of data it can hold? What if the size becomes bigger
than the JVM heap? Will it spill to disk? I'm using S3 as storage, so will it
write temp data to S3 or to the local file system of the cluster?

Nick


Spark Structured Streaming XML content

2019-08-14 Thread Nick Dawes
I'm trying to analyze data using the Kinesis source in PySpark Structured
Streaming on Databricks.

Created a Dataframe as shown below.

kinDF = spark.readStream.format("kinesis").option("streamName", "test-stream-1").load()

Converted the data from base64 encoding as below.

df =  kinDF.withColumn("xml_data", expr("CAST(data as string)"))

Now, I need to extract a few fields from the df.xml_data column using XPath.

Can you please suggest any possible solution?

If I create a dataframe directly for these xml files as

xml_df = spark.read.format("xml").options(rowTag='Consumers').load("s3a://bkt/xmldata")

I'm able to query using xpath

xml_df.select("Analytics.Amount1").show()

But I'm not sure how to extract elements similarly on a Spark Streaming
Dataframe where the data is in text format.

Are there any XML functions to convert text data using a schema? I saw an
example for JSON data using from_json.

Is it possible to use spark.read on a dataframe column?

I need to find the aggregated "Amount1" for every 5-minute window.
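
For reference, a minimal sketch using Spark SQL's built-in xpath_string
function on the decoded string column, followed by a 5-minute windowed
aggregation. The XPath, the Kinesis timestamp column, and the watermark
interval are assumptions:

    from pyspark.sql.functions import expr, window

    # df is the decoded streaming Dataframe from above; the element path is assumed
    parsed = df.select(
        expr("CAST(xpath_string(xml_data, '/Consumers/Analytics/Amount1') AS double)").alias("Amount1"),
        expr("CAST(approximateArrivalTimestamp AS timestamp)").alias("event_time"))  # Kinesis metadata column (assumed)

    agg = (parsed
        .withWatermark("event_time", "10 minutes")
        .groupBy(window("event_time", "5 minutes"))
        .sum("Amount1"))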

Thanks for your help.

Nick


Re: Spark Image resizing

2019-07-31 Thread Nick Dawes
Is there any other way of resizing the images before creating the DataFrame in
Spark? I know OpenCV can do it, but I don't have OpenCV on my cluster; I have
the Anaconda Python packages installed.

Any ideas will be appreciated. Thank you!
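
For reference, a minimal sketch that resizes the files with Pillow (bundled
with Anaconda) before loading them with Spark's image data source. The
directories and target size are assumptions:

    import os
    from PIL import Image

    src_dir, dst_dir, size = "/data/images", "/data/images_resized", (224, 224)  # assumed paths/size
    os.makedirs(dst_dir, exist_ok=True)

    for name in os.listdir(src_dir):
        with Image.open(os.path.join(src_dir, name)) as img:   # assumes every file is an image
            img.resize(size).save(os.path.join(dst_dir, name))

    # Load the resized copies with the image data source (spark is an existing SparkSession)
    df = spark.read.format("image").load(dst_dir)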

On Tue, Jul 30, 2019, 4:17 PM Nick Dawes  wrote:

> Hi
>
> I'm new to spark image data source.
>
> After creating a dataframe using Spark's image data source, I would like
> to resize the images in PySpark.
>
> df = spark.read.format("image").load(imageDir)
>
> Can you please help me with this?
>
> Nick
>


Spark Image resizing

2019-07-30 Thread Nick Dawes
Hi

I'm new to spark image data source.

After creating a dataframe using Spark's image data source, I would like to
resize the images in PySpark.

df = spark.read.format("image").load(imageDir)

Can you please help me with this?

Nick


Spark on Kubernetes Authentication error

2019-06-06 Thread Nick Dawes
Hi there,

I'm trying to run Spark on EKS. I created an EKS cluster, added nodes, and am
now trying to submit a Spark job from an EC2 instance.

I ran the following commands for access:

kubectl create serviceaccount spark
kubectl create clusterrolebinding spark-role --clusterrole=admin --serviceaccount=default:spark --namespace=default

spark-submit command used:

bin/spark-submit \
  --master k8s://https://XX.us-east-1.eks.amazonaws.com \
  --deploy-mode cluster \
  --name spark-pi \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.executor.instances=2 \
  --conf spark.app.name=spark-pi \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.kubernetes.container.image=k8sspark:latest \
  --conf spark.kubernetes.authenticate.submission.caCertFile=ca.pem \
  local:usr/spark-2.4.3-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.3.jar 10

log4j:WARN No appenders could be found for logger (io.fabric8.kubernetes.client.Config).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/06/06 16:03:50 WARN WatchConnectionManager: Executor didn't terminate in time after shutdown in close(), killing it in: io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager@5b43fbf6
Exception in thread "main" io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: POST at: https://X.us-east-1.eks.amazonaws.com/api/v1/namespaces/default/pods.
Message: pods is forbidden: User "system:anonymous" cannot create resource "pods" in API group "" in the namespace "default". Received status: Status(apiVersion=v1, code=403, details=StatusDetails(causes=[], group=null, kind=pods, name=null, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=pods is forbidden: User "system:anonymous" cannot create resource "pods" in API group "" in the namespace "default", metadata=ListMeta(_continue=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=Forbidden, status=Failure, additionalProperties={}).
at io.fabric8.kubernetes.client.dsl.base.OperationSupport.requestFailure(OperationSupport.java:478)
at io.fabric8.kubernetes.client.dsl.base.OperationSupport.assertResponseCode(OperationSupport.java:417)
at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:381)
at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:344)
at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleCreate(OperationSupport.java:227)
at io.fabric8.kubernetes.client.dsl.base.BaseOperation.handleCreate(BaseOperation.java:787)
at io.fabric8.kubernetes.client.dsl.base.BaseOperation.create(BaseOperation.java:357)
at org.apache.spark.deploy.k8s.submit.Client$$anonfun$run$2.apply(KubernetesClientApplication.scala:141)
at org.apache.spark.deploy.k8s.submit.Client$$anonfun$run$2.apply(KubernetesClientApplication.scala:140)
at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2543)
at org.apache.spark.deploy.k8s.submit.Client.run(KubernetesClientApplication.scala:140)
at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication$$anonfun$run$5.apply(KubernetesClientApplication.scala:250)
at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication$$anonfun$run$5.apply(KubernetesClientApplication.scala:241)
at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2543)
at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.run(KubernetesClientApplication.scala:241)
at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.start(KubernetesClientApplication.scala:204)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
19/06/06 16:03:50 INFO ShutdownHookManager: Shutdown hook called
19/06/06 16:03:50 INFO ShutdownHookManager: Deleting directory /tmp/spark-0060fe01-33eb-4cb4-b96b-d5be687016bc

I tried creating a different clusterrole with admin privileges, but it did not
work.

Any idea how to fix this one? Thanks.


- Nick