[apache-spark] [spark-r] 503 Error - Cannot Connect to S3

2020-10-05 Thread Khatri, Faysal
Hello-

I am attempting to use SparkR to read in a parquet file from S3.
The exact same operation succeeds using PySpark - but I get a 503 error using 
SparkR.

In fact, I get the 503 even if I use a bad endpoint or bad credentials. It's as 
if Spark isn't even trying to make the HTTP request. It's the same machine and 
same cluster on which PySpark works flawlessly, though.
I get this warning after about 2 minutes - WARN streaming.FileStreamSink: Error 
while looking for metadata directory.
And the 503 appears after another minute or so.
There are no executors being started, so no logs there.
I am trying to connect to a custom endpoint, if that's relevant.

Any ideas what could be going wrong?
Below is the code and full output.

Thanks in advance.
Faysal

CODE-
library(SparkR)

spark_config_default <- list(spark.dynamicAllocation.enabled='true',
 spark.shuffle.service.enabled='true',
 spark.sql.parquet.binaryAsString='true',
 spark.dynamicAllocation.initialExecutors='3',
 spark.dynamicAllocation.maxExecutors='30',
 spark.driver.memory='2g',
 spark.executor.memory='4g',
 spark.executor.cores='3')


jars_list = c("aws-java-sdk-1.7.4.2.jar",
  "hadoop-aws-2.6.0.jar",
  "aws-s3-1.7.1.jar",
  "hadoop-common-2.6.0.jar"
)

sc <- sparkR.session(master = "yarn", deployMode = "client", sparkConfig = 
spark_config_default, sparkJars = jars_list)

hConf = SparkR:::callJMethod(sc, "conf")
SparkR:::callJMethod(hConf, "set", "fs.s3a.access.key", [access_key])
SparkR:::callJMethod(hConf, "set", "fs.s3a.secret.key", [secret_key])
SparkR:::callJMethod(hConf, "set", "fs.s3a.endpoint", [custom_endpoint])
SparkR:::callJMethod(hConf, "set", "com.amazonaws.services.s3a.enableV4", 
"true")
SparkR:::callJMethod(hConf, "set", "fs.s3a.impl", 
"org.apache.hadoop.fs.s3a.S3AFileSystem")

sdf <- read.parquet("s3a://bucket/file.parquet")

sparkR.stop()

OUTPUT-
20/10/05 17:54:17 WARN streaming.FileStreamSink: Error while looking for 
metadata directory.
20/10/05 17:55:58 ERROR r.RBackendHandler: parquet on 10 failed
java.lang.reflect.InvocationTargetException
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:167)
 at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:108)
 at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:40)
 at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
 at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
 at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
 at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)
 at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
 at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
 at io.netty.channel.AbstractChanne

Re: [Spark Core] Why no spark.read.delta / df.write.delta?

2020-10-05 Thread Jungtaek Lim
Sure. My point was that Delta Lake is also one of the 3rd-party libraries, and
there's no way for Apache Spark to do that. Delta Lake has its own group, and
the request is better raised there.

On Mon, Oct 5, 2020 at 9:54 PM Enrico Minack  wrote:

> Though spark.read. refers to "built-in" data sources, there is
> nothing that prevents 3rd-party libraries from "extending" spark.read in
> Scala or Python. As users know the Spark way of reading built-in data
> sources, it feels natural to hook 3rd-party data sources under the same
> scheme, to give users a holistic and integrated feel.
>
> One Scala example (
> https://github.com/G-Research/spark-dgraph-connector#spark-dgraph-connector
> ):
>
> import uk.co.gresearch.spark.dgraph.connector._
> val triples = spark.read.dgraph.triples("localhost:9080")
>
> and in Python:
>
> from gresearch.spark.dgraph.connector import *
> triples = spark.read.dgraph.triples("localhost:9080")
>
> I agree that 3rd parties should also support the official
> spark.read.format() and the new catalog approaches.
>
> Enrico
>
> Am 05.10.20 um 14:03 schrieb Jungtaek Lim:
>
> Hi,
>
> "spark.read." is a "shorthand" for "built-in" data sources, not
> for external data sources. spark.read.format() is still the official way to
> use them. Delta Lake is not included in Apache Spark, so it is indeed not
> possible for Spark to refer to it.
>
> Starting from Spark 3.0, the concept of a "catalog" is introduced, with
> which you can simply refer to a table from the catalog (if the external
> data source provides a catalog implementation), with no need to specify
> the format explicitly (as the catalog knows about it).
>
> This session explains the catalog and how Cassandra connector leverages
> it. I see some external data sources starting to support catalog, and in
> Spark itself there's some effort to support catalog for JDBC.
>
> https://databricks.com/fr/session_na20/datasource-v2-and-cassandra-a-whole-new-world
>
> Hope this helps.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
>
> On Mon, Oct 5, 2020 at 8:53 PM Moser, Michael <
> michael.mo...@siemens-healthineers.com> wrote:
>
>> Hi there,
>>
>>
>>
>> I’m just wondering if there is any incentive to implement read/write
>> methods in the DataFrameReader/DataFrameWriter for delta similar to e.g.
>> parquet?
>>
>>
>>
>> For example, using PySpark, “spark.read.parquet” is available, but
>> “spark.read.delta” is not (same for write).
>>
>> In my opinion, “spark.read.delta” feels more clean and pythonic compared
>> to “spark.read.format(‘delta’).load()”, especially if more options are
>> called, like “mode”.
>>
>>
>>
>> Can anyone explain the reasoning behind this, is this due to the Java
>> nature of Spark?
>>
>> From a pythonic point of view, I could also imagine a single read/write
>> method, with the format as an arg and kwargs related to the different file
>> format options.
>>
>>
>>
>> Best,
>>
>> Michael
>>
>>
>>
>>
>>
>


Re: Excessive disk IO with Spark structured streaming

2020-10-05 Thread Jungtaek Lim
Replied inline.

On Tue, Oct 6, 2020 at 6:07 AM Sergey Oboguev  wrote:

> Hi Jungtaek,
>
> Thanks for your response.
>
> *> you'd want to dive inside the checkpoint directory and have separate
> numbers per top-subdirectory*
>
> All the checkpoint store numbers are solely for the subdirectory set by
> option("checkpointLocation", .. checkpoint dir for writer ... )
>

>
> Other subdirectories are empty or nearly-empty.
>

I meant the subdirectories inside the directory you're providing as
"checkpointLocation": there are several directories in there, and they exist
for different purposes. It would be nice if we could determine whether the
issue spans all of these directories or is specific to one of them.


>
> *> First of all, you'd want to divide these numbers by the number of
> micro-batches, as file creations in checkpoint directory would occur
> similarly per micro-batch*
>
> There were 69 microbatches, each containing 13,334 events.
>
> Event's Row object size in Java heap is 620 bytes, thus the total amount
> of data in a microbatch (in terms of aggregate Java-heap objects sizes) is
> 8.3 MB.
>
> Average number of system calls per microbatch was:
>
> For query (writer) checkpoint directory:
>
> create/open file = 877
> mkdir = 826
> readlink = 855
> unlink = 607
> rename = 213
> execve readlink = 5116
> execve chmod = 4937
>
> For Spark local directory:
>
> create/open file = 797
> unlink = 200
> mmap = 197
> stat = 2391
>
> (The number for local.stat in the previous message was incorrect).
>
> Physical processing time per microbatch was 3.4 seconds.
>
> That's to store a mere 8.3 MB of uncompressed (Java-heap) data!
>
> Most created "delta" files have file size in the order of 1 KB or less.
> "Snapshot" files are several KB in size.
> One would think that the tiny size of created files is one key factor in
> dismal performance. It causes a very high number of system calls and also
> hugely fragments actual data IO.
>

The size of the delta file heavily depends on your stateful operation and
the data in each micro-batch. A delta file only captures the "changes" of
state in a specific micro-batch, so there are cases where you'll have very
tiny delta files, e.g. when the cardinality of the grouping key is small
(hence the cardinality of KVs is also small), when only a small amount of
input is provided per micro-batch, when the overall size of the aggregated
rows is small, or when there's skew on the grouping key (so some partitions
get little or no input), etc.

The snapshot files will keep getting bigger, as each one contains all of the
state KVs at a specific micro-batch, so you don't need to worry about those
being small.


>
> As a result, using iostat, typical disk write rate was observed only ~ 100
> KB/s.
> (Read rate was near-zero, presumably because all data was in Linux block
> cache.)
>
> Average CPU usage when ingesting data was in the order of 600% (i.e. 6
> cores busy), I presume chiefly for serialization/deserialization, even
> though Kryo was enabled. But the machine has 16 cores (VCPUs), so the most
> immediate limiting factor must have been not CPU saturation but IO latency
> (unless there is some obscure setting limiting the number of
> reading/writing threads). The latency arising, fundamentally, out of very
> large number of tiny files.
>
> Is there a way to control the size of checkpoint "delta" and "snapshot"
> files Spark creates, to make them larger?
>

Unfortunately no - they're used for the fault-tolerance guarantee (stateful
exactly-once) per micro-batch. All stateful operations write a delta file per
shuffle partition (spark.sql.shuffle.partitions) per micro-batch.

The default value of shuffle partitions is 200, hence in each micro-batch the
query will create 200 files for each state store by default. (You can reduce
this value from the start of the streaming query, so that's a thing you can
tweak - see the sketch below.) In reality, you still need to multiply it by 4:
there's also a crc file per file if the HDFS API treats the filesystem as a
checksum file system, and Spark creates two files (read/write) for streaming
aggregation. (I hope SPARK-30294 would address the latter - after that we no
longer need to multiply by 2 for the read/write purpose.)
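
For illustration only, a minimal Scala sketch of lowering the shuffle
partition count when the session is created (the app name and the value 20
are made up; the setting is pinned in the checkpoint, so it only takes effect
for a brand-new checkpoint location):

import org.apache.spark.sql.SparkSession

// Sketch: fewer shuffle partitions means fewer state-store delta/crc files
// per micro-batch. Set it before the streaming query is first started.
val spark = SparkSession.builder()
  .appName("fewer-state-files")                    // hypothetical app name
  .config("spark.sql.shuffle.partitions", "20")    // default is 200
  .getOrCreate()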

The only way for now to get fewer small files is to increase the micro-batch
interval, which may bring other concerns: batch size (and output size) and
output latency. That is a downside compared to what streaming frameworks
provide - in streaming frameworks, a longer checkpoint interval only affects
the amount of data to restore on failure. (If you expect end-to-end
exactly-once then output latency is also affected, but that's a trade-off end
users can choose to tolerate.) A sketch of setting a longer trigger interval
follows below.
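
As a rough sketch only (the "rate" source, checkpoint path and interval below
are placeholders, not from this thread), the micro-batch interval is raised
through a processing-time trigger:

import org.apache.spark.sql.streaming.Trigger

// Sketch: a longer trigger interval means fewer micro-batches, hence fewer
// delta/crc files, at the cost of higher output latency.
val df = spark.readStream.format("rate").load()            // stand-in for the real source
val query = df.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoints/demo")   // hypothetical path
  .trigger(Trigger.ProcessingTime("1 minute"))             // default fires as soon as the previous batch ends
  .start()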

Probably micro-batch mode could also decouple the micro-batch interval and
the checkpoint interval to provide flexibility: say, I can tolerate
reprocessing up to 10 minutes of data when a failure occurs, but due to the
output latency requirement I need a micro-batch interval of 30 seconds.
(In other words, do a checkpoint roughly every 20 micro-batches.) Th

Re: Excessive disk IO with Spark structured streaming

2020-10-05 Thread Sergey Oboguev
 Hi Jungtaek,

Thanks for your response.

*> you'd want to dive inside the checkpoint directory and have separate
numbers per top-subdirectory*

All the checkpoint store numbers are solely for the subdirectory set by
option("checkpointLocation", .. checkpoint dir for writer ... )

Other subdirectories are empty or nearly-empty.

*> First of all, you'd want to divide these numbers by the number of
micro-batches, as file creations in checkpoint directory would occur
similarly per micro-batch*

There were 69 microbatches, each containing 13,334 events.

Event's Row object size in Java heap is 620 bytes, thus the total amount of
data in a microbatch (in terms of aggregate Java-heap objects sizes) is 8.3
MB.

Average number of system calls per microbatch was:

For query (writer) checkpoint directory:

create/open file = 877
mkdir = 826
readlink = 855
unlink = 607
rename = 213
execve readlink = 5116
execve chmod = 4937

For Spark local directory:

create/open file = 797
unlink = 200
mmap = 197
stat = 2391

(The number for local.stat in the previous message was incorrect).

Physical processing time per microbatch was 3.4 seconds.

That's to store a mere 8.3 MB of uncompressed (Java-heap) data!

Most created "delta" files have file size in the order of 1 KB or less.
"Snapshot" files are several KB in size.
One would think that the tiny size of created files is one key factor in
dismal performance. It causes a very high number of system calls and also
hugely fragments actual data IO.

As a result, using iostat, typical disk write rate was observed only ~ 100
KB/s.
(Read rate was near-zero, presumably because all data was in Linux block
cache.)

Average CPU usage when ingesting data was in the order of 600% (i.e. 6
cores busy), I presume chiefly for serialization/deserialization, even
though Kryo was enabled. But the machine has 16 cores (vCPUs), so the most
immediate limiting factor must have been not CPU saturation but IO latency
(unless there is some obscure setting limiting the number of
reading/writing threads). The latency arises, fundamentally, from the very
large number of tiny files.

Is there a way to control the size of checkpoint "delta" and "snapshot"
files Spark creates, to make them larger?
And the same also for the files in Spark local directory?

* * *

The numbers for the checkpoint directory are, of course, captured when it was
set to a local drive (or Lustre/NFS).

For HDFS there are obviously no local file system calls for the checkpoint
store, as HDFS does not present itself as an OS-level file system.
Nevertheless, the name of the checkpoint directory was transmitted over the
HDFS connection socket 1,675 times per microbatch, so the number of high-level
HDFS file operations must have been at least that high.

* * *

On a related note, for 920,000 events Spark made 700,000 attempts to
execute the chmod or readlink program, i.e. to launch an external subprocess
with an executable in order to perform a file operation. Those attempts
actually represent 150,000 cycles, and in each cycle Spark tried to launch
the program from 6 different locations (/usr/local/sbin ->
/usr/local/bin -> /usr/sbin -> /usr/bin -> /sbin -> /bin), until it finally
found it in the last one. But then, on the next cycle, Spark/Hadoop does not
reuse the knowledge of the previously found utility location and repeats the
search from the very start, causing useless file system search operations
over and over again.

This may or may not matter when HDFS is used for the checkpoint store
(depending on how the HDFS server implements the calls), but it does matter
when a file system like Lustre or NFS is used for checkpoint storage.
(Not to mention that spawning readlink and chmod does not seem like a bright
idea in the first place, although perhaps there is a reason why the Hadoop
layer does it this way.)

Thanks,
Sergey

On Mon, Oct 5, 2020 at 5:45 AM Jungtaek Lim 
wrote:

> First of all, you'd want to divide these numbers by the number of
> micro-batches, as file creations in checkpoint directory would occur
> similarly per micro-batch.
> Second, you'd want to dive inside the checkpoint directory and have
> separate numbers per top-subdirectory.
>
> After that we can see whether the value would make sense or not.
>
> Regarding file I/O issues on SS, two issues I know about are:
> 1) If you use streaming aggregation, it unnecessarily creates a temporary
> file for both read and write on the state store, while the file is only
> needed for writing. That makes the number of file creations to be 2x. The
> patch is proposed under SPARK-30294 [1].
>
> 2) Spark leverages HDFS API which is configured to create crc file per
> file by default. (So you'll have 2x files than expected.) There's a bug in
> HDFS API (HADOOP-16255 [2]) which missed to handle crc files during rename
> (in short of how checkpoint works in SS, temp file is atomically renamed to
> be the final file), and as a workaround (SPARK-28025 [3]) Spark tries to
> delete the crc file which two addition

Re: Spark Streaming ElasticSearch

2020-10-05 Thread Siva Samraj
Hi Jainshasha,

I need to read each row from the Dataframe and make some changes to it before
inserting it into ES.

Thanks
Siva
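
For what it's worth, a hedged Scala sketch of that idea (the column
transformation, checkpoint path and index name are placeholders; resultDf
refers to the DataFrame from the original post's snippet, and the ES sink
options come from the reply below):

import org.apache.spark.sql.functions._

// Sketch: apply the per-record change as a DataFrame transformation, then
// hand the result to the Elasticsearch structured-streaming sink (append only).
val transformed = resultDf
  .withColumn("value", upper(col("value")))              // placeholder for the real business logic

val query = transformed.writeStream
  .outputMode("append")
  .format("org.elasticsearch.spark.sql")
  .option("checkpointLocation", "/tmp/checkpoints/es")   // hypothetical path
  .start("my-index")                                     // hypothetical index name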

On Mon, Oct 5, 2020 at 8:06 PM jainshasha  wrote:

> Hi Siva
>
> To emit data into ES from a Spark structured streaming job, you need to use
> an ElasticSearch jar that provides a sink for structured streaming. For this
> you can use my branch, where we have integrated ES with Spark 3.0 and Scala
> 2.12:
> https://github.com/ThalesGroup/spark/tree/guavus/v3.0.0
>
> Also, from it you need to build three jars:
> elasticsearch-hadoop-sql
> elasticsearch-hadoop-core
> elasticsearch-hadoop-mr
> which help in writing data into ES through Spark structured streaming.
>
> And in your application job you can sink the data this way; remember that
> with ES, structured streaming only supports append mode.
> val esDf = aggregatedDF
> .writeStream
> .outputMode("append")
> .format("org.elasticsearch.spark.sql")
> .option(CHECKPOINTLOCATION, kafkaCheckpointDirPath + "/es")
> .start("aggregation-job-index-latest-1")
>
>
> Let me know if you face any issues, will be happy to help you :)
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark Streaming ElasticSearch

2020-10-05 Thread jainshasha
Hi Siva

To emit data into ES from a Spark structured streaming job, you need to use
an ElasticSearch jar that provides a sink for structured streaming. For this
you can use my branch, where we have integrated ES with Spark 3.0 and Scala
2.12:
https://github.com/ThalesGroup/spark/tree/guavus/v3.0.0

Also, from it you need to build three jars:
elasticsearch-hadoop-sql
elasticsearch-hadoop-core
elasticsearch-hadoop-mr
which help in writing data into ES through Spark structured streaming.

And in your application job you can sink the data this way; remember that
with ES, structured streaming only supports append mode.
val esDf = aggregatedDF
.writeStream
.outputMode("append")
.format("org.elasticsearch.spark.sql")
.option(CHECKPOINTLOCATION, kafkaCheckpointDirPath + "/es")
.start("aggregation-job-index-latest-1")


Let me know if you face any issues, will be happy to help you :)




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark Streaming ElasticSearch

2020-10-05 Thread Siva Samraj
Hi Team,

I have a Spark streaming job which reads from Kafka and writes into
Elasticsearch via HTTP requests.

I want to validate each record from Kafka, change the payload as per
business needs, and write it into Elasticsearch.

I have used ES HTTP requests to push the data into Elasticsearch. Can someone
guide me on how to write the data into ES via a data frame?

*Code Snippet: *
 val dfInput = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .option("startingOffsets", "latest")
  .option("group.id", sourceTopicGroupId)
  .option("failOnDataLoss", "false")
  .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
  .load()

import spark.implicits._

val resultDf = dfInput
  .withColumn("value", $"value".cast("string"))
  .select("value")

resultDf.writeStream.foreach(new ForeachWriter[Row] {
  override def open(partitionId: Long, version: Long): Boolean = true

  override def process(value: Row): Unit = {
processEventsData(value.get(0).asInstanceOf[String], deviceIndex,
msgIndex, retryOnConflict,auth,refreshInterval,deviceUrl,messageUrl,spark)
  }

  override def close(errorOrNull: Throwable): Unit = {
  }

}).trigger(Trigger.ProcessingTime(triggerPeriod)).start().awaitTermination()
//"1 second"
  }

Please suggest if there is a better approach.

Thanks


Re: [Spark Core] Why no spark.read.delta / df.write.delta?

2020-10-05 Thread Enrico Minack
Though spark.read. refers to "built-in" data sources, there is
nothing that prevents 3rd-party libraries from "extending" spark.read in
Scala or Python. As users know the Spark way of reading built-in data
sources, it feels natural to hook 3rd-party data sources under the same
scheme, to give users a holistic and integrated feel.


One Scala example 
(https://github.com/G-Research/spark-dgraph-connector#spark-dgraph-connector):


import uk.co.gresearch.spark.dgraph.connector._
val triples = spark.read.dgraph.triples("localhost:9080")

and in Python:

from gresearch.spark.dgraph.connector import *
triples = spark.read.dgraph.triples("localhost:9080")

I agree that 3rd parties should also support the official 
spark.read.format() and the new catalog approaches.
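
As a rough sketch of how such an extension can be wired up in Scala (all
names below are invented for illustration; the dgraph connector above does
something along these lines in its own package):

import org.apache.spark.sql.{DataFrame, DataFrameReader}

// Sketch: an implicit class is enough to make spark.read.myformat(...) resolve,
// delegating to the official format()/load() API under the hood.
object MyConnectorImplicits {
  implicit class MyFormatReader(val reader: DataFrameReader) extends AnyVal {
    def myformat(path: String): DataFrame =
      reader.format("com.example.myformat").load(path)   // hypothetical format name
  }
}

// Usage, after `import MyConnectorImplicits._`:
//   val df = spark.read.myformat("/data/input")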


Enrico


Am 05.10.20 um 14:03 schrieb Jungtaek Lim:

Hi,

"spark.read." is a "shorthand" for "built-in" data sources,
not for external data sources. spark.read.format() is still the
official way to use them. Delta Lake is not included in Apache Spark, so
it is indeed not possible for Spark to refer to it.


Starting from Spark 3.0, the concept of a "catalog" is introduced, with
which you can simply refer to a table from the catalog (if the external
data source provides a catalog implementation), with no need to specify
the format explicitly (as the catalog knows about it).


This session explains the catalog and how Cassandra connector 
leverages it. I see some external data sources starting to support 
catalog, and in Spark itself there's some effort to support catalog 
for JDBC.

https://databricks.com/fr/session_na20/datasource-v2-and-cassandra-a-whole-new-world

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)


On Mon, Oct 5, 2020 at 8:53 PM Moser, Michael wrote:


Hi there,

I’m just wondering if there is any incentive to implement
read/write methods in the DataFrameReader/DataFrameWriter for
delta similar to e.g. parquet?

For example, using PySpark, “spark.read.parquet” is available, but
“spark.read.delta” is not (same for write).

In my opinion, “spark.read.delta” feels more clean and pythonic
compared to “spark.read.format(‘delta’).load()”, especially if
more options are called, like “mode”.

Can anyone explain the reasoning behind this, is this due to the
Java nature of Spark?

From a pythonic point of view, I could also imagine a single
read/write method, with the format as an arg and kwargs related to
the different file format options.

Best,

Michael



Re: Excessive disk IO with Spark structured streaming

2020-10-05 Thread Jungtaek Lim
First of all, you'd want to divide these numbers by the number of
micro-batches, as file creations in checkpoint directory would occur
similarly per micro-batch.
Second, you'd want to dive inside the checkpoint directory and have
separate numbers per top-subdirectory.

After that we can see whether the value would make sense or not.

Regarding file I/O issues on SS, two issues I know about are:
1) If you use streaming aggregation, it unnecessarily creates a temporary
file for both read and write on the state store, while the file is only
needed for writing. That makes the number of file creations 2x. The
patch is proposed under SPARK-30294 [1].

2) Spark leverages the HDFS API, which is configured to create a crc file per
file by default. (So you'll have 2x the files expected.) There's a bug in the
HDFS API (HADOOP-16255 [2]) which misses handling crc files during rename (in
short, for how checkpointing works in SS, a temp file is atomically renamed to
become the final file), and as a workaround (SPARK-28025 [3]) Spark tries to
delete the crc file, so two additional operations (exists -> delete) may
occur per crc file.

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)

1. https://issues.apache.org/jira/browse/SPARK-30294
2. https://issues.apache.org/jira/browse/HADOOP-16255
3. https://issues.apache.org/jira/browse/SPARK-28025


On Sun, Oct 4, 2020 at 10:08 PM Sergey Oboguev  wrote:

> I am trying to run a Spark structured streaming program simulating basic
> scenario of ingesting events and calculating aggregates on a window with
> watermark, and I am observing an inordinate amount of disk IO Spark
> performs.
>
> The basic structure of the program is like this:
>
> sparkSession = SparkSession.builder()
>.appName()
>.master("local[*]")
>.config("spark.executor.memory", "8g")
>.config("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer")
>.config("spark.kryoserializer.buffer", "8m")
>.config("spark.local.dir", ...local
> directory...)
>.getOrCreate();
>
> sparkSession.sparkContext().setCheckpointDir(... checkpoint dir for the
> app ...);
>
> dataset = sparkSession.readStream()
>   .option("checkpointLocation", ... checkpoint dir for
> source ...)
>   .format(MockStreamingSource.class.getName())
>   .load();
>
> Dataset ds = dataset
>   .withWatermark("timestamp", "10 minutes")
>   .groupBy(
>   functions.window(functions.col("timestamp"),
> "2 minutes"),
>   functions.col("source"))
>   .agg(
>   functions.avg("D0").as("AVG_D0"),
>   functions.avg("I0").as("AVG_I0"));
>
> DataStreamWriter dsw = ds.writeStream()
>   // .trigger(Trigger.ProcessingTime("1
> minute"))
>   .option("checkpointLocation", .. checkpoint
> dir for writer ... );
>
> dsw.outputMode(OutputMode.Append())
>.format("console")
>.option("truncate", "false")
>.option("numRows", Integer.MAX_VALUE)
>.start()
>.awaitTermination();
>
>
> MockStreamingSource is just that -- a source intended to provide a
> simulated input. It generates microbatches of mock events and sends them to
> the app. In the testing scenario, the source simulates 20,000 devices each
> sending an event every 15 seconds for 11.5 minutes of logical time (just
> under 12 minutes of window size + watermark), for a total number of 920,000
> events.
>
> I initially started with microbatch sized to 500 events, and processing
> performance was totally dismal because of disk IO. I then increased
> microbatch size and performance got better, but still very poor. Microbatch
> size now is 13,334 events per batch, this corresponds to ingestion interval
> of 10 seconds. Smaller batches resulted in worse performance.
>
> But even with microbatch sized 13,334 event performance is poor because of
> excessive disk IO generated by Spark.
> Just ingesting data generated intra-app takes the program physical time
> equal to 40% of window size + watermark.
>
> Using strace, I measured that checkpoint directory for the stream writer
> receives the following number of Linux system calls:
>
> create/open file = 60,500 calls
> mkdir = 57,000
> readlink = 59,000
> unlink = 41,900
> rename = 14,700
> execve readlink=353,000 (incl. repetitive searches of readlink executable
> in 6 different locations)
> execve chmod=340,620 (incl. repetitive searches of chmod executable in 6
> different locations)
>
> In addition, Spark local directory received:
>
> create/open file = 55,000 calls
> unlink = 13,800
> stat = 42,000
>
> That's for mere 920,000 of small events (each event Row is 600 bytes when
> in Java 

Re: Arbitrary stateful aggregation: updating state without setting timeout

2020-10-05 Thread Jungtaek Lim
Hi,

That's not explained in the SS guide doc, but it is explained in the Scala API doc:
http://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/streaming/GroupState.html

The following statement, quoted from the Scala API doc, answers your question:

The timeout is reset every time the function is called on a group, that is,
> when the group has new data, or the group has timed out. So the user has to
> set the timeout duration every time the function is called, otherwise there
> will not be any timeout set.


Simply put, you'd want to always set the timeout unless you remove the state
for the group (key).
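
A minimal Scala sketch of that pattern (the types, counting logic and the
10-minute timeout are invented for illustration): the timeout is re-armed on
every invocation unless the state is removed.

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class Event(key: String, value: Long)   // hypothetical input type
case class Counter(count: Long)              // hypothetical state type

// Sketch: re-set the timeout on every call for a key, whether it was invoked
// with new data or because the previous timeout fired.
def updateState(key: String, rows: Iterator[Event],
                state: GroupState[Counter]): Iterator[(String, Long)] = {
  if (state.hasTimedOut) {
    val finalCount = state.get.count
    state.remove()                           // state is gone, no timeout needed
    Iterator((key, finalCount))
  } else {
    val updated = Counter(state.getOption.map(_.count).getOrElse(0L) + rows.size)
    state.update(updated)
    state.setTimeoutDuration("10 minutes")   // must be set again on every call
    Iterator.empty
  }
}

// Wiring (events: Dataset[Event], processing-time timeout enabled):
//   events.groupByKey(_.key)
//     .flatMapGroupsWithState(OutputMode.Append, GroupStateTimeout.ProcessingTimeTimeout)(updateState _)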

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Mon, Oct 5, 2020 at 6:16 PM Yuri Oleynikov (יורי אולייניקוב) <
yur...@gmail.com> wrote:

> Hi all, I have following question:
>
> What happens to the state (in terms of expiration) if I’m updating the
> state without setting timeout?
>
>
> E.g. in FlatMapGroupsWithStateFunction
>
>1. first batch:
>
> state.update(myObj)
>
> state.setTimeoutDuration(timeout)
>
>2. second batch:
>
> state.update(myObj)
>
>3. third batch (no data for a long time):
>   does the state time out after the initial timeout expired, or does it
>   not time out?
>
>


Re: [Spark Core] Why no spark.read.delta / df.write.delta?

2020-10-05 Thread Jungtaek Lim
Hi,

"spark.read." is a "shorthand" for "built-in" data sources, not for
external data sources. spark.read.format() is still the official way to use
them. Delta Lake is not included in Apache Spark, so it is indeed not
possible for Spark to refer to it.

Starting from Spark 3.0, the concept of a "catalog" is introduced, with which
you can simply refer to a table from the catalog (if the external data source
provides a catalog implementation), with no need to specify the format
explicitly (as the catalog knows about it).
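
To make that concrete, a hedged sketch (the catalog name, implementation
class and table below are placeholders; the exact option keys depend on the
connector):

// Sketch: register an external DataSource V2 catalog, then refer to a table
// through it without naming a format.
spark.conf.set("spark.sql.catalog.mycat", "com.example.connector.MyCatalog")  // hypothetical class
// connector-specific settings would go under spark.sql.catalog.mycat.*

val df = spark.sql("SELECT * FROM mycat.mydb.mytable")   // catalog resolves the data source
// or: spark.table("mycat.mydb.mytable")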

This session explains the catalog and how Cassandra connector leverages it.
I see some external data sources starting to support catalog, and in Spark
itself there's some effort to support catalog for JDBC.
https://databricks.com/fr/session_na20/datasource-v2-and-cassandra-a-whole-new-world

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)


On Mon, Oct 5, 2020 at 8:53 PM Moser, Michael <
michael.mo...@siemens-healthineers.com> wrote:

> Hi there,
>
>
>
> I’m just wondering if there is any incentive to implement read/write
> methods in the DataFrameReader/DataFrameWriter for delta similar to e.g.
> parquet?
>
>
>
> For example, using PySpark, “spark.read.parquet” is available, but
> “spark.read.delta” is not (same for write).
>
> In my opinion, “spark.read.delta” feels more clean and pythonic compared
> to “spark.read.format(‘delta’).load()”, especially if more options are
> called, like “mode”.
>
>
>
> Can anyone explain the reasoning behind this, is this due to the Java
> nature of Spark?
>
> From a pythonic point of view, I could also imagine a single read/write
> method, with the format as an arg and kwargs related to the different file
> format options.
>
>
>
> Best,
>
> Michael
>
>
>
>
>


[Spark Core] Why no spark.read.delta / df.write.delta?

2020-10-05 Thread Moser, Michael
Hi there,

I'm just wondering if there is any incentive to implement read/write methods in 
the DataFrameReader/DataFrameWriter for delta similar to e.g. parquet?

For example, using PySpark, "spark.read.parquet" is available, but 
"spark.read.delta" is not (same for write).
In my opinion, "spark.read.delta" feels more clean and pythonic compared to 
"spark.read.format('delta').load()", especially if more options are called, 
like "mode".

Can anyone explain the reasoning behind this, is this due to the Java nature of 
Spark?
From a pythonic point of view, I could also imagine a single read/write
method, with the format as an arg and kwargs related to the different file
format options.

Best,
Michael




Reading BigQuery data from Spark in Google Dataproc

2020-10-05 Thread Mich Talebzadeh
Hi,

I have tested a few JDBC BigQuery providers like Progress Direct and Simba,
but none of them seem to work properly through Spark.

The only way I can read and write to BigQuery is through Spark BigQuery API
using the following scenario

spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar


Using the following connection to read:


val BQDF = spark.read.
  format("bigquery").
  option("credentialsFile", jsonKeyFile).
  option("project", projectId).
  option("parentProject", projectId).
  option("dataset", targetDataset).
  option("table", targetTable).
  option("partitionColumn", partitionColumn).
  option("lowerBound", lowerBound).
  option("upperBound", upperBound).
  option("numPartitions", numPartitions).
  load()

and for write

rsBatch.
  write.
  format("bigquery").
  mode(org.apache.spark.sql.SaveMode.Append).
  option("table", fullyQualifiedOutputTableId).
  save()

Appreciate any comments if someone has managed to make this work through
any third party JDBC drivers.

Regards,

Mich



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Arbitrary stateful aggregation: updating state without setting timeout

2020-10-05 Thread יורי אולייניקוב
Hi all, I have following question:

What happens to the state (in terms of expiration) if I’m updating the
state without setting timeout?


E.g. in FlatMapGroupsWithStateFunction

   1. first batch:

state.update(myObj)

state.setTimeoutDuration(timeout)

   2. second batch:

state.update(myObj)

   3. third batch (no data for a long time): does the state time out after
      the initial timeout expired, or does it not time out?