Hi All, I'm using structured streaming in Spark 2.2.
I'm using PySpark, and I have data (from a Kafka publisher) where the timestamp is a float that looks like this: 1379288667.631940

So here's my code (which is working fine):

# SUBSCRIBE: Set up the connection to the Kafka stream
raw_data = spark.readStream.format('kafka') \
    .option('kafka.bootstrap.servers', 'localhost:9092') \
    .option('subscribe', 'dns') \
    .option('startingOffsets', 'latest') \
    .load()

# ETL: Hardcoded schema for DNS records (do this better later)
from pyspark.sql.types import StructType, StringType, BooleanType, IntegerType, FloatType, TimestampType
from pyspark.sql.functions import from_json, to_json, col, struct

dns_schema = StructType() \
    .add('ts', FloatType()) \
    .add('uid', StringType()) \
    .add('id.orig_h', StringType()) \
    ....

# ETL: Convert the raw data into parsed, properly typed data
from pyspark.sql.functions import col, length, to_timestamp

parsed_data = raw_data \
    .select(from_json(col("value").cast("string"), dns_schema).alias('data')) \
    .select('data.*')

# Convert the Bro IDS time (epoch float) to an actual TimestampType column
from pyspark.sql.functions import udf
import datetime

my_udf = udf(lambda x: datetime.datetime.fromtimestamp(float(x)), TimestampType())
parsed_data_with_dt = parsed_data.withColumn('dt', my_udf('ts'))

# Then a writeStream later...

All of this works fine (the 'dt' column has exactly what I want), but I'll be streaming in a lot of data, so here are my questions:

- Will creating a new DataFrame with withColumn basically kill performance?
- Should I move my UDF into the parsed_data.select(...) part?
- Can my UDF be done by Spark SQL directly? (I tried to_timestamp but without luck.)

Any suggestions/pointers are greatly appreciated.

-Brian Wylie
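
PS: To make the second question concrete, here's roughly what I mean by moving the UDF into the select. Just a sketch (not benchmarked), and it assumes the same dns_schema and my_udf defined above:

# Parse the JSON and add the converted timestamp in a single select
parsed_data_with_dt = raw_data \
    .select(from_json(col("value").cast("string"), dns_schema).alias('data')) \
    .select('data.*', my_udf('data.ts').alias('dt'))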
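
PPS: For the third question, the only built-in alternative I could think of (besides to_timestamp) is casting the epoch-seconds value straight to a timestamp, something like the line below. I haven't verified whether it keeps the fractional seconds, so treat it as a sketch rather than something I know works:

# A numeric-to-timestamp cast in Spark SQL is interpreted as seconds since the epoch
parsed_data_with_dt = parsed_data.withColumn('dt', col('ts').cast('timestamp'))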