Tathagata,

Thanks, your explanation was great.

The suggestion worked well; the only minor detail was that I needed to bring
the TS field in as a DoubleType() or the time got truncated.
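
In case it's useful to anyone else, here's roughly what the working version
looks like now (a sketch based on my snippet below, with most of the schema
fields elided):

# Bring 'ts' in as a DoubleType so the fractional seconds survive,
# then cast it to a timestamp as Tathagata suggested.
from pyspark.sql.types import StructType, StringType, DoubleType
from pyspark.sql.functions import from_json, col

dns_schema = StructType() \
    .add('ts', DoubleType()) \
    .add('uid', StringType())
    # ... rest of the DNS fields ...

parsed_data = raw_data \
    .select(from_json(col('value').cast('string'), dns_schema).alias('data')) \
    .select('data.*') \
    .withColumn('dt', col('ts').cast('timestamp'))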

Thanks again,
-Brian

On Wed, Aug 30, 2017 at 1:34 PM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> 1. Generally, adding columns, etc. will not affect performance, because
> Spark's optimizer will automatically figure out which columns are not
> needed and eliminate them in the optimization step. So that should never
> be a concern.
> 2. Again, this is generally not a concern, as the optimizer will take care
> of moving such expressions around.
> 3. However, using Python UDFs is bad for performance. In your case, if the
> problem is that the timestamp is a float, you can cast the float to the
> timestamp type, and it should convert it correctly.
> Something like this: *col("ts").cast("timestamp")*
>
> On Wed, Aug 30, 2017 at 11:45 AM, Brian Wylie <briford.wy...@gmail.com>
> wrote:
>
>> Hi All,
>>
>> I'm using structured streaming in Spark 2.2.
>>
>> I'm using PySpark and I have data (from a Kafka publisher) where the
>> timestamp is a float that looks like this:  1379288667.631940
>>
>> So here's my code (which is working fine)
>>
>> # SUBSCRIBE: Setup connection to Kafka Stream
>> raw_data = spark.readStream.format('kafka') \
>>   .option('kafka.bootstrap.servers', 'localhost:9092') \
>>   .option('subscribe', 'dns') \
>>   .option('startingOffsets', 'latest') \
>>   .load()
>>
>> # ETL: Hardcoded Schema for DNS records (do this better later)
>> from pyspark.sql.types import StructType, StringType, BooleanType, IntegerType, FloatType
>> from pyspark.sql.functions import from_json, to_json, col, struct
>>
>> dns_schema = StructType() \
>>     .add('ts', FloatType()) \
>>     .add('uid', StringType()) \
>>     .add('id.orig_h', StringType()) \
>>   ....
>>
>> # ETL: Convert raw data into parsed and proper typed data
>> from pyspark.sql.functions import col, length, to_timestamp
>>
>> parsed_data = raw_data \
>>   .select(from_json(col("value").cast("string"), dns_schema).alias('data')) \
>>   .select('data.*')
>>
>> # Convert Bro IDS time to an actual TimeStamp type
>> from pyspark.sql.functions import udf
>> from pyspark.sql.types import TimestampType
>> import datetime
>> my_udf = udf(lambda x: datetime.datetime.fromtimestamp(float(x)), TimestampType())
>> parsed_data_with_dt = parsed_data.withColumn('dt', my_udf('ts'))
>>
>> # Then a writestream later...
>>
>> Okay, so all this code works fine (the 'dt' field has exactly what I
>> want)... but I'll be streaming in a lot of data, so here are my questions:
>>
>> - Will the creation of a new dataframe withColumn basically kill
>> performance?
>> - Should I move my UDF into the parsed_data.select(...)  part?
>> - Can my UDF be done by spark.sql directly? (I tried to_timestamp but
>> without luck)
>>
>> Any suggestions/pointers are greatly appreciated.
>>
>> -Brian Wylie
>>
>>
>>
>
