Hi All,
I'm using Structured Streaming in Spark 2.2.
I'm using PySpark, and I have data (from a Kafka publisher) where the
timestamp is a float that looks like this: 1379288667.631940
So here's my code (which is working fine):
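A quick plain-Python sanity check of what that value means. Spark's FloatType is a 32-bit float, while DoubleType (like a plain Python float) is 64-bit. The snippet converts the value as a 64-bit float and again after squeezing it through 32 bits via `struct`, which should show how much precision a 32-bit column would cost for an epoch value this large. The timestamp is treated as UTC epoch seconds here; that is an assumption about the Bro data.

```python
import struct
from datetime import datetime, timezone

ts = 1379288667.631940  # epoch seconds, as seen in the Kafka feed

# A Python float is a 64-bit double (same width as Spark's DoubleType).
dt64 = datetime.fromtimestamp(ts, tz=timezone.utc)

# Round-trip the value through a 32-bit float (same width as Spark's FloatType).
ts32 = struct.unpack('f', struct.pack('f', ts))[0]
dt32 = datetime.fromtimestamp(ts32, tz=timezone.utc)

print(dt64.isoformat())  # 2013-09-15T23:44:27.631940+00:00
print(ts32)              # 1379288704.0
print((dt32 - dt64).total_seconds())  # roughly 36 seconds of error
```

At this magnitude a 32-bit float can only represent multiples of 128 seconds, so the fractional part is lost entirely.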
# SUBSCRIBE: Setup connection to Kafka Stream
raw_data = spark.readStream.format('kafka') \
.option('kafka.bootstrap.servers', 'localhost:9092') \
.option('subscribe', 'dns') \
.option('startingOffsets', 'latest') \
.load()
# ETL: Hardcoded Schema for DNS records (do this better later)
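from pyspark.sql.types import (StructType, StringType, BooleanType,
                               IntegerType, FloatType, DoubleType)
from pyspark.sql.functions import from_json, to_json, col, struct
# 'ts' is DoubleType (64-bit): a 32-bit FloatType would round an epoch value
# like 1379288667.631940 by up to about a minute and drop the fraction
dns_schema = StructType() \
.add('ts', DoubleType()) \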
.add('uid', StringType()) \
.add('id.orig_h', StringType()) \
....
# ETL: Convert raw data into parsed and proper typed data
from pyspark.sql.functions import col, length, to_timestamp
parsed_data = raw_data \
.select(from_json(col("value").cast("string"), dns_schema).alias('data')) \
.select('data.*')
# Convert Bro IDS time to an actual TimeStamp type
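from pyspark.sql.functions import udf
from pyspark.sql.types import TimestampType
import datetime
# Python UDF: epoch seconds -> datetime (in the worker's local time zone);
# null 'ts' values pass through as null instead of failing in float()
my_udf = udf(lambda x: datetime.datetime.fromtimestamp(float(x)) if x is not None else None,
             TimestampType())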
parsed_data_with_dt = parsed_data.withColumn('dt', my_udf('ts'))
# Then a writeStream later...
Okay, so all this code works fine (the 'dt' field has exactly what I
want), but I'll be streaming in a lot of data, so here are my questions:
- Will creating a new DataFrame with withColumn basically kill
performance?
- Should I move my UDF into the parsed_data.select(...) part?
- Can the conversion be done with built-in Spark SQL functions instead of a
UDF? (I tried to_timestamp, but without luck.)
Any suggestions/pointers are greatly appreciated.
-Brian Wylie