How does order work in Row objects when .toDF() is called?

2020-11-05 Thread Daniel Stojanov

>>> row_1 = psq.Row(first=1, second=2)
>>> row_2 = psq.Row(second=22, first=11)
>>> spark.sparkContext.parallelize([row_1, row_2]).toDF().collect()
[Row(first=1, second=2), Row(first=22, second=11)]

(Spark 3.0.1)

What is happening above? When .toDF() is called, it appears that field order
matters more than field name. Are the names just shorthand for _1, _2, _3, ...
that don't actually map to a particular value, as keys would in a dict?


Re: Confuse on Spark to_date function

2020-11-05 Thread Daniel Stojanov

On 5/11/20 2:48 pm, 杨仲鲍 wrote:


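import org.apache.spark.sql.SparkSession

object Suit {
  case class Data(node: String, root: String)

  def apply[A](xs: A*): List[A] = xs.toList

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .appName("MoneyBackTest")
      .getOrCreate()

    spark.sql("select to_date('2020-01-01 20:00:00','yyyy-MM-dd HH:mm:ss')").show(false)
  }
}

+-----------------------------------------------------+
|to_date('2020-01-01 20:00:00', 'yyyy-MM-dd HH:mm:ss')|
+-----------------------------------------------------+
|2020-01-01                                           |
+-----------------------------------------------------+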

Why doesn't it show 2020-01-01 20:00:00?


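You want to_timestamp instead of to_date. to_date returns a DateType value,
which has no time-of-day component, whereas to_timestamp returns a
TimestampType value that keeps the time.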

The following is in Python, but I think you should be able to follow.

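>>> row = psq.Row(as_string="2020-01-01 12:01:02")
>>> df = spark.sparkContext.parallelize([row]).toDF()
>>> import pyspark.sql.functions as F
>>> df.withColumn("date_converted",
...               F.to_date(F.column("as_string"), "yyyy-MM-dd HH:mm:ss")).show()
+-------------------+--------------+
|          as_string|date_converted|
+-------------------+--------------+
|2020-01-01 12:01:02|    2020-01-01|
+-------------------+--------------+

>>> df.withColumn("date_converted",
...               F.to_timestamp(F.column("as_string"), "yyyy-MM-dd HH:mm:ss")).show()
+-------------------+-------------------+
|          as_string|     date_converted|
+-------------------+-------------------+
|2020-01-01 12:01:02|2020-01-01 12:01:02|
+-------------------+-------------------+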


Need suggestions for Spark on K8S: RPC Encryption

2020-11-05 Thread Xuan Gong
Hello, spark experts:

I am trying to figure out how to encrypt traffic when using Spark on K8S.
From the Spark security doc, I learned how to set up RPC encryption between
the Spark driver and the Spark executors, but I do not understand how to do it
between the Spark driver and the K8S API server (and possibly between the
Spark executors and the API server, although I do not think the executors
connect to the API server at all; please correct me if I am wrong).

Could you give me some suggestions on how to do the encryption in transit
between spark driver and K8S API Server, please?


Xuan Gong
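For reference, here is a sketch of the settings involved, expressed as `spark-submit` arguments built in Python. `spark.authenticate` and `spark.network.crypto.enabled` are the RPC authentication and encryption properties from the Spark security documentation. The two `caCertFile` properties come from the Spark-on-Kubernetes documentation. They tell spark-submit and the driver pod which CA to trust when they talk to the API server over TLS. The API server address, the certificate paths and the helper `to_conf_args` are placeholders, not values from this thread.

```python
# RPC settings between driver and executors (from the Spark security docs).
rpc_encryption = {
    # Shared-secret authentication; per the security docs, on Kubernetes
    # Spark generates and distributes the secret itself.
    "spark.authenticate": "true",
    # AES-based encryption of Spark's internal RPC traffic.
    "spark.network.crypto.enabled": "true",
}

# TLS trust settings for talking to the K8S API server (placeholder paths).
api_server_tls = {
    # CA used by spark-submit when it creates the driver pod.
    "spark.kubernetes.authenticate.submission.caCertFile": "/path/to/ca.crt",
    # CA used by the driver pod when it requests executors (cluster mode).
    "spark.kubernetes.authenticate.driver.caCertFile": "/path/to/ca.crt",
}


def to_conf_args(props):
    """Turn a {property: value} mapping into spark-submit --conf arguments."""
    args = []
    for key, value in props.items():
        args += ["--conf", f"{key}={value}"]
    return args


# Hypothetical API server address; the https:// scheme selects TLS.
master = "k8s://https://kubernetes.example.com:6443"
cmd = ["spark-submit", "--master", master, "--deploy-mode", "cluster"]
cmd += to_conf_args({**rpc_encryption, **api_server_tls})
print(" ".join(cmd))
```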

Re: Excessive disk IO with Spark structured streaming

2020-11-05 Thread Jungtaek Lim
FYI, SPARK-30294 has been merged and will be available in Spark 3.1.0. It
halves the number of temp files the state store creates when you use
streaming aggregation.
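Here is a back-of-the-envelope check against the counts Sergey reports further down the thread, which cover 69 micro-batches of one streaming aggregation. It assumes the 4 files per partition per micro-batch that Jungtaek describes (counting temp and CRC files), and half that once SPARK-30294 lands. `state_files` is a hypothetical helper, not a Spark API.

```python
def state_files(partitions, micro_batches, files_per_partition=4):
    """Estimate state-store files created by a single stateful operator.

    files_per_partition=4 counts temp and CRC files as described in the
    thread; with SPARK-30294 it is expected to drop to 2.
    """
    return partitions * micro_batches * files_per_partition


batches = 69
for partitions in (200, 10):
    before = state_files(partitions, batches)
    after = state_files(partitions, batches, files_per_partition=2)
    print(f"{partitions:>3} partitions: {before} files before, {after} after")
```

With 10 partitions the estimate (10 × 69 × 4 = 2760) matches the measured `state = 2760` exactly. With 200 partitions it gives 55200 against the measured 56232, so this simple formula does not account for every file in that run.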


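Jungtaek's advice quoted below says a low-cardinality group key calls for fewer shuffle partitions. To see why, the sketch below hashes a hypothetical set of 30 two-minute window keys into 200 and into 10 partitions. The keys cover one hour of data, in the spirit of the bucketized timestamp Sergey mentions. For each partition count, it reports how many partitions actually receive a key. It uses CRC32 as a stand-in for Spark's Murmur3 hash, so the exact assignment differs from Spark's. The bound holds either way: at most 30 partitions can hold state, while the counts in this thread suggest files are still written for every partition in each micro-batch. `occupied_partitions` is hypothetical. The partition count of a stateful query cannot change after its first run, so `spark.sql.shuffle.partitions` has to be chosen before the query starts.

```python
import zlib


def occupied_partitions(keys, num_partitions):
    """Count how many partitions receive at least one key.

    CRC32 stands in for Spark's Murmur3 hash; only the counting idea carries
    over, not the exact key-to-partition assignment.
    """
    buckets = {zlib.crc32(k.encode("utf-8")) % num_partitions for k in keys}
    return len(buckets)


# Hypothetical low-cardinality key: 2-minute window starts within one hour.
keys = [f"2020-10-07T10:{m:02d}" for m in range(0, 60, 2)]  # 30 distinct keys

for n in (200, 10):
    used = occupied_partitions(keys, n)
    print(f"{n} partitions: {used} hold state, {n - used} receive no keys")
```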
On Thu, Oct 8, 2020 at 11:55 AM Jungtaek Lim wrote:

> I can't spend too much time explaining each point one by one. I strongly
> encourage you to do a deep dive instead of just looking around, as you want
> to know about the "details"; that's how open source works.
> I'll give a general explanation instead of replying inline. I'd probably
> write a blog post if there's no existing doc (I guess there should be one)
> rather than spending too much time here.
> In short, the reason Spark has to create these files "per micro-batch" is
> to ensure fault tolerance. For example, if the query fails at batch 5 and
> you rerun the query, it should rerun batch 5. How?
> Spark should be aware of the offsets the query has read for batch 4, and
> preferably the offsets the query read for batch 5. Those are the
> offsets/commits.
> State is for storing accumulated values of stateful operations. Same here:
> Spark should be able to read the state for batch 4 so that it can
> calculate the new accumulated values for batch 5. In addition, a partition
> is the unit of maximum parallelism (partitions aren't aware of each other,
> and they shouldn't be), hence the state for each partition has to be stored
> individually.
> Storing 4 files per partition per micro-batch (in the end we'll only have
> "2" files, but here I count temp files with CRC files, as we are talking
> about the performance aspect) is the thing I already explained. I agree
> it's not ideal, which is why I submitted the PR for SPARK-30294 [1], which
> reduces the number of files by half. We could probably propose that Hadoop
> skip creating CRC files (I'm not sure it can simply be done as of now), but
> Spark is conservative about upgrading dependency versions, so it might not
> be available soon even if we address it right away.
> As you've found here, it's super important to find the right value of
> shuffle partitions. Rows are hash-partitioned on the group key, so the
> right value strongly depends on that key. If the cardinality of the group
> key is low, the right value of shuffle partitions should probably be fairly
> small. Unfortunately, once the query runs you can't change the value of
> shuffle partitions, as Spark doesn't have a state migration feature for
> when the number of partitions changes. Either you need to predict the
> overall cardinality at a specific time and set the right value, or try a
> 3rd-party state tool [2]. (DISCLAIMER: I'm the author.)
> 1.
> 2.
> On Wed, Oct 7, 2020 at 11:16 PM Sergey Oboguev wrote:
>> Hi Jungtaek,
>> *> I meant the subdirectory inside the directory you're providing as
>> "checkpointLocation", as there're several directories in that directory...*
>> There are two:
>> *my-spark-checkpoint-dir/MainApp*
>> created by sparkSession.sparkContext().setCheckpointDir(> for the app>)
>> contains only an empty subdirectory with a GUID name
>> *my-spark-checkpoint-dir/writer*
>> created by ds.writeStream().option("checkpointLocation", > for writer>)
>> contains all the files
>> Within the latter ("writer") there are four subdirectories: commits,
>> metadata, offsets, state.
>> Breakdown of file creations within them, per 69 microbatches (when
>> shuffle partition count = 200) is:
>> commits = 136
>> metadata = 0
>> offsets = 138
>> state = 56232
>> (Creation is identified by strace record for "openat" system call with
>> O_CREAT flag and file path in the corresponding directory.)
>> When shuffle partition count is 10, breakdown of file creations within
>> them, per 69 microbatches, is:
>> commits = 136
>> metadata = 0
>> offsets = 138
>> state = 2760
>> *> The size of the delta file heavily depends on your stateful operation
>> and data in each micro-batch. delta file only captures the "changes" of
>> state in specific micro-batch, so there're cases you'll have very tiny
>> delta files, e.g. cardinality of grouped key is small (hence cardinality of
>> KVs is also small), small amount of inputs are provided per micro-batch,
>> the overall size of aggregated row is small, there's skew on grouped key
>> (hence some partitions get no input or small inputs), etc.*
>> In my case there is no key in the Row object (unless the bucketized
>> "timestamp" for the 2-min window buckets becomes a key), and the microbatch
>> is large enough: the whole problem is that Spark does not want to save the
>> microbatch as a single file. Even after I reduce the number of shuffle
>> partitions (see below), the number of files per microbatch remains
>> significantly larger than the number of shuffle partitions.
>> ..
>> When the number of shuffle partitions is 200, Spark creates 816 files
>> (per microbatch) in checkpoint store and 202 files i