How does order work in Row objects when .toDF() is called?

2020-11-05 Thread Daniel Stojanov

>>> import pyspark.sql as psq
>>> row_1 = psq.Row(first=1, second=2)
>>> row_2 = psq.Row(second=22, first=11)
>>> spark.sparkContext.parallelize([row_1, row_2]).toDF().collect()
[Row(first=1, second=2), Row(first=22, second=11)]


(Spark 3.0.1)

What is happening in the above? When .toDF() is called, it appears that 
order matters more than the label. Are the labels just shorthand for _1, 
_2, _3..., rather than mapping to a particular value as they would in 
a dict?
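
For comparison, here is a minimal sketch (assuming an active `spark` session) that sidesteps the Row labels entirely by passing plain tuples with an explicit schema, so each value is bound to a column purely by position:

```python
from pyspark.sql.types import StructType, StructField, LongType

# Explicit schema: column binding is purely positional, so there is no
# question of whether labels or order win.
schema = StructType([
    StructField("first", LongType()),
    StructField("second", LongType()),
])

spark.createDataFrame([(1, 2), (11, 22)], schema=schema).show()
```

With the labelled Rows above, the output looks as if the schema is taken from the first Row's field names and the later Rows are then read positionally, which would explain Row(first=22, second=11).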








Re: Confuse on Spark to_date function

2020-11-05 Thread Daniel Stojanov


On 5/11/20 2:48 pm, 杨仲鲍 wrote:


Code

```scala
import org.apache.spark.sql.SparkSession

object Suit {
  case class Data(node: String, root: String)
  def apply[A](xs: A*): List[A] = xs.toList

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("MoneyBackTest").getOrCreate()
    import spark.implicits._
    spark.sql("select to_date('2020-01-01 20:00:00', 'yyyy-MM-dd HH:mm:ss')").show(false)
  }
}
```

result

```output

+-----------------------------------------------------+
|to_date('2020-01-01 20:00:00', 'yyyy-MM-dd HH:mm:ss')|
+-----------------------------------------------------+
|2020-01-01                                           |
+-----------------------------------------------------+
```

Why does it not show 2020-01-01 20:00:00?

sparkVersion:2.4.4
Device:MacBook


You want to_timestamp instead of to_date: to_date parses the string but returns only the date part, dropping the time of day, whereas to_timestamp keeps it.

The following is in Python, but I think you should be able to follow.

>>> import pyspark.sql as psq
>>> import pyspark.sql.functions as F

>>> row = psq.Row(as_string="2020-01-01 12:01:02")

>>> df = spark.sparkContext.parallelize([row]).toDF()

>>> df.withColumn("date_converted", F.to_date(F.column("as_string"), "yyyy-MM-dd HH:mm:ss")).show()


+-------------------+--------------+
|          as_string|date_converted|
+-------------------+--------------+
|2020-01-01 12:01:02|    2020-01-01|
+-------------------+--------------+

>>> df.withColumn("date_converted", F.to_timestamp(F.column("as_string"), "yyyy-MM-dd HH:mm:ss")).show()


+-------------------+-------------------+
|          as_string|     date_converted|
+-------------------+-------------------+
|2020-01-01 12:01:02|2020-01-01 12:01:02|
+-------------------+-------------------+
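
Applied to the SQL from the original question (a minimal sketch; it assumes the same active `spark` session and the corrected 'yyyy-MM-dd HH:mm:ss' pattern):

```python
# to_timestamp keeps the time-of-day component, so this should show
# 2020-01-01 20:00:00 rather than just the date.
spark.sql(
    "select to_timestamp('2020-01-01 20:00:00', 'yyyy-MM-dd HH:mm:ss') as ts"
).show(truncate=False)
```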



Need suggestions for Spark on K8S: RPC Encryption

2020-11-05 Thread Xuan Gong
Hello, spark experts:

I am trying to figure out how to encrypt traffic when using Spark on K8s.
From the Spark security doc, I learned how to do RPC encryption between the
Spark driver and the Spark executors. But I do not understand how to do it
between the Spark driver and the K8s API server (and maybe between the Spark
executors and the API server, although I do not think the executors talk to
the API server directly; please correct me if I am wrong).

Could you give me some suggestions on how to do encryption in transit
between the Spark driver and the K8s API server, please?
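
For context, this is roughly what I already have for driver-to-executor RPC encryption (a minimal sketch; `spark.authenticate` and `spark.network.crypto.enabled` come from the security doc, and the app name is just a placeholder):

```python
from pyspark.sql import SparkSession

# Sketch of the driver <-> executor RPC encryption I already have in place;
# my open question is the driver <-> K8s API server traffic.
spark = (
    SparkSession.builder
    .appName("rpc-encryption-sketch")                  # placeholder app name
    .config("spark.authenticate", "true")              # shared-secret authentication
    .config("spark.network.crypto.enabled", "true")    # AES-based RPC encryption
    .getOrCreate()
)
```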

Best,

Xuan Gong


Re: Excessive disk IO with Spark structured streaming

2020-11-05 Thread Jungtaek Lim
FYI, SPARK-30294 has been merged and will be available in Spark 3.1.0. It
cuts the number of temp files for the state store in half when you use
streaming aggregation.

1. https://issues.apache.org/jira/browse/SPARK-30294
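
For anyone landing on this thread, here is a minimal sketch (the rate source and checkpoint path are just placeholders) of a streaming aggregation whose checkpoint location accumulates the commits/, offsets/, and state/ files discussed below:

```python
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("state-store-sketch").getOrCreate()

# Placeholder source: a synthetic rate stream with a `timestamp` column.
events = spark.readStream.format("rate").option("rowsPerSecond", 100).load()

# Streaming aggregation: every micro-batch updates per-partition state,
# which is what produces the files under state/ in the checkpoint location.
counts = (
    events
    .withWatermark("timestamp", "10 minutes")
    .groupBy(F.window("timestamp", "2 minutes"))
    .count()
)

query = (
    counts.writeStream
    .outputMode("update")
    .format("console")
    # commits/, metadata/, offsets/, state/ are created under this directory.
    .option("checkpointLocation", "/tmp/checkpoint-sketch")  # placeholder path
    .start()
)
```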

On Thu, Oct 8, 2020 at 11:55 AM Jungtaek Lim 
wrote:

> I can't spend too much time explaining these one by one. I strongly encourage
> you to do a deep dive instead of just looking around, since you want to know
> the "details" - that's how open source works.
>
> I'll go through a general explanation instead of replying inline; probably
> I'd write a blog doc if there's no existing doc (I guess there should be
> one) instead of putting too much time here.
>
> In short, the reason Spark has to create these files "per micro-batch" is
> to ensure fault-tolerance. For example, if the query fails at batch 5 and
> you rerun the query, it should rerun batch 5. How?
>
> Spark should be aware of the offsets the query read for batch 4, and
> preferably the offsets the query read for batch 5. Those are offsets/commits.
> State is for storing the accumulated values of stateful operations. Same here
> - Spark should be able to read the state for batch 4 so that it can
> calculate the new accumulated values for batch 5. In addition, the number of
> partitions is the max parallelism (partitions aren't aware of each other and
> they shouldn't be), hence the state for each partition must be stored
> individually.
>
> Storing 4 files per partition per micro-batch (in the end we'll only have
> "2" files, but here I count temp files and crc files, since we are talking
> about the performance aspect) is the thing I already explained - I agree it's
> not ideal; that's why I submitted the PR for SPARK-30294 [1], which reduces
> the number of files by half. We could probably propose that Hadoop skip
> creating CRC files (I'm not sure that can simply be done as of now), but
> Spark is conservative about upgrading dependency versions, so it might not
> be available soon even if we addressed it right away.
>
> As you've found here, it's super important to find the right number of
> shuffle partitions. The state is partitioned by a hash function, so it
> strongly depends on the group key. If the cardinality of the group key is
> low, the right number of shuffle partitions is probably fairly small.
> Unfortunately, once the query runs you can't change the number of shuffle
> partitions, as Spark has no feature for migrating state when the number of
> partitions changes. Either you need to predict the overall cardinality at a
> specific time and set the right value, or try a 3rd-party state tool. [2]
> (DISCLAIMER: I'm the author.)
>
> 1. https://issues.apache.org/jira/browse/SPARK-30294
> 2. https://github.com/HeartSaVioR/spark-state-tools
>
>
> On Wed, Oct 7, 2020 at 11:16 PM Sergey Oboguev  wrote:
>
>> Hi Jungtaek,
>>
>> *> I meant the subdirectory inside the directory you're providing as
>> "checkpointLocation", as there're several directories in that directory...*
>>
>> There are two:
>>
>> *my-spark-checkpoint-dir/MainApp*
>> created by sparkSession.sparkContext().setCheckpointDir(<checkpoint dir for the app>)
>> contains only empty subdir with GUID name
>>
>> *my-spark-checkpoint-dir/writer*
>> created by ds.writeStream().option("checkpointLocation", <checkpoint dir for writer>)
>> contains all the files
>>
>> Within the latter ("writer") there are four subdirectories: commits,
>> metadata, offsets, state.
>>
>> Breakdown of file creations within them, per 69 microbatches (when
>> shuffle partition count = 200) is:
>>
>> commits = 136
>> metadata = 0
>> offsets = 138
>> state = 56232
>>
>> (Creation is identified by strace record for "openat" system call with
>> O_CREAT flag and file path in the corresponding directory.)
>>
>> When shuffle partition count is 10, breakdown of file creations within
>> them, per 69 microbatches, is:
>>
>> commits = 136
>> metadata = 0
>> offsets = 138
>> state = 2760
>>
>> *> The size of the delta file heavily depends on your stateful operation
>> and data in each micro-batch. delta file only captures the "changes" of
>> state in specific micro-batch, so there're cases you'll have very tiny
>> delta files, e.g. cardinality of grouped key is small (hence cardinality of
>> KVs is also small), small amount of inputs are provided per micro-batch,
>> the overall size of aggregated row is small, there's skew on grouped key
>> (hence some partitions get no input or small inputs), etc.*
>>
>>
>> In my case there is no key in the Row object (unless the bucketized
>> "timestamp" for 2-min windows buckets becomes a key), and the microbatch is
>> large enough: the whole problem is that Spark does not want to save the
>> microbatch as a single file. Even after I reduce the number of shuffle
>> partitions (see below), the number of files per microbatch remains
>> significantly larger than the number of shuffle partitions.
>>
>> ..
>>
>> When the number of shuffle partitions is 200, Spark creates 816 files
>> (per microbatch) in checkpoint store and 202 files i