[ https://issues.apache.org/jira/browse/SPARK-22014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Simon Schiff updated SPARK-22014:
---------------------------------
    Description: 
Hello, I am using Spark to process measurement data. In Spark Streaming it is possible to create sample windows, where the duration of the window is smaller than the slide (see the streaming sketch after the example below). But when I try to do the same with Spark SQL (the measurement data has a timestamp column), I get an analysis exception:
{code}
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'timewindow(timestamp, 60000000, 180000000, 0)' due to data type mismatch: The slide duration (180000000) must be less than or equal to the windowDuration (60000000)
{code}
Here is an example:
{code:java}
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class App {
    // "HH" (not "hh") so the hours are parsed on the 24 hour clock.
    public static Timestamp createTimestamp(String in) throws Exception {
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date parsedDate = dateFormat.parse(in);
        return new Timestamp(parsedDate.getTime());
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("Window Sampling Example").getOrCreate();

        // One reading per minute: "timestamp, value".
        List<String> sensorData = new ArrayList<String>();
        sensorData.add("2017-08-04 00:00:00, 22.75");
        sensorData.add("2017-08-04 00:01:00, 23.82");
        sensorData.add("2017-08-04 00:02:00, 24.15");
        sensorData.add("2017-08-04 00:03:00, 23.16");
        sensorData.add("2017-08-04 00:04:00, 22.62");
        sensorData.add("2017-08-04 00:05:00, 22.89");
        sensorData.add("2017-08-04 00:06:00, 23.21");
        sensorData.add("2017-08-04 00:07:00, 24.59");
        sensorData.add("2017-08-04 00:08:00, 24.44");

        Dataset<String> in = spark.createDataset(sensorData, Encoders.STRING());

        StructType sensorSchema = DataTypes.createStructType(new StructField[] {
                DataTypes.createStructField("timestamp", DataTypes.TimestampType, false),
                DataTypes.createStructField("value", DataTypes.DoubleType, false),
        });

        // Parse each line into a (timestamp, value) row.
        Dataset<Row> data = spark.createDataFrame(in.toJavaRDD().map(new Function<String, Row>() {
            public Row call(String line) throws Exception {
                String[] fields = line.split(",");
                return RowFactory.create(createTimestamp(fields[0]), Double.parseDouble(fields[1]));
            }
        }), sensorSchema);

        // A 1 minute window with a 3 minute slide: this is the line that
        // fails with the AnalysisException quoted above.
        data.groupBy(functions.window(data.col("timestamp"), "1 minutes", "3 minutes"))
                .avg("value").orderBy("window").show(false);

        spark.stop();
    }
}
{code}
I think there should be no difference in the allowed duration and slide between the "Spark Streaming window" function and the "Spark SQL window" function.
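For comparison, here is a minimal Spark Streaming sketch that builds such a sample window. The socket source on localhost:9999 and the one minute batch interval are placeholder assumptions, any DStream behaves the same way; the point is only that window() accepts a slide that is larger than the window:
{code:java}
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class StreamingSampleWindow {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("Streaming Sample Window Example");
        // Batch interval of 1 minute; window and slide must be multiples of it.
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.minutes(1));

        // Placeholder source, assumed for the sketch; any DStream works here.
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

        // A 1 minute window evaluated every 3 minutes: the window is smaller
        // than the slide, so two out of every three minutes are skipped.
        // Spark Streaming accepts this; the SQL window function rejects it.
        JavaDStream<String> sampled = lines.window(Durations.minutes(1), Durations.minutes(3));
        sampled.print();

        jssc.start();
        jssc.awaitTermination();
    }
}
{code}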
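Until this restriction is lifted, the closest thing to a sample window I have found in Spark SQL is to filter the rows down to the sampled minute before grouping with an ordinary tumbling window. This is only a sketch and assumes that the sampling periods can be aligned to whole epoch seconds (like the buckets of window() itself); it continues the example above:
{code:java}
// Keep only rows from the first minute of every 3 minute period, then
// aggregate with a plain 1 minute tumbling window (duration == slide),
// which the analyzer accepts.
Dataset<Row> sampled = data.filter(
        functions.unix_timestamp(data.col("timestamp")).mod(3 * 60).lt(60));

sampled.groupBy(functions.window(sampled.col("timestamp"), "1 minutes", "1 minutes"))
        .avg("value")
        .orderBy("window")
        .show(false);
{code}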
> Sample windows in Spark SQL
> ---------------------------
>
>                 Key: SPARK-22014
>                 URL: https://issues.apache.org/jira/browse/SPARK-22014
>             Project: Spark
>          Issue Type: Wish
>          Components: DStreams, SQL
>    Affects Versions: 2.2.0
>            Reporter: Simon Schiff
>            Priority: Minor