Tathagata, thanks, your explanation was great.
The suggestion worked well; the only minor detail is that I needed to bring the TS field in as a DoubleType() or the time got truncated. (A rough sketch of the version I ended up with is below the quoted thread.)

Thanks again,
-Brian

On Wed, Aug 30, 2017 at 1:34 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

> 1. Generally, adding columns, etc. will not affect performance because Spark's
> optimizer will automatically figure out which columns are not needed and
> eliminate them in the optimization step. So that should never be a concern.
>
> 2. Again, this is generally not a concern, as the optimizer will take care
> of moving such expressions around.
>
> 3. However, using a Python UDF is baaaad for perf. In your case, if the
> problem is that the timestamp is a float, you can cast the float to the
> timestamp type, and it should automatically convert it correctly.
> Something like this: *col("ts").cast("timestamp")*
>
> On Wed, Aug 30, 2017 at 11:45 AM, Brian Wylie <briford.wy...@gmail.com>
> wrote:
>
>> Hi All,
>>
>> I'm using structured streaming in Spark 2.2.
>>
>> I'm using PySpark and I have data (from a Kafka publisher) where the
>> timestamp is a float that looks like this: 1379288667.631940
>>
>> So here's my code (which is working fine):
>>
>> # SUBSCRIBE: Setup connection to Kafka Stream
>> raw_data = spark.readStream.format('kafka') \
>>     .option('kafka.bootstrap.servers', 'localhost:9092') \
>>     .option('subscribe', 'dns') \
>>     .option('startingOffsets', 'latest') \
>>     .load()
>>
>> # ETL: Hardcoded Schema for DNS records (do this better later)
>> from pyspark.sql.types import StructType, StringType, BooleanType, \
>>     IntegerType, FloatType, TimestampType
>> from pyspark.sql.functions import from_json, to_json, col, struct
>>
>> dns_schema = StructType() \
>>     .add('ts', FloatType()) \
>>     .add('uid', StringType()) \
>>     .add('id.orig_h', StringType()) \
>>     ....
>>
>> # ETL: Convert raw data into parsed and properly typed data
>> from pyspark.sql.functions import col, length, to_timestamp
>>
>> parsed_data = raw_data \
>>     .select(from_json(col("value").cast("string"), dns_schema).alias('data')) \
>>     .select('data.*')
>>
>> # Convert Bro IDS time to an actual Timestamp type
>> from pyspark.sql.functions import udf
>> import datetime
>> my_udf = udf(lambda x: datetime.datetime.fromtimestamp(float(x)),
>>              TimestampType())
>> parsed_data_with_dt = parsed_data.withColumn('dt', my_udf('ts'))
>>
>> # Then a writestream later...
>>
>> Okay, so all this code works fine (the 'dt' field has exactly what I
>> want)... but I'll be streaming in a lot of data, so here are my questions:
>>
>> - Will the creation of a new dataframe with withColumn basically kill
>>   performance?
>> - Should I move my UDF into the parsed_data.select(...) part?
>> - Can my UDF be done by spark.sql directly? (I tried to_timestamp but
>>   without luck)
>>
>> Any suggestions/pointers are greatly appreciated.
>>
>> -Brian Wylie
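P.S. For anyone else who hits this, here is a rough sketch of the cast-based approach described above: 'ts' read as a DoubleType so sub-second precision survives, then cast to a timestamp instead of going through a Python UDF. It assumes the same Kafka source, topic, and field names as the quoted code (and an existing SparkSession named spark); the remaining schema fields are elided.

# Kafka source, same as in the quoted code
raw_data = spark.readStream.format('kafka') \
    .option('kafka.bootstrap.servers', 'localhost:9092') \
    .option('subscribe', 'dns') \
    .option('startingOffsets', 'latest') \
    .load()

from pyspark.sql.types import StructType, StringType, DoubleType
from pyspark.sql.functions import from_json, col

# Bring the epoch timestamp in as DoubleType so the fractional seconds survive
dns_schema = StructType() \
    .add('ts', DoubleType()) \
    .add('uid', StringType()) \
    .add('id.orig_h', StringType())
    # ... remaining fields as in the original schema

parsed_data = raw_data \
    .select(from_json(col('value').cast('string'), dns_schema).alias('data')) \
    .select('data.*')

# Casting double seconds-since-epoch to timestamp keeps the sub-second part
parsed_data_with_dt = parsed_data.withColumn('dt', col('ts').cast('timestamp'))

As Tathagata noted, dropping the Python UDF keeps everything in Spark's built-in expressions, so the optimizer can handle the whole plan.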