updateBytesRead()

2019-03-01 Thread swastik mittal
Hi,

In the Spark source code (HadoopRDD.scala), Spark updates the total bytes
read after every 1000 records. When I print the bytes read alongside the
update function, it shows 65536. Even if I change the code to update the
bytes read after every record, it still shows 65536 multiple times until it
has read 1000 or more records. Why is this so? Is it because 65536 bytes is
the minimum read size (the IP packet size limit is also about 64 KB)? If not,
can I change the size a record can hold?
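
For reference, a minimal illustrative sketch of the pattern the question is
about (hypothetical names; the real logic lives in HadoopRDD.scala, and the
64 KB chunking is an assumption about the buffered Hadoop input stream, not
something confirmed here):

// Illustrative sketch only -- not Spark's actual code.
object BytesReadPattern {
  val RecordsBetweenBytesReadUpdates = 1000L

  // Stand-in for the Hadoop FileSystem statistics counter. Because reads go
  // through a buffered stream, this counter tends to advance in buffer-sized
  // chunks (for example 65536 bytes), not record by record.
  def bytesReadSoFar(): Long = 65536L

  var recordsRead = 0L

  def nextRecord(): Unit = {
    recordsRead += 1
    // The metric is refreshed only every N records, so in between you keep
    // seeing the same (chunk-sized) byte count.
    if (recordsRead % RecordsBetweenBytesReadUpdates == 0) {
      println(s"records=$recordsRead bytesRead=${bytesReadSoFar()}")
    }
  }
}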

Thanks






SPARK Streaming Graphs

2019-03-01 Thread Gourav Sengupta
Hi,

Earlier, the Spark UI had streaming stats, but for some reason they were
discontinued in later open-source versions, though Databricks does provide
them internally.

I would be grateful if users could kindly share their learnings and
approaches for showing these graphs with Structured Streaming.


Regards,
Gourav


Re: [Spark SQL]: sql.DataFrame.replace to accept regexp

2019-03-01 Thread Gourav Sengupta
Hi,

Why not just use regexp_replace(), unless there is an attachment to the
replace function of course, which is quite understandable :)
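
For example, a minimal spark-shell sketch of the built-in regexp_replace
(column name, data and pattern are illustrative):

import org.apache.spark.sql.functions.regexp_replace
import spark.implicits._

val df = Seq("abc-123", "xyz-456").toDF("code")

// Replace every run of digits with "#"
df.withColumn("masked", regexp_replace($"code", "\\d+", "#")).show()
// +-------+------+
// |   code|masked|
// +-------+------+
// |abc-123| abc-#|
// |xyz-456| xyz-#|
// +-------+------+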


Regards,
Gourav



On Fri, Mar 1, 2019 at 2:39 PM Richard Garris 
wrote:

> You can file a feature request at
>
> https://issues.apache.org/jira/projects/SPARK/
>
> As a workaround you can create a user defined function like so:
>
>
> https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1526931011080774/2518747644544276/6320440561800420/latest.html
>
>
> Richard L. Garris
>
> Director of Field Engineering
>
> Databricks, Inc.
>
> rich...@databricks.com
>
> Mobile: 650.200.0840
>
> databricks.com
> 
>
>
>
>
> On Fri, Mar 1, 2019 at 2:21 AM Nuno Silva 
> wrote:
>
>> Hi,
>>
>> Not sure if I'm delivering my request through the right channel: would it
>> be possible for sql.DataFrame.replace to accept regular expressions, like
>> in pandas.DataFrame.replace
>> 
>> ?
>>
>> Thank you,
>> Nuno
>>
>


Re: Spark on k8s - map persistentStorage for data spilling

2019-03-01 Thread Tomasz Krol
Yeah, it seems like the option of making the emptyDir larger is something
that we need to consider.

Cheers

Tomasz Krol

On Fri, 1 Mar 2019 at 19:30, Matt Cheah  wrote:

> Ah I see: We always force the local directory to use emptyDir and it
> cannot be configured to use any other volume type. See here
> 
> .
>
>
>
> I am a bit conflicted on this. On one hand, it makes sense to allow for
> users to be able to mount their own volumes to handle spill data. On the
> other hand, I get the impression that emptyDir is the right kind of
> volume for this in a majority of cases – emptyDir is meant to be used for
> temporary storage and is meant to be fast to make workflows like Spark
> performant. Finally, a significant benefit of emptyDir is that Kubernetes
> will handle the cleanup of the directory for you if the pod exits – if you
> use a persistent volume claim you will need to ensure the files are cleaned
> up in the case that the pod exits abruptly.
>
>
>
> I’d wonder if your organization can consider modifying your Kubernetes
> setup to make your emptyDir volumes larger and faster?
>
>
>
> -Matt Cheah
>
>
>
> *From: *Tomasz Krol 
> *Date: *Friday, March 1, 2019 at 10:53 AM
> *To: *Matt Cheah 
> *Cc: *"user@spark.apache.org" 
> *Subject: *Re: Spark on k8s - map persistentStorage for data spilling
>
>
>
> Hi Matt,
>
>
>
> Thanks for coming back to me. Yeah, that doesn't work. Basically, in the
> properties I set the volume and mount point as below:
>
>
>
>
> spark.kubernetes.driver.volumes.persistentVolumeClaim.checkvolume.mount.path=/checkpoint
>
>
> spark.kubernetes.driver.volumes.persistentVolumeClaim.checkvolume.mount.readOnly=false
>
>
> spark.kubernetes.driver.volumes.persistentVolumeClaim.checkvolume.mount.claimName=sparkstorage
>
>
>
>
> spark.kubernetes.executor.volumes.persistentVolumeClaim.checkvolume.mount.path=/checkpoint
>
>
> spark.kubernetes.executor.volumes.persistentVolumeClaim.checkvolume.mount.readOnly=false
>
>
> spark.kubernetes.executor.volumes.persistentVolumeClaim.checkvolume.mount.claimName=sparkstorage
>
>
>
> That works as expected, and the PVC is mounted in the driver and executor
> pods at the /checkpoint directory.
>
>
>
> As you suggested, the first thing I tried was setting spark.local.dir or
> the SPARK_LOCAL_DIRS env variable to the /checkpoint directory, expecting
> that spills would then go to my PVC. However, this throws the following
> error:
>
>
>
> "spark-kube-driver" is invalid:
> spec.containers[0].volumeMounts[3].mountPath: Invalid value: "/checkpoint":
> must be unique"
>
>
>
> It seems like it's trying to mount emptyDir with mounting point
> "/checkpoint", but it can't because "/checkpoint" is the directory where
> the PVC is already mounted.
>
>
>
> At the moment it looks to me like the emptyDir is always used for spilling
> data. The question is how to mount it on the PVC, unless I'm missing
> something here.
>
> I can't really run any bigger jobs at the moment because of that.
> Appreciate any feedback :)
>
>
>
> Thanks
>
>
>
> Tom
>
>
>
> On Thu, 28 Feb 2019 at 17:23, Matt Cheah  wrote:
>
> I think we want to change the value of spark.local.dir to point to where
> your PVC is mounted. Can you give that a try and let us know if that moves
> the spills as expected?
>
>
>
> -Matt Cheah
>
>
>
> *From: *Tomasz Krol 
> *Date: *Wednesday, February 27, 2019 at 3:41 AM
> *To: *"user@spark.apache.org" 
> *Subject: *Spark on k8s - map persistentStorage for data spilling
>
>
>
> Hey Guys,
>
>
>
> I hope someone will be able to help me, as I've been stuck on this for a
> while :) Basically, I am running some jobs on Kubernetes as per the
> documentation:
>
>
>
> https://spark.apache.org/docs/latest/running-on-kubernetes.html
> [spark.apache.org]
> 
>
>
>
> Everything works fine; however, if I run queries on a bigger data volume,
> jobs fail because there is not enough space in the /var/data/spark-1xxx
> directory.
>
>
>
> Obviously the reason for this is that the mounted emptyDir doesn't have
> enough space.
>
>
>
> I also mounted a PVC to the driver and executor pods, which I can see
> during runtime. I am wondering if someone knows how to configure data to be
> spilled to a different directory (i.e. my persistent storage directory)
> instead of the emptyDir with its limited space, or if I can somehow mount
> the emptyDir on my PVC. Basically, at the moment I can't run any jobs as
> they are failing due to insufficient space in that /var/data directory.
>
>
>
> Thanks
>
> --
>
> Tomasz Krol
> patric...@gmail.com
>
>
>
>
> --
>
> Tomasz Krol
> patric...@gmail.com
>



Re: Spark on k8s - map persistentStorage for data spilling

2019-03-01 Thread Matt Cheah
Ah I see: We always force the local directory to use emptyDir and it cannot be 
configured to use any other volume type. See here.

 

I am a bit conflicted on this. On one hand, it makes sense to allow for users 
to be able to mount their own volumes to handle spill data. On the other hand, 
I get the impression that emptyDir is the right kind of volume for this in a 
majority of cases – emptyDir is meant to be used for temporary storage and is 
meant to be fast to make workflows like Spark performant. Finally, a 
significant benefit of emptyDir is that Kubernetes will handle the cleanup of 
the directory for you if the pod exits – if you use a persistent volume claim 
you will need to ensure the files are cleaned up in the case that the pod exits 
abruptly.

 

I’d wonder if your organization can consider modifying your Kubernetes setup to 
make your emptyDir volumes larger and faster?

 

-Matt Cheah

 

From: Tomasz Krol 
Date: Friday, March 1, 2019 at 10:53 AM
To: Matt Cheah 
Cc: "user@spark.apache.org" 
Subject: Re: Spark on k8s - map persistentStorage for data spilling

 

Hi Matt, 

 

Thanks for coming back to me. Yeah, that doesn't work. Basically, in the
properties I set the volume and mount point as below:

 

spark.kubernetes.driver.volumes.persistentVolumeClaim.checkvolume.mount.path=/checkpoint

spark.kubernetes.driver.volumes.persistentVolumeClaim.checkvolume.mount.readOnly=false

spark.kubernetes.driver.volumes.persistentVolumeClaim.checkvolume.mount.claimName=sparkstorage
 

 

spark.kubernetes.executor.volumes.persistentVolumeClaim.checkvolume.mount.path=/checkpoint

spark.kubernetes.executor.volumes.persistentVolumeClaim.checkvolume.mount.readOnly=false

spark.kubernetes.executor.volumes.persistentVolumeClaim.checkvolume.mount.claimName=sparkstorage
 

 

That works as expected, and the PVC is mounted in the driver and executor
pods at the /checkpoint directory.

 

As you suggested, the first thing I tried was setting spark.local.dir or the
SPARK_LOCAL_DIRS env variable to the /checkpoint directory, expecting that
spills would then go to my PVC. However, this throws the following error:

 

"spark-kube-driver" is invalid: spec.containers[0].volumeMounts[3].mountPath: 
Invalid value: "/checkpoint": must be unique"

 

It seems like it's trying to mount emptyDir with mounting point "/checkpoint", 
but it can't because "/checkpoint" is the directory where the PVC is already 
mounted.

 

At the moment it looks to me like the emptyDir is always used for spilling
data. The question is how to mount it on the PVC, unless I'm missing
something here.

I can't really run any bigger jobs at the moment because of that. Appreciate 
any feedback :)

 

Thanks 

 

Tom

 

On Thu, 28 Feb 2019 at 17:23, Matt Cheah  wrote:

I think we want to change the value of spark.local.dir to point to where your 
PVC is mounted. Can you give that a try and let us know if that moves the 
spills as expected?

 

-Matt Cheah

 

From: Tomasz Krol 
Date: Wednesday, February 27, 2019 at 3:41 AM
To: "user@spark.apache.org" 
Subject: Spark on k8s - map persistentStorage for data spilling

 

Hey Guys,

 

I hope someone will be able to help me, as I've been stuck on this for a
while :) Basically, I am running some jobs on Kubernetes as per the
documentation:

 

https://spark.apache.org/docs/latest/running-on-kubernetes.html 
[spark.apache.org]

 

Everything works fine; however, if I run queries on a bigger data volume,
jobs fail because there is not enough space in the /var/data/spark-1xxx
directory.

 

Obviously the reason for this is that the mounted emptyDir doesn't have enough space.

 

I also mounted a PVC to the driver and executor pods, which I can see during
runtime. I am wondering if someone knows how to configure data to be spilled
to a different directory (i.e. my persistent storage directory) instead of
the emptyDir with its limited space, or if I can somehow mount the emptyDir
on my PVC. Basically, at the moment I can't run any jobs as they are failing
due to insufficient space in that /var/data directory.

 

Thanks

-- 

Tomasz Krol
patric...@gmail.com


 

-- 

Tomasz Krol
patric...@gmail.com





Re: Spark on k8s - map persistentStorage for data spilling

2019-03-01 Thread Tomasz Krol
Hi Matt,

Thanks for coming back to me. Yeah, that doesn't work. Basically, in the
properties I set the volume and mount point as below:

spark.kubernetes.driver.volumes.persistentVolumeClaim.checkvolume.mount.path=/checkpoint
spark.kubernetes.driver.volumes.persistentVolumeClaim.checkvolume.mount.readOnly=false
spark.kubernetes.driver.volumes.persistentVolumeClaim.checkvolume.mount.claimName=sparkstorage

spark.kubernetes.executor.volumes.persistentVolumeClaim.checkvolume.mount.path=/checkpoint
spark.kubernetes.executor.volumes.persistentVolumeClaim.checkvolume.mount.readOnly=false
spark.kubernetes.executor.volumes.persistentVolumeClaim.checkvolume.mount.claimName=sparkstorage

That works as expected, and the PVC is mounted in the driver and executor
pods at the /checkpoint directory.

As you suggested, the first thing I tried was setting spark.local.dir or the
SPARK_LOCAL_DIRS env variable to the /checkpoint directory, expecting that
spills would then go to my PVC. However, this throws the following error:

"spark-kube-driver" is invalid:
spec.containers[0].volumeMounts[3].mountPath: Invalid value: "/checkpoint":
must be unique"

It seems like it's trying to mount emptyDir with mounting point
"/checkpoint", but it can't because "/checkpoint" is the directory where
the PVC is already mounted.

At the moment it looks to me like the emptyDir is always used for spilling
data. The question is how to mount it on the PVC, unless I'm missing
something here.
I can't really run any bigger jobs at the moment because of that.
Appreciate any feedback :)

Thanks

Tom

On Thu, 28 Feb 2019 at 17:23, Matt Cheah  wrote:

> I think we want to change the value of spark.local.dir to point to where
> your PVC is mounted. Can you give that a try and let us know if that moves
> the spills as expected?
>
>
>
> -Matt Cheah
>
>
>
> *From: *Tomasz Krol 
> *Date: *Wednesday, February 27, 2019 at 3:41 AM
> *To: *"user@spark.apache.org" 
> *Subject: *Spark on k8s - map persistentStorage for data spilling
>
>
>
> Hey Guys,
>
>
>
> I hope someone will be able to help me, as I've been stuck on this for a
> while :) Basically, I am running some jobs on Kubernetes as per the
> documentation:
>
>
>
> https://spark.apache.org/docs/latest/running-on-kubernetes.html
> [spark.apache.org]
> 
>
>
>
> Everything works fine; however, if I run queries on a bigger data volume,
> jobs fail because there is not enough space in the /var/data/spark-1xxx
> directory.
>
>
>
> Obviously the reason for this is that the mounted emptyDir doesn't have
> enough space.
>
>
>
> I also mounted a PVC to the driver and executor pods, which I can see
> during runtime. I am wondering if someone knows how to configure data to be
> spilled to a different directory (i.e. my persistent storage directory)
> instead of the emptyDir with its limited space, or if I can somehow mount
> the emptyDir on my PVC. Basically, at the moment I can't run any jobs as
> they are failing due to insufficient space in that /var/data directory.
>
>
>
> Thanks
>
> --
>
> Tomasz Krol
> patric...@gmail.com
>


-- 
Tomasz Krol
patric...@gmail.com


Is there a way to validate the syntax of raw spark sql query?

2019-03-01 Thread kant kodali
Hi All,

Is there a way to validate the syntax of raw spark SQL query?

For example, I would like to know if Spark provides any isValid-style API
call, e.g.:

val query = "select * from table"if(isValid(query)) {
sparkSession.sql(query) } else {
log.error("Invalid Syntax")}

I tried the following

val query = "select * morf table" // Invalid queryval parser =
spark.sessionState.sqlParsertry{
parser.parseExpression(query)} catch (ParseException ex) {
throw new Exception(ex); //Exception not getting
thrown}Dataset<>Row df = sparkSession.sql(query) // Exception gets
thrown here
df.writeStream.format("console").start()

Question: parser.parseExpression is not catching the invalid syntax before I
hit sparkSession.sql. In other words, it is not being helpful in the above
code. Any reason? My whole goal is to catch syntax errors before I pass the
query on to sparkSession.sql.
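
One possible direction (an untested sketch, not a confirmed answer): the
parser also exposes parsePlan, which parses a complete SQL statement, whereas
parseExpression only handles a single expression such as "a + 1", so a syntax
error in a full query may only surface through parsePlan:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.parser.ParseException

// Sketch: return true when the statement parses, false on a syntax error.
// This checks syntax only; it does not verify that tables or columns exist.
def isValidSyntax(spark: SparkSession, query: String): Boolean =
  try {
    spark.sessionState.sqlParser.parsePlan(query)
    true
  } catch {
    case _: ParseException => false
  }

// isValidSyntax(spark, "select * from table")  // expected: true
// isValidSyntax(spark, "select * morf table")  // expected: false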


Re: Spark 2.4 Structured Streaming Kafka assign API polling same offsets

2019-03-01 Thread Kristopher Kane
I figured out why.  We are not persisting the data at the end of
.load().  Thus, every operation like count() is going back to Kafka
for the data again.
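
A minimal spark-shell sketch of the fix described above (server, topic and
offset values are illustrative; persist/cache are the standard Dataset APIs):

// Cache the batch DataFrame read from Kafka so that later actions
// (count, filters, writes) reuse the already-fetched data instead of
// re-polling the same offsets.
val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("assign", """{"topic1":[0,1],"topic2":[0]}""")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()
  .persist()   // or .cache()

df.count()     // materializes the data once; subsequent actions hit the cache
// ... filter/process/commit ...
df.unpersist()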

On Fri, Mar 1, 2019 at 10:10 AM Kristopher Kane  wrote:
>
> We are using the assign API to do batch work with Spark and Kafka.
> What I'm seeing is the Spark executor work happening in the background,
> constantly polling the same data over and over until the
> main thread commits the offsets.
>
> Is the below a blocking operation?
>
>   Dataset<Row> df = spark.read().format("kafka")
>   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
>   .option("assign", "topic1,topic2")
>   .option("startingOffsets",
> "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
>   .option("endingOffsets",
> "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
>   .load();
> df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
>
>
> ###
>
> Here is an example.  Our desired batch is 20 records to commit on.
> Due to segment size (this is a test) 12 records are returned in each
> poll. Spark gets to offset 20 and our program is working to
> filter/process/commit, but the Spark polling starts again in the
> background at offset -2 since it has not been committed yet.
> This suggests the above .read().load() is non-blocking.
>
>
> 2019-03-01 09:21:41 INFO  [THREAD ID=main] RawHdfsFlowType:50 -
> Getting data from Kafka
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset -2 requested 0
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Seeking to
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 0
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Polled
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> [compacted-gap-message-0]  12
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Offset changed from 0 to 12 after
> polling
>
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 1 requested 1
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 2 requested 2
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 3 requested 3
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 4 requested 4
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 5 requested 5
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 6 requested 6
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 7 requested 7
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 8 requested 8
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 9 requested 9
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 10 requested 10
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
> compacted-gap-message-0 nextOffset 11 requested 11
> 2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
> task 0] InternalKafkaConsumer:58 - Get
> spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc4

Spark 2.4 Structured Streaming Kafka assign API polling same offsets

2019-03-01 Thread Kristopher Kane
We are using the assign API to do batch work with Spark and Kafka.
What I'm seeing is the Spark executor work happening in the background,
constantly polling the same data over and over until the
main thread commits the offsets.

Is the below a blocking operation?

  Dataset<Row> df = spark.read().format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("assign", "topic1,topic2")
  .option("startingOffsets",
"{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
  .option("endingOffsets",
"{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");


###

Here is an example.  Our desired batch is 20 records to commit on.
Due to segment size (this is a test) 12 records are returned in each
poll. Spark gets to offset 20 and our program is working to
filter/process/commit, but the Spark polling starts again in the
background at offset -2 since it has not been committed yet.
This suggests the above .read().load() is non-blocking.


2019-03-01 09:21:41 INFO  [THREAD ID=main] RawHdfsFlowType:50 -
Getting data from Kafka
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset -2 requested 0
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Seeking to
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 0
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Polled
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
[compacted-gap-message-0]  12
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Offset changed from 0 to 12 after
polling

2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 1 requested 1
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 2 requested 2
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 3 requested 3
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 4 requested 4
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 5 requested 5
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 6 requested 6
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 7 requested 7
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 8 requested 8
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 9 requested 9
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 10 requested 10
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 11 requested 11
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Get
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 nextOffset 12 requested 12

2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Seeking to
spark-kafka-relation-23c2c012-41dd-46ec-8319-5ea39dbc46b6-executor
compacted-gap-message-0 12
2019-03-01 09:21:44 DEBUG [THREAD ID=Executor task launch worker for
task 0] InternalKafkaConsumer:58 - Polled
spark

Re: to_avro and from_avro not working with struct type in spark 2.4

2019-03-01 Thread Gabor Somogyi
> I am thinking of writing out the dfKV dataframe to disk and then using Avro
> APIs to read the data.
Ping me if you have something, I'm planning similar things...


On Thu, Feb 28, 2019 at 5:27 PM Hien Luu  wrote:

> Thanks for the answer.
>
> As far as the next step goes, I am thinking of writing out the dfKV
> dataframe to disk and then using Avro APIs to read the data.
>
> This smells like a bug somewhere.
>
> Cheers,
>
> Hien
>
> On Thu, Feb 28, 2019 at 4:02 AM Gabor Somogyi 
> wrote:
>
>> No, just take a look at the schema of dfStruct since you've converted its
>> value column with to_avro:
>>
>> scala> dfStruct.printSchema
>> root
>>  |-- id: integer (nullable = false)
>>  |-- name: string (nullable = true)
>>  |-- age: integer (nullable = false)
>>  |-- value: struct (nullable = false)
>>  ||-- name: string (nullable = true)
>>  ||-- age: integer (nullable = false)
>>
>>
>> On Wed, Feb 27, 2019 at 6:51 PM Hien Luu  wrote:
>>
>>> Thanks for looking into this. Does this mean string fields should always
>>> be nullable?
>>>
>>> You are right that the result is not yet correct and further digging is
>>> needed :(
>>>
>>> On Wed, Feb 27, 2019 at 1:19 AM Gabor Somogyi 
>>> wrote:
>>>
 Hi,

 I was dealing with Avro lately, and most of the time problems like this have
 something to do with the schema.
 One thing I've pinpointed quickly (where I was struggling too) is that the
 name field should be nullable, but the result is still not correct, so
 further digging is needed...

 scala> val expectedSchema = StructType(Seq(StructField("name",
 StringType,true),StructField("age", IntegerType, false)))
 expectedSchema: org.apache.spark.sql.types.StructType =
 StructType(StructField(name,StringType,true),
 StructField(age,IntegerType,false))

 scala> val avroTypeStruct =
 SchemaConverters.toAvroType(expectedSchema).toString
 avroTypeStruct: String =
 {"type":"record","name":"topLevelRecord","fields":[{"name":"name","type":["string","null"]},{"name":"age","type":"int"}]}

 scala> dfKV.select(from_avro('value, avroTypeStruct)).show
 +------------------------+
 |from_avro(value, struct)|
 +------------------------+
 |         [Mary Jane, 25]|
 |         [Mary Jane, 25]|
 +------------------------+

 BR,
 G


 On Wed, Feb 27, 2019 at 7:43 AM Hien Luu  wrote:

> Hi,
>
> I ran into a pretty weird issue with to_avro and from_avro where it
> was not
> able to parse the data in a struct correctly.  Please see the simple
> and
> self contained example below. I am using Spark 2.4.  I am not sure if I
> missed something.
>
> This is how I start the spark-shell on my Mac:
>
> ./bin/spark-shell --packages org.apache.spark:spark-avro_2.11:2.4.0
>
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.avro._
> import org.apache.spark.sql.functions._
>
>
> spark.version
>
> val df = Seq((1, "John Doe",  30), (2, "Mary Jane", 25)).toDF("id",
> "name",
> "age")
>
> val dfStruct = df.withColumn("value", struct("name","age"))
>
> dfStruct.show
> dfStruct.printSchema
>
> val dfKV = dfStruct.select(to_avro('id).as("key"),
> to_avro('value).as("value"))
>
> val expectedSchema = StructType(Seq(StructField("name", StringType,
> false),StructField("age", IntegerType, false)))
>
> val avroTypeStruct =
> SchemaConverters.toAvroType(expectedSchema).toString
>
> val avroTypeStr = s"""
>   |{
>   |  "type": "int",
>   |  "name": "key"
>   |}
> """.stripMargin
>
>
> dfKV.select(from_avro('key, avroTypeStr)).show
>
> // output
> +-------------------+
> |from_avro(key, int)|
> +-------------------+
> |                  1|
> |                  2|
> +-------------------+
>
> dfKV.select(from_avro('value, avroTypeStruct)).show
>
> // output
> +------------------------+
> |from_avro(value, struct)|
> +------------------------+
> |                   [, 9]|
> |                   [, 9]|
> +------------------------+
>
> Please help and thanks in advance.
>
>
>
>
>
>
>>>
>>> --
>>> Regards,
>>>
>>
>
> --
> Regards,
>


Re: [Spark SQL]: sql.DataFrame.replace to accept regexp

2019-03-01 Thread Richard Garris
You can file a feature request at

https://issues.apache.org/jira/projects/SPARK/

As a workaround you can create a user defined function like so:

https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1526931011080774/2518747644544276/6320440561800420/latest.html
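
For instance, a rough spark-shell sketch of such a UDF (hypothetical names;
the linked notebook may differ in the details):

import org.apache.spark.sql.functions.{lit, udf}
import spark.implicits._

// Regex-aware replace as a UDF: pattern and replacement are passed as columns.
val regexReplace = udf { (value: String, pattern: String, replacement: String) =>
  if (value == null) null else value.replaceAll(pattern, replacement)
}

val df = Seq("foo-123", "bar-456").toDF("col")
df.select(regexReplace($"col", lit("""\d+"""), lit("#")).as("replaced")).show()
// +--------+
// |replaced|
// +--------+
// |   foo-#|
// |   bar-#|
// +--------+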


Richard L. Garris

Director of Field Engineering

Databricks, Inc.

rich...@databricks.com

Mobile: 650.200.0840

databricks.com





On Fri, Mar 1, 2019 at 2:21 AM Nuno Silva 
wrote:

> Hi,
>
> Not sure if I'm delivering my request through the right channel: would it
> be possible for sql.DataFrame.replace to accept regular expressions, like
> in pandas.DataFrame.replace
> 
> ?
>
> Thank you,
> Nuno
>


Spark Streaming loading kafka source value column type

2019-03-01 Thread oskarryn

Hi,

Why is the `value` column in a streaming DataFrame obtained from a Kafka
topic natively of binary type (see the table at
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html)
if in fact it holds a string with the message's data and we CAST it to a
string anyway?
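
For reference, a minimal spark-shell sketch of the two common ways that
binary value column gets consumed (server and topic names are illustrative;
the binary default presumably exists because Kafka payloads are raw bytes
that need not be strings):

val kafkaDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "topic1")
  .load()

// 1) Plain-text payloads: cast the raw bytes to a string.
val asStrings = kafkaDf.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

// 2) Non-text payloads (e.g. Avro): keep the bytes and decode them
//    explicitly, for instance with from_avro(...) from spark-avro.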


Oskar





spark.python.worker.memory VS spark.executor.pyspark.memory

2019-03-01 Thread Andrey Dudin
Hello all.

What is the difference between spark.python.worker.memory and
spark.executor.pyspark.memory?


[Spark SQL]: sql.DataFrame.replace to accept regexp

2019-03-01 Thread Nuno Silva
Hi,

Not sure if I'm delivering my request through the right channel: would it
be possible for sql.DataFrame.replace to accept regular expressions, like
in pandas.DataFrame.replace

?

Thank you,
Nuno