Hi All,

I'm using structured streaming in Spark 2.2.

I'm using PySpark and I have data (from a Kafka publisher) where the
timestamp is a float that looks like this:  1379288667.631940

So here's my code (which is working fine):

# SUBSCRIBE: Setup connection to Kafka Stream
raw_data = spark.readStream.format('kafka') \
  .option('kafka.bootstrap.servers', 'localhost:9092') \
  .option('subscribe', 'dns') \
  .option('startingOffsets', 'latest') \
  .load()

# ETL: Hardcoded Schema for DNS records (do this better later)
from pyspark.sql.types import StructType, StringType, BooleanType, IntegerType, FloatType
from pyspark.sql.functions import from_json, to_json, col, struct

dns_schema = StructType() \
    .add('ts', FloatType()) \
    .add('uid', StringType()) \
    .add('id.orig_h', StringType()) \
  ....

# ETL: Convert raw data into parsed and proper typed data
from pyspark.sql.functions import col, length, to_timestamp

parsed_data = raw_data \
  .select(from_json(col('value').cast('string'), dns_schema).alias('data')) \
  .select('data.*')

# Convert Bro IDS time to an actual TimeStamp type
from pyspark.sql.functions import udf
from pyspark.sql.types import TimestampType
import datetime

my_udf = udf(lambda x: datetime.datetime.fromtimestamp(float(x)), TimestampType())
parsed_data_with_dt = parsed_data.withColumn('dt', my_udf('ts'))

# Then a writestream later...

Okay, so all this code works fine (the 'dt' field has exactly what I
want)... but I'll be streaming in a lot of data, so here are my questions:

- Will creating a new dataframe with withColumn basically kill performance?
- Should I move my UDF into the parsed_data.select(...) part? (See the
sketch below for what I mean.)
- Can my UDF be done by spark.sql functions directly? (I tried to_timestamp
but without luck.)
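
To make the second and third questions concrete, here's roughly what I have
in mind (untested sketches, reusing the parsed_data, my_udf, and col names
from the snippets above):

# Question 2: fold the UDF into a select instead of a separate withColumn
parsed_data_with_dt = parsed_data.select('*', my_udf('ts').alias('dt'))

# Question 3: skip the UDF and cast the epoch-seconds float to a timestamp
parsed_data_with_dt = parsed_data.withColumn('dt', col('ts').cast('timestamp'))

I haven't verified either of these against the live stream, and I'm not sure
whether the cast keeps the sub-second precision of the Bro timestamps, so I
may be off base here.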

Any suggestions/pointers are greatly appreciated.

-Brian Wylie
