[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

2016-07-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/13957


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

2016-07-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13957#discussion_r69363852
  
--- Diff: examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py ---
@@ -0,0 +1,103 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+ Counts words in UTF8 encoded, '\n' delimited text received from the network over a
+ sliding window of configurable duration. Each line from the network is tagged
+ with a timestamp that is used to determine the windows into which it falls.
+
+ Usage: structured_network_wordcount_windowed.py   
+   
+  and  describe the TCP server that Structured Streaming
+ would connect to receive data.
+  gives the size of window, specified as integer number of seconds
+  gives the amount of time successive windows are offset from one another,
+ given in the same units as above.  should be less than or equal to
+ . If the two are equal, successive windows have no overlap. If
+  is not provided, it defaults to .
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk `
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py
+    localhost   `
+
+ One recommended ,  pair is 60, 30
+"""
+from __future__ import print_function
+
+import sys
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.functions import window
+
+if __name__ == "__main__":
+    if len(sys.argv) != 5 and len(sys.argv) != 4:
+        msg = ("Usage: structured_network_wordcount_windowed.py  "
+               " ")
--- End diff --

nit: optional args are usually written in square brackets, e.g. `[]`
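A minimal sketch of the argument handling this nit is about, written in plain Python so it runs without Spark. It assumes the positional order used in the diff (host, port, window size, optional slide size, sizes in seconds); the function name `parse_args` and the placeholder names in the usage string are hypothetical, not taken from the merged example. Unlike the quoted diff, which only prints a message when the slide is larger than the window, this sketch rejects that case.

```python
# Hypothetical usage text; the square brackets mark the optional last argument.
USAGE = ("Usage: structured_network_wordcount_windowed.py <hostname> <port> "
         "<window duration in seconds> [<slide duration in seconds>]")


def parse_args(argv):
    """Parse argv into (host, port, window_arg, slide_arg).

    Raises ValueError carrying the usage text when the argument count is wrong.
    """
    # argv[0] is the script name, so 4 or 5 entries are valid.
    if len(argv) not in (4, 5):
        raise ValueError(USAGE)
    host = argv[1]
    port = int(argv[2])
    window_size = int(argv[3])
    # The slide defaults to the window size, i.e. non-overlapping windows.
    slide_size = int(argv[4]) if len(argv) == 5 else window_size
    if slide_size > window_size:
        raise ValueError("slide duration must be less than or equal to window duration")
    # Same "<n> seconds" interval strings the example builds with str.format.
    return host, port, "{} seconds".format(window_size), "{} seconds".format(slide_size)
```

Calling `parse_args(sys.argv)` and printing any `ValueError` to `sys.stderr` before exiting would reproduce the example's behavior.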




[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

2016-07-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13957#discussion_r69363771
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala ---
@@ -85,6 +86,47 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
 }
   }
 
+  test("timestamped usage") {
+    serverThread = new ServerThread()
+    serverThread.start()
+
+    val provider = new TextSocketSourceProvider
+    val parameters = Map("host" -> "localhost", "port" -> serverThread.port.toString,
+      "includeTimestamp" -> "true")
+    val schema = provider.sourceSchema(sqlContext, None, "", parameters)._2
+    assert(schema === StructType(StructField("value", StringType) ::
+      StructField("timestamp", TimestampType) :: Nil))
+
+    source = provider.createSource(sqlContext, "", None, "", parameters)
+
+    failAfter(streamingTimeout) {
+      serverThread.enqueue("hello")
+      while (source.getOffset.isEmpty) {
+        Thread.sleep(10)
+      }
+      val offset1 = source.getOffset.get
+      val batch1 = source.getBatch(None, offset1)
+      val batch1Seq = batch1.as[(String, Timestamp)].collect().toSeq
+      assert(batch1Seq.map(_._1) === Seq("hello"))
+      val batch1Stamp = batch1Seq(0)._2
+
+      serverThread.enqueue("world")
+      while (source.getOffset.get === offset1) {
+        Thread.sleep(10)
+      }
+      val offset2 = source.getOffset.get
+      val batch2 = source.getBatch(Some(offset1), offset2)
+      val batch2Seq = batch2.as[(String, Timestamp)].collect().toSeq
+      assert(batch2Seq.map(_._1) === Seq("world"))
+      val batch2Stamp = batch2Seq(0)._2
+      assert(!batch2Stamp.before(batch1Stamp))
+
+      // Try stopping the source to make sure this does not block forever.
+      source.stop()
+      source = null
+    }
+  }
+
--- End diff --

add tests below to make sure includeTimestamp errors are checked. 




[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

2016-07-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13957#discussion_r69363665
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
@@ -136,7 +149,8 @@ class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegis
   parameters: Map[String, String]): Source = {
 val host = parameters("host")
 val port = parameters("port").toInt
-    new TextSocketSource(host, port, sqlContext)
+    new TextSocketSource(host, port,
+      parameters.getOrElse("includeTimestamp", "false").toBoolean, sqlContext)
--- End diff --

But it's better to have a single function that catches that error and prints a
better error message, like "Value of option includeTimestamp cannot be parsed.
Allowed values are "true" or "false"."
Also, it needs to be an IllegalArgumentException.
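A Python analog of the single parsing helper suggested here, assuming options arrive as a string-to-string map. `parse_bool_option` is a hypothetical name, and `ValueError` stands in for the `IllegalArgumentException` the Scala code would throw. Like Scala's `String.toBoolean`, it accepts "true" and "false" in any letter case.

```python
def parse_bool_option(parameters, name, default=False):
    """Return the boolean value of option `name`, or `default` if it is absent.

    Raises ValueError with a descriptive message for anything other than
    "true"/"false" (compared case-insensitively).
    """
    if name not in parameters:
        return default
    value = parameters[name]
    normalized = str(value).lower()
    if normalized == "true":
        return True
    if normalized == "false":
        return False
    # Name the option and the allowed values instead of a bare parse error.
    raise ValueError(
        'Value of option {} cannot be parsed: {!r}. '
        'Allowed values are "true" or "false".'.format(name, value))
```

Because both `sourceSchema` and `createSource` receive the parameters map, calling one helper like this from both keeps their behavior identical, and tests for bad `includeTimestamp` values can target it directly.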




[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

2016-07-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13957#discussion_r69363332
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
@@ -92,7 +102,11 @@ class TextSocketSource(host: String, port: Int, sqlContext: SQLContext)
 val endIdx = end.asInstanceOf[LongOffset].offset.toInt + 1
 val data = synchronized { lines.slice(startIdx, endIdx) }
 import sqlContext.implicits._
-    data.toDF("value")
+    if (includeTimestamp) {
+      data.toDF("value", "timestamp")
+    } else {
+      data.map(_._1).toDF("value")
--- End diff --

nit: `data.select("value")` does not work?
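To make the two branches concrete, here is a plain-Python model of what the quoted `getBatch` hunk does: slice the buffered `(line, timestamp)` tuples between two offsets, then keep or drop the timestamp. It assumes a missing start offset means "from the first line" and that an existing start offset is exclusive, since the `startIdx` computation is outside the quoted hunk; `get_batch` is a hypothetical name. In the hunk, `data` appears to be a local collection of tuples rather than a DataFrame, so projecting it means mapping over the tuples; a DataFrame-level `select("value")` would only apply after `toDF`.

```python
def get_batch(lines, start, end, include_timestamp):
    """Return the buffered rows with offsets in (start, end].

    lines: list of (value, timestamp) tuples; a line's offset is its index.
    start: None for the first batch (assumption), otherwise the exclusive start offset.
    """
    start_idx = 0 if start is None else start + 1
    # Mirrors `end.offset.toInt + 1` in the diff: the end offset is inclusive.
    end_idx = end + 1
    data = lines[start_idx:end_idx]
    if include_timestamp:
        return list(data)
    # Keep only the value, like data.map(_._1) in the Scala code.
    return [(value,) for value, _ in data]
```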




[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

2016-07-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13957#discussion_r69363245
  
--- Diff: examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py ---
@@ -0,0 +1,103 @@
+
+"""
+ Counts words in UTF8 encoded, '\n' delimited text received from the network over a
+ sliding window of configurable duration. Each line from the network is tagged
+ with a timestamp that is used to determine the windows into which it falls.
+
+ Usage: structured_network_wordcount_windowed.py   
+   
+  and  describe the TCP server that Structured Streaming
+ would connect to receive data.
+  gives the size of window, specified as integer number of seconds
+  gives the amount of time successive windows are offset from one another,
+ given in the same units as above.  should be less than or equal to
+ . If the two are equal, successive windows have no overlap. If
+  is not provided, it defaults to .
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk `
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py
+    localhost   `
+
+ One recommended ,  pair is 60, 30
+"""
+from __future__ import print_function
+
+import sys
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.functions import window
+
+if __name__ == "__main__":
+    if len(sys.argv) != 5 and len(sys.argv) != 4:
+        msg = ("Usage: structured_network_wordcount_windowed.py  "
+               " ")
+        print(msg, file=sys.stderr)
+        exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+    windowSize = int(sys.argv[3])
+    slideSize = int(sys.argv[4]) if (len(sys.argv) == 5) else windowSize
+    if slideSize > windowSize:
+        print(" must be less than or equal to ", file=sys.stderr)
+    windowArg = '{} seconds'.format(windowSize)
+    slideArg = '{} seconds'.format(slideSize)
+
+
+    spark = SparkSession\
+        .builder\
+        .appName("StructuredNetworkWordCountWindowed")\
+        .getOrCreate()
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = spark\
+        .readStream\
+        .format('socket')\
+        .option('host', host)\
+        .option('port', port)\
+        .option('includeTimestamp', 'true')\
+        .load()
+
+    # Split the lines into words, retaining timestamps
+    words = lines.select(
+        # explode turns each item in an array into a separate row
+        explode(split(lines.value, ' ')).alias('word'),
+        lines.timestamp
+    )
+
+    # Group the data by window and word and compute the count of each group
+    windowedCounts = words.groupBy(
+        window(words.timestamp, windowArg, slideArg),
--- End diff --

rename here




[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

2016-07-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13957#discussion_r69362821
  
--- Diff: examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py ---
@@ -0,0 +1,103 @@
+
+"""
+ Counts words in UTF8 encoded, '\n' delimited text received from the network over a
+ sliding window of configurable duration. Each line from the network is tagged
+ with a timestamp that is used to determine the windows into which it falls.
+
+ Usage: structured_network_wordcount_windowed.py   
+   
+  and  describe the TCP server that Structured Streaming
+ would connect to receive data.
+  gives the size of window, specified as integer number of seconds
+  gives the amount of time successive windows are offset from one another,
+ given in the same units as above.  should be less than or equal to
+ . If the two are equal, successive windows have no overlap. If
+  is not provided, it defaults to .
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk `
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py
+    localhost   `
+
+ One recommended ,  pair is 60, 30
+"""
+from __future__ import print_function
+
+import sys
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.functions import window
+
+if __name__ == "__main__":
+    if len(sys.argv) != 5 and len(sys.argv) != 4:
+        msg = ("Usage: structured_network_wordcount_windowed.py  "
+               " ")
+        print(msg, file=sys.stderr)
+        exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+    windowSize = int(sys.argv[3])
+    slideSize = int(sys.argv[4]) if (len(sys.argv) == 5) else windowSize
+    if slideSize > windowSize:
+        print(" must be less than or equal to ", file=sys.stderr)
+    windowArg = '{} seconds'.format(windowSize)
+    slideArg = '{} seconds'.format(slideSize)
+
+
+    spark = SparkSession\
+        .builder\
+        .appName("StructuredNetworkWordCountWindowed")\
+        .getOrCreate()
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = spark\
+        .readStream\
+        .format('socket')\
+        .option('host', host)\
+        .option('port', port)\
+        .option('includeTimestamp', 'true')\
+        .load()
+
+    # Split the lines into words, retaining timestamps
+    words = lines.select(
+        # explode turns each item in an array into a separate row
--- End diff --

Move this above, together with the other comment, and add a bit more explanation,
e.g. "split() splits each line into an array, and explode() turns the array into
multiple rows"
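The suggested explanation can be illustrated with a plain-Python model of the two steps, with no Spark involved: `split()` turns each line into an array of words, and `explode()` emits one output row per array element, carrying the line's timestamp along. `split_and_explode` is a hypothetical name, and Python's `str.split(" ")` only approximates Spark's regex-based `split`.

```python
def split_and_explode(rows):
    """Turn (line, timestamp) rows into one (word, timestamp) row per word."""
    result = []
    for line, ts in rows:
        # split(): one array of words per input line
        words = line.split(" ")
        # explode(): one output row per array element, keeping the timestamp
        for word in words:
            result.append((word, ts))
    return result
```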




[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

2016-07-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13957#discussion_r69362732
  
--- Diff: examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java ---
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.examples.sql.streaming;
+
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import scala.Tuple2;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network over a
+ * sliding window of configurable duration. Each line from the network is tagged
+ * with a timestamp that is used to determine the windows into which it falls.
+ *
+ * Usage: JavaStructuredNetworkWordCountWindowed   
+ *   
+ *  and  describe the TCP server that Structured Streaming
+ * would connect to receive data.
+ *  gives the size of window, specified as integer number of seconds
+ *  gives the amount of time successive windows are offset from one another,
+ * given in the same units as above.  should be less than or equal to
+ * . If the two are equal, successive windows have no overlap. If
+ *  is not provided, it defaults to .
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *    `$ nc -lk `
+ * and then run the example
+ *    `$ bin/run-example sql.streaming.JavaStructuredNetworkWordCountWindowed
+ *    localhost   `
+ *
+ * One recommended ,  pair is 60, 30
+ */
+public final class JavaStructuredNetworkWordCountWindowed {
+
+  public static void main(String[] args) throws Exception {
+    if (args.length < 3) {
+      System.err.println("Usage: JavaStructuredNetworkWordCountWindowed  " +
+        "  ");
+      System.exit(1);
+    }
+
+    String host = args[0];
+    int port = Integer.parseInt(args[1]);
+    int windowSize = Integer.parseInt(args[2]);
+    int slideSize = (args.length == 3) ? windowSize : Integer.parseInt(args[3]);
+    if (slideSize > windowSize) {
+      System.err.println(" must be less than or equal to ");
+    }
+    String windowArg = windowSize + " seconds";
+    String slideArg = slideSize + " seconds";
+
+    SparkSession spark = SparkSession
+      .builder()
+      .appName("JavaStructuredNetworkWordCountWindowed")
+      .getOrCreate();
+
+    // Create DataFrame representing the stream of input lines from connection to host:port
+    Dataset<Tuple2<String, Timestamp>> lines = spark
+      .readStream()
+      .format("socket")
+      .option("host", host)
+      .option("port", port)
+      .option("includeTimestamp", true)
+      .load().as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP()));
+
+    // Split the lines into words, retaining timestamps
+    Dataset<Row> words = lines.flatMap(
+      new FlatMapFunction<Tuple2<String, Timestamp>, Tuple2<String, Timestamp>>() {
+        @Override
+        public Iterator<Tuple2<String, Timestamp>> call(Tuple2<String, Timestamp> t) {
+          List<Tuple2<String, Timestamp>> result = new ArrayList<>();
+          for (String word : t._1.split(" ")) {
+            result.add(new Tuple2<>(word, t._2));
+          }
+          return result.iterator();
+        }
+      },
+      Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())
+    ).toDF("word", "timestamp");
+
+    // Group the data by window and word and compute the count of each group
+    Dataset<Row> windowedCounts = words.groupBy(
+      functions.window(words.col("timestamp"), windowArg, slideArg),
--- End diff --

windowArg --> windowDuration

[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

2016-07-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13957#discussion_r69362577
  
--- Diff: examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java ---
@@ -0,0 +1,116 @@
+package org.apache.spark.examples.sql.streaming;
+
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import scala.Tuple2;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network over a
+ * sliding window of configurable duration. Each line from the network is tagged
+ * with a timestamp that is used to determine the windows into which it falls.
+ *
+ * Usage: JavaStructuredNetworkWordCountWindowed   
+ *   
+ *  and  describe the TCP server that Structured Streaming
+ * would connect to receive data.
+ *  gives the size of window, specified as integer number of seconds
+ *  gives the amount of time successive windows are offset from one another,
+ * given in the same units as above.  should be less than or equal to
+ * . If the two are equal, successive windows have no overlap. If
+ *  is not provided, it defaults to .
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *    `$ nc -lk `
+ * and then run the example
+ *    `$ bin/run-example sql.streaming.JavaStructuredNetworkWordCountWindowed
+ *    localhost   `
+ *
+ * One recommended ,  pair is 60, 30
--- End diff --

nit: make it 10 and 5, so that the user does not have to wait 60 seconds to
see multiple windows.
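Why 10 and 5 show results sooner can be checked with a small model of sliding windows. The sketch assumes windows are aligned to multiples of the slide starting at time 0 and include their start but exclude their end, which is assumed here to match the default layout of `window()`; `windows_for` is a hypothetical helper working in whole seconds. With a 10-second window and a 5-second slide, a new window starts every 5 seconds and each event is counted in two of them.

```python
def windows_for(t, duration, slide):
    """Return the [start, end) windows that contain event time t, oldest first.

    Assumes windows start at multiples of `slide` from time 0 and 0 < slide <= duration.
    """
    if slide <= 0 or slide > duration:
        raise ValueError("need 0 < slide <= duration")
    # Latest window that starts at or before t.
    start = (t // slide) * slide
    result = []
    # Step back one slide at a time while the window still covers t.
    while start > t - duration:
        result.append((start, start + duration))
        start -= slide
    return sorted(result)
```

For example, `windows_for(12, 10, 5)` yields the windows `(5, 15)` and `(10, 20)`, while a 60/30 pair also puts each event in two windows but only opens a new one every 30 seconds.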




[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

2016-07-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13957#discussion_r69362419
  
--- Diff: examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java ---
@@ -0,0 +1,116 @@
+package org.apache.spark.examples.sql.streaming;
+
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import scala.Tuple2;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network over a
+ * sliding window of configurable duration. Each line from the network is tagged
+ * with a timestamp that is used to determine the windows into which it falls.
+ *
+ * Usage: JavaStructuredNetworkWordCountWindowed   
+ *   
+ *  and  describe the TCP server that Structured Streaming
+ * would connect to receive data.
+ *  gives the size of window, specified as integer number of seconds
+ *  gives the amount of time successive windows are offset from one another,
+ * given in the same units as above.  should be less than or equal to
+ * . If the two are equal, successive windows have no overlap. If
+ *  is not provided, it defaults to .
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *    `$ nc -lk `
+ * and then run the example
+ *    `$ bin/run-example sql.streaming.JavaStructuredNetworkWordCountWindowed
+ *    localhost   `
+ *
+ * One recommended ,  pair is 60, 30
+ */
+public final class JavaStructuredNetworkWordCountWindowed {
+
+  public static void main(String[] args) throws Exception {
+    if (args.length < 3) {
+      System.err.println("Usage: JavaStructuredNetworkWordCountWindowed  " +
+        "  ");
--- End diff --

optional stuff is usually written as `[ ... ]`




[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

2016-07-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13957#discussion_r69362387
  
--- Diff: examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java ---
@@ -0,0 +1,116 @@
+package org.apache.spark.examples.sql.streaming;
+
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import scala.Tuple2;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network over a
+ * sliding window of configurable duration. Each line from the network is tagged
+ * with a timestamp that is used to determine the windows into which it falls.
+ *
+ * Usage: JavaStructuredNetworkWordCountWindowed   
+ *   
+ *  and  describe the TCP server that Structured Streaming
+ * would connect to receive data.
+ *  gives the size of window, specified as integer number of seconds
+ *  gives the amount of time successive windows are offset from one another,
+ * given in the same units as above.  should be less than or equal to
+ * . If the two are equal, successive windows have no overlap. If
+ *  is not provided, it defaults to .
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *    `$ nc -lk `
+ * and then run the example
+ *    `$ bin/run-example sql.streaming.JavaStructuredNetworkWordCountWindowed
+ *    localhost   `
--- End diff --

window duration in seconds...




[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

2016-07-01 Thread jjthomas
Github user jjthomas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13957#discussion_r69358134
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
@@ -136,7 +149,8 @@ class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegis
   parameters: Map[String, String]): Source = {
 val host = parameters("host")
 val port = parameters("port").toInt
-    new TextSocketSource(host, port, sqlContext)
+    new TextSocketSource(host, port,
+      parameters.getOrElse("includeTimestamp", "false").toBoolean, sqlContext)
--- End diff --

sourceSchema and createSource are separate functions that are each passed the
parameters map, so the conversion to Boolean needs to happen in both places.
I think it's fine if this throws an error when the value is not parseable; the line
above will also throw an error if "port" is not parseable as an Int.
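A plain-Python sketch of the point above: if the option map is parsed by one function that both the schema function and the source factory call, a bad `port` or `includeTimestamp` fails the same way in both, and the schema always matches what the source will produce. `parse_socket_options`, `source_schema`, and `create_source` are hypothetical stand-ins for the Scala `sourceSchema`/`createSource` pair, and the returned dictionary only represents a configured source.

```python
def parse_socket_options(parameters):
    """Parse host, port and includeTimestamp once, for every caller."""
    host = parameters["host"]
    # int() raises ValueError on a non-numeric port, much like .toInt in Scala.
    port = int(parameters["port"])
    value = parameters.get("includeTimestamp", "false")
    if value.lower() not in ("true", "false"):
        raise ValueError(
            'includeTimestamp must be "true" or "false", got {!r}'.format(value))
    return host, port, value.lower() == "true"


def source_schema(parameters):
    """Column (name, type) pairs the source will produce."""
    _, _, include_timestamp = parse_socket_options(parameters)
    fields = [("value", "string")]
    if include_timestamp:
        fields.append(("timestamp", "timestamp"))
    return fields


def create_source(parameters):
    """Stand-in for building the source; a real one would open a socket here."""
    host, port, include_timestamp = parse_socket_options(parameters)
    return {"host": host, "port": port, "include_timestamp": include_timestamp}
```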


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

2016-07-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13957#discussion_r69353150
  
--- Diff: examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py ---
@@ -0,0 +1,102 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+ Counts words in UTF8 encoded, '\n' delimited text received from the network over a
+ sliding window of configurable duration. Each line from the network is tagged
+ with a timestamp that is used to determine the windows into which it falls.
+
+ Usage: structured_network_wordcount_windowed.py   
+   
+  and  describe the TCP server that Structured Streaming
+ would connect to receive data.
+  gives the size of window, specified as integer number of seconds
+  gives the amount of time successive windows are offset from one another,
+ given in the same units as above.  should be less than or equal to
+ . If the two are equal, successive windows have no overlap.
+
+ To run this on your local machine, you need to first run a Netcat server
+`$ nc -lk `
+ and then run the example
+`$ bin/spark-submit
+examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py
+localhost   `
+
+ One recommended ,  pair is 60, 30
+"""
+from __future__ import print_function
+
+import sys
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.functions import window
+
+if __name__ == "__main__":
+    if len(sys.argv) != 5:
+        msg = ("Usage: structured_network_wordcount_windowed.py   "
+               " ")
--- End diff --

can you make the slide duration optional in all of these examples?
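A minimal sketch of what an optional slide duration could look like, written in plain Python so the argument handling can be checked without a running Spark cluster. The helper name `parse_window_args` and the placeholder names in the usage string are hypothetical; defaulting the slide to the window duration follows the docstring's own statement that equal durations give non-overlapping windows.

```python
USAGE = ("Usage: structured_network_wordcount_windowed.py "
         "<hostname> <port> <window duration> [<slide duration>]")


def parse_window_args(argv):
    """Parse host, port, window duration and an optional slide duration.

    Hypothetical helper. Durations are integer numbers of seconds; when the
    slide duration is omitted it defaults to the window duration, which gives
    tumbling (non-overlapping) windows.
    """
    if len(argv) not in (4, 5):
        raise ValueError(USAGE)
    host = argv[1]
    port = int(argv[2])
    window_size = int(argv[3])
    slide_size = int(argv[4]) if len(argv) == 5 else window_size
    if slide_size > window_size:
        # Fail fast rather than printing a warning and carrying on.
        raise ValueError("<slide duration> must be less than or equal to "
                         "<window duration>")
    return host, port, window_size, slide_size
```

The same pattern (check `len(argv)`, fall back to the window size) would carry over to the Scala and Java examples.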





[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

2016-07-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13957#discussion_r69353077
  
--- Diff: examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py ---
@@ -0,0 +1,102 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+ Counts words in UTF8 encoded, '\n' delimited text received from the network over a
+ sliding window of configurable duration. Each line from the network is tagged
+ with a timestamp that is used to determine the windows into which it falls.
+
+ Usage: structured_network_wordcount_windowed.py   
+   
+  and  describe the TCP server that Structured Streaming
+ would connect to receive data.
+  gives the size of window, specified as integer number of seconds
+  gives the amount of time successive windows are offset from one another,
+ given in the same units as above.  should be less than or equal to
+ . If the two are equal, successive windows have no overlap.
+
+ To run this on your local machine, you need to first run a Netcat server
+`$ nc -lk `
+ and then run the example
+`$ bin/spark-submit
+examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py
+localhost   `
+
+ One recommended ,  pair is 60, 30
+"""
+from __future__ import print_function
+
+import sys
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.functions import window
+
+if __name__ == "__main__":
+    if len(sys.argv) != 5:
+        msg = ("Usage: structured_network_wordcount_windowed.py   "
+               " ")
+        print(msg, file=sys.stderr)
+        exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+    windowSize = int(sys.argv[3])
+    slideSize = int(sys.argv[4])
+    if slideSize > windowSize:
+        print(" must be less than or equal to ", file=sys.stderr)
+
+    spark = SparkSession\
+        .builder\
+        .appName("StructuredNetworkWordCountWindowed")\
+        .getOrCreate()
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = spark\
+        .readStream\
+        .format('socket')\
+        .option('host', host)\
+        .option('port', port)\
+        .option('includeTimestamp', 'true')\
+        .load()
+
+    # Split the lines into words, retaining timestamps
+    words = lines.select(
+        # explode turns each item in an array into a separate row
+        explode(
+            split(lines.value, ' ')
+        ).alias('word'),
+        lines.timestamp
+    )
+
+    # Group the data by window and word and compute the count of each group
+    windowedCounts = words.groupBy(
+        window(words.timestamp, '{} seconds'.format(windowSize),
--- End diff --

move the string generation earlier, around line 58. 
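One way to read this suggestion: build the interval strings once, right after the arguments are parsed, so the `groupBy(window(...))` call only refers to ready-made names. A plain-Python sketch of that step; the helper `duration_strings` and the names `windowDuration` / `slideDuration` are illustrative choices, not taken from the PR.

```python
def duration_strings(window_size, slide_size):
    """Turn integer seconds into the interval strings passed to window().

    Hypothetical helper, e.g. 60 -> "60 seconds".
    """
    window_duration = '{} seconds'.format(window_size)
    slide_duration = '{} seconds'.format(slide_size)
    return window_duration, slide_duration


windowDuration, slideDuration = duration_strings(60, 30)
# The grouping step can then read simply:
#   words.groupBy(window(words.timestamp, windowDuration, slideDuration), words.word)
```

Keeping the formatting out of the query also makes that snippet easier to copy verbatim into the programming guide.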





[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

2016-07-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13957#discussion_r69353021
  
--- Diff: examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.sql.streaming
+
+import java.sql.Timestamp
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions._
+
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network over a
+ * sliding window of configurable duration. Each line from the network is tagged
+ * with a timestamp that is used to determine the windows into which it falls.
+ *
+ * Usage: StructuredNetworkWordCountWindowed
+ *  and  describe the TCP server that Structured Streaming
+ * would connect to receive data.
+ *  gives the size of window, specified as integer number of seconds
+ *  gives the amount of time successive windows are offset from one another,
+ * given in the same units as above.  should be less than or equal to
+ * . If the two are equal, successive windows have no overlap.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *`$ nc -lk `
+ * and then run the example
+ *`$ bin/run-example sql.streaming.StructuredNetworkWordCountWindowed
+ *localhost   `
+ *
+ * One recommended ,  pair is 60, 30
+ */
+object StructuredNetworkWordCountWindowed {
+
+  def main(args: Array[String]) {
+    if (args.length < 4) {
+      System.err.println("Usage: StructuredNetworkWordCountWindowed  " +
+        "  ")
+      System.exit(1)
+    }
+
+    val host = args(0)
+    val port = args(1).toInt
+    val windowSize = args(2).toInt
+    val slideSize = args(3).toInt
+    if (slideSize > windowSize) {
+      System.err.println(" must be less than or equal to ")
+    }
+
+    val spark = SparkSession
+      .builder
+      .appName("StructuredNetworkWordCountWindowed")
+      .getOrCreate()
+
+    import spark.implicits._
+
+    // Create DataFrame representing the stream of input lines from connection to host:port
+    val lines = spark.readStream
+      .format("socket")
+      .option("host", host)
+      .option("port", port)
+      .option("includeTimestamp", true)
+      .load().as[(String, Timestamp)]
+
+    // Split the lines into words, retaining timestamps
+    val words = lines.flatMap(line =>
+      line._1.split(" ").map(word => (word, line._2))
+    ).toDF("word", "timestamp")
+
+    // Group the data by window and word and compute the count of each group
+    val windowedCounts = words.groupBy(
+      window(words.col("timestamp"), s"$windowSize seconds", s"$slideSize seconds"),
--- End diff --

Also move the "$windowSize seconds" higher up .. similar to the python, so 
that this piece of code looks simpler, and can be exactly copied over to the 
guide.





[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

2016-07-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13957#discussion_r69352894
  
--- Diff: examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.sql.streaming
+
+import java.sql.Timestamp
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions._
+
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network over a
+ * sliding window of configurable duration. Each line from the network is tagged
+ * with a timestamp that is used to determine the windows into which it falls.
+ *
+ * Usage: StructuredNetworkWordCountWindowed
+ *  and  describe the TCP server that Structured Streaming
+ * would connect to receive data.
+ *  gives the size of window, specified as integer number of seconds
+ *  gives the amount of time successive windows are offset from one another,
+ * given in the same units as above.  should be less than or equal to
+ * . If the two are equal, successive windows have no overlap.
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *`$ nc -lk `
+ * and then run the example
+ *`$ bin/run-example sql.streaming.StructuredNetworkWordCountWindowed
+ *localhost   `
+ *
+ * One recommended ,  pair is 60, 30
+ */
+object StructuredNetworkWordCountWindowed {
+
+  def main(args: Array[String]) {
+    if (args.length < 4) {
+      System.err.println("Usage: StructuredNetworkWordCountWindowed  " +
+        "  ")
+      System.exit(1)
+    }
+
+    val host = args(0)
+    val port = args(1).toInt
+    val windowSize = args(2).toInt
+    val slideSize = args(3).toInt
+    if (slideSize > windowSize) {
+      System.err.println(" must be less than or equal to ")
+    }
+
+    val spark = SparkSession
+      .builder
+      .appName("StructuredNetworkWordCountWindowed")
+      .getOrCreate()
+
+    import spark.implicits._
+
+    // Create DataFrame representing the stream of input lines from connection to host:port
+    val lines = spark.readStream
+      .format("socket")
+      .option("host", host)
+      .option("port", port)
+      .option("includeTimestamp", true)
+      .load().as[(String, Timestamp)]
+
+    // Split the lines into words, retaining timestamps
+    val words = lines.flatMap(line =>
+      line._1.split(" ").map(word => (word, line._2))
+    ).toDF("word", "timestamp")
+
+    // Group the data by window and word and compute the count of each group
+    val windowedCounts = words.groupBy(
+      window(words.col("timestamp"), s"$windowSize seconds", s"$slideSize seconds"),
--- End diff --

try the $ notation. update other example if it works. 






[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

2016-07-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13957#discussion_r69352799
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
@@ -47,7 +52,7 @@ class TextSocketSource(host: String, port: Int, sqlContext: SQLContext)
   private var readThread: Thread = null
 
   @GuardedBy("this")
-  private var lines = new ArrayBuffer[String]
+  private var lines = new ArrayBuffer[(String, String)]
--- End diff --

Why can't you store it in the `java.sql.Timestamp` form?
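For comparison, a plain-Python sketch of the alternative being asked about: keep each received line paired with a native timestamp object (in the Scala source that would be a `java.sql.Timestamp`; here a `datetime`) instead of a formatted string, so nothing has to be re-parsed when a batch is turned into rows. `LineBuffer` is a hypothetical stand-in for the `lines` buffer in `TextSocketSource`, not its actual implementation.

```python
from datetime import datetime


class LineBuffer:
    """Hypothetical model of the socket source's buffer of received lines.

    Each entry pairs the text with the datetime it arrived at, rather than a
    pre-formatted string.
    """

    def __init__(self):
        self.lines = []  # list of (str, datetime) pairs

    def add(self, line, received_at=None):
        # Capture the arrival time as a native datetime object.
        if received_at is None:
            received_at = datetime.now()
        self.lines.append((line, received_at))

    def batch(self, start, end):
        """Return the entries in the half-open offset range [start, end)."""
        return self.lines[start:end]
```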





[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

2016-07-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13957#discussion_r69352584
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
@@ -136,7 +149,8 @@ class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegis
   parameters: Map[String, String]): Source = {
 val host = parameters("host")
 val port = parameters("port").toInt
-new TextSocketSource(host, port, sqlContext)
+new TextSocketSource(host, port,
+  parameters.getOrElse("includeTimestamp", "false").toBoolean, sqlContext)
--- End diff --

This will throw an error if includeTimestamp is not parseable as a boolean.
Also, why is this condition checked in two places? There should be one place
that checks the option and handles parsing errors.
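A sketch of the "one place" idea, in Python rather than the Scala under review: a single helper parses `includeTimestamp` case-insensitively and rejects anything other than true/false with a clear message, and both the schema lookup and the source creation would call it. The names `parse_include_timestamp` and `AnalysisError` (standing in for Spark's `AnalysisException`) are hypothetical. Note that Scala's `StringOps.toBoolean` already ignores case, while a plain `== "true"` comparison does not, so two separately written checks can disagree on input such as `"TRUE"`.

```python
class AnalysisError(Exception):
    """Hypothetical stand-in for Spark's AnalysisException."""


def parse_include_timestamp(parameters):
    """Parse the includeTimestamp option in one place, ignoring case.

    Missing -> False; "true"/"false" in any case -> the matching bool;
    anything else raises AnalysisError instead of a bare parsing exception.
    """
    value = parameters.get("includeTimestamp", "false").lower()
    if value == "true":
        return True
    if value == "false":
        return False
    raise AnalysisError(
        'includeTimestamp must be set to either "true" or "false"')
```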





[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

2016-07-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13957#discussion_r69352364
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
@@ -125,7 +137,8 @@ class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegis
 if (!parameters.contains("port")) {
   throw new AnalysisException("Set a port to read from with option(\"port\", ...).")
 }
-("textSocket", TextSocketSource.SCHEMA)
+("textSocket", if (parameters.getOrElse("includeTimestamp", "false") == "true")
--- End diff --

not checking for case sensitivity.





[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

2016-07-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13957#discussion_r69352313
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
@@ -125,7 +137,8 @@ class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegis
 if (!parameters.contains("port")) {
   throw new AnalysisException("Set a port to read from with option(\"port\", ...).")
 }
-("textSocket", TextSocketSource.SCHEMA)
+("textSocket", if (parameters.getOrElse("includeTimestamp", "false") == "true")
+  { TextSocketSource.SCHEMA_TIMESTAMP } else { TextSocketSource.SCHEMA_REGULAR })
--- End diff --

This is hard to read. Break it into two lines:
```
val schema = ...
("textSocket", schema)
```
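The same restructuring written out as a Python sketch: choose the schema in its own statement, then build the return value. `SCHEMA_REGULAR` / `SCHEMA_TIMESTAMP` mirror the constant names in the diff but are modeled here as plain tuples of (column name, type name), an assumption made only for illustration; the column names follow the `value` and `timestamp` columns the Python example reads.

```python
# Hypothetical stand-ins for TextSocketSource.SCHEMA_REGULAR / SCHEMA_TIMESTAMP.
SCHEMA_REGULAR = (("value", "string"),)
SCHEMA_TIMESTAMP = (("value", "string"), ("timestamp", "timestamp"))


def source_schema(include_timestamp):
    """Return (short name, schema), with the schema chosen on its own line."""
    schema = SCHEMA_TIMESTAMP if include_timestamp else SCHEMA_REGULAR
    return ("textSocket", schema)
```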





[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

2016-07-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13957#discussion_r69351824
  
--- Diff: examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py ---
@@ -0,0 +1,102 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+ Counts words in UTF8 encoded, '\n' delimited text received from the network over a
+ sliding window of configurable duration. Each line from the network is tagged
+ with a timestamp that is used to determine the windows into which it falls.
+
+ Usage: structured_network_wordcount_windowed.py   
+   
+  and  describe the TCP server that Structured Streaming
+ would connect to receive data.
+  gives the size of window, specified as integer number of seconds
+  gives the amount of time successive windows are offset from one another,
+ given in the same units as above.  should be less than or equal to
+ . If the two are equal, successive windows have no overlap.
+
+ To run this on your local machine, you need to first run a Netcat server
+`$ nc -lk `
+ and then run the example
+`$ bin/spark-submit
+examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py
+localhost   `
+
+ One recommended ,  pair is 60, 30
+"""
+from __future__ import print_function
+
+import sys
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.functions import window
+
+if __name__ == "__main__":
+    if len(sys.argv) != 5:
+        msg = ("Usage: structured_network_wordcount_windowed.py   "
+               " ")
+        print(msg, file=sys.stderr)
+        exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+    windowSize = int(sys.argv[3])
+    slideSize = int(sys.argv[4])
+    if slideSize > windowSize:
+        print(" must be less than or equal to ", file=sys.stderr)
+
+    spark = SparkSession\
+        .builder\
+        .appName("StructuredNetworkWordCountWindowed")\
+        .getOrCreate()
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = spark\
+        .readStream\
+        .format('socket')\
+        .option('host', host)\
+        .option('port', port)\
+        .option('includeTimestamp', 'true')\
+        .load()
+
+    # Split the lines into words, retaining timestamps
+    words = lines.select(
+        # explode turns each item in an array into a separate row
+        explode(
+            split(lines.value, ' ')
--- End diff --

looks too nested, put explode and split in the same line. 
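What the nested `explode(split(lines.value, ' '))` computes, modeled in plain Python so it can be run without Spark: each (line, timestamp) row becomes one (word, timestamp) row per space-separated token, with the line's timestamp carried along to every word. `explode_words` is a hypothetical name; this models the semantics only and is not how Spark evaluates the query.

```python
from datetime import datetime


def explode_words(rows):
    """Model of select(explode(split(value, ' ')).alias('word'), timestamp).

    rows: iterable of (line, timestamp) pairs.
    Returns a list of (word, timestamp) pairs, one per token.
    """
    return [(word, ts) for line, ts in rows for word in line.split(' ')]
```

In the example itself, the flattened form would be a single expression such as `explode(split(lines.value, ' ')).alias('word')`.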





[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

2016-07-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13957#discussion_r69263155
  
--- Diff: examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.sql.streaming
+
+import java.sql.Timestamp
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions._
+
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network over a
+ * sliding window of configurable duration. Each line from the network is tagged
+ * with a timestamp that is used to determine the windows into which it falls.
+ *
+ * Usage: StructuredNetworkWordCountWindowed
+ *  and  describe the TCP server that Structured Streaming
+ * would connect to receive data.
+ *  gives the size of window, specified as integer number of seconds, minutes,
+ * or days, e.g. "1 minute", "2 seconds"
+ *  gives the amount of time successive windows are offset from one another,
+ * given in the same units as above.  should be less than or equal to
+ * . If the two are equal, successive windows have no overlap.
+ * ( and  must be enclosed by quotes to ensure that
+ * they are processed as individual arguments)
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *`$ nc -lk `
+ * and then run the example
+ *`$ bin/run-example sql.streaming.StructuredNetworkWordCountWindowed
+ *localhost   `
+ *
+ * One recommended ,  pair is "1 minute",
+ * "30 seconds"
+ */
+object StructuredNetworkWordCountWindowed {
+
+  def main(args: Array[String]) {
+    if (args.length < 4) {
+      System.err.println("Usage: StructuredNetworkWordCountWindowed  " +
+        "  ")
--- End diff --

On second thought, it might be simpler to just use seconds; that is less prone
to users entering an incorrect format.





[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

2016-07-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13957#discussion_r69262951
  
--- Diff: examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.sql.streaming
+
+import java.sql.Timestamp
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions._
+
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network over a
+ * sliding window of configurable duration. Each line from the network is tagged
+ * with a timestamp that is used to determine the windows into which it falls.
+ *
+ * Usage: StructuredNetworkWordCountWindowed
+ *  and  describe the TCP server that Structured Streaming
+ * would connect to receive data.
+ *  gives the size of window, specified as integer number of seconds, minutes,
+ * or days, e.g. "1 minute", "2 seconds"
+ *  gives the amount of time successive windows are offset from one another,
+ * given in the same units as above.  should be less than or equal to
+ * . If the two are equal, successive windows have no overlap.
+ * ( and  must be enclosed by quotes to ensure that
+ * they are processed as individual arguments)
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *`$ nc -lk `
+ * and then run the example
+ *`$ bin/run-example sql.streaming.StructuredNetworkWordCountWindowed
+ *localhost   `
+ *
+ * One recommended ,  pair is "1 minute",
+ * "30 seconds"
+ */
+object StructuredNetworkWordCountWindowed {
+
+  def main(args: Array[String]) {
+    if (args.length < 4) {
+      System.err.println("Usage: StructuredNetworkWordCountWindowed  " +
+        "  ")
+      System.exit(1)
+    }
+
+    val host = args(0)
+    val port = args(1).toInt
+    val windowSize = args(2)
+    val slideSize = args(3)
+
+    val spark = SparkSession
+      .builder
+      .appName("StructuredNetworkWordCountWindowed")
+      .getOrCreate()
+
+    import spark.implicits._
+
+    // Create DataFrame representing the stream of input lines from connection to host:port
+    val lines = spark.readStream
+      .format("socket")
+      .option("host", host)
+      .option("port", port)
+      .option("includeTimestamp", true)
+      .load().as[(String, Timestamp)]
+
+    // Split the lines into words, retaining timestamps
+    val words = lines.flatMap(line =>
+      line._1.split(" ")
+        .map(word => (word, line._2))
+    )
--- End diff --

You could convert this to a DF with nice column names with `.toDF("word", "time")`.
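Why named columns help, sketched by analogy in Python: records with named fields read as `word` / `time` instead of the positional `_1` / `_2` of a tuple, which is the readability gain `.toDF("word", "time")` gives the Scala Dataset of tuples. `namedtuple` and the `WordAt` / `split_line` names are hypothetical illustration only, not how Spark represents rows.

```python
from collections import namedtuple
from datetime import datetime

# Hypothetical record type standing in for a row with columns ("word", "time").
WordAt = namedtuple("WordAt", ["word", "time"])


def split_line(line, time):
    """Split one timestamped line into named (word, time) records."""
    return [WordAt(word, time) for word in line.split(" ")]
```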





[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

2016-07-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13957#discussion_r69262396
  
--- Diff: examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.sql.streaming
+
+import java.sql.Timestamp
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions._
+
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network over a
+ * sliding window of configurable duration. Each line from the network is tagged
+ * with a timestamp that is used to determine the windows into which it falls.
+ *
+ * Usage: StructuredNetworkWordCountWindowed
+ *  and  describe the TCP server that Structured Streaming
+ * would connect to receive data.
+ *  gives the size of window, specified as integer number of seconds, minutes,
+ * or days, e.g. "1 minute", "2 seconds"
+ *  gives the amount of time successive windows are offset from one another,
+ * given in the same units as above.  should be less than or equal to
+ * . If the two are equal, successive windows have no overlap.
+ * ( and  must be enclosed by quotes to ensure that
+ * they are processed as individual arguments)
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *`$ nc -lk `
+ * and then run the example
+ *`$ bin/run-example sql.streaming.StructuredNetworkWordCountWindowed
+ *localhost   `
+ *
+ * One recommended ,  pair is "1 minute",
+ * "30 seconds"
+ */
+object StructuredNetworkWordCountWindowed {
+
+  def main(args: Array[String]) {
+if (args.length < 4) {
+  System.err.println("Usage: StructuredNetworkWordCountWindowed  " +
+"  ")
+  System.exit(1)
+}
+
+val host = args(0)
+val port = args(1).toInt
+val windowSize = args(2)
+val slideSize = args(3)
+
+val spark = SparkSession
+  .builder
+  .appName("StructuredNetworkWordCountWindowed")
+  .getOrCreate()
+
+import spark.implicits._
+
+// Create DataFrame representing the stream of input lines from connection to host:port
+val lines = spark.readStream
+  .format("socket")
+  .option("host", host)
+  .option("port", port)
+  .option("includeTimestamp", true)
+  .load().as[(String, Timestamp)]
+
+// Split the lines into words, retaining timestamps
+val words = lines.flatMap(line =>
+  line._1.split(" ")
--- End diff --

put these two on the same line.





[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

2016-07-01 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13957#discussion_r69254704
  
--- Diff: examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.sql.streaming
+
+import java.sql.Timestamp
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions._
+
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network over a
+ * sliding window of configurable duration. Each line from the network is tagged
+ * with a timestamp that is used to determine the windows into which it falls.
+ *
+ * Usage: StructuredNetworkWordCountWindowed
+ *  and  describe the TCP server that Structured Streaming
+ * would connect to receive data.
+ *  gives the size of window, specified as integer number of seconds, minutes,
+ * or days, e.g. "1 minute", "2 seconds"
+ *  gives the amount of time successive windows are offset from one another,
+ * given in the same units as above.  should be less than or equal to
+ * . If the two are equal, successive windows have no overlap.
+ * ( and  must be enclosed by quotes to ensure that
+ * they are processed as individual arguments)
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *`$ nc -lk `
+ * and then run the example
+ *`$ bin/run-example sql.streaming.StructuredNetworkWordCountWindowed
+ *localhost   `
+ *
+ * One recommended ,  pair is "1 minute",
+ * "30 seconds"
+ */
+object StructuredNetworkWordCountWindowed {
+
+  def main(args: Array[String]) {
+if (args.length < 4) {
+  System.err.println("Usage: StructuredNetworkWordCountWindowed  " +
+"  ")
--- End diff --

It's not clear what format the durations should be specified in. Either add examples to the usage message, or require them to be in seconds so the user only has to specify a single number.
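The second option can be sketched in plain Scala under a few assumptions. The hypothetical helper `WindowArgsSketch.durations` takes the window length, plus an optional slide length, as whole seconds. It defaults the slide to the window (non-overlapping windows) and rejects a slide longer than the window. It produces interval strings such as `"10 seconds"`, the form that the `windowDuration` and `slideDuration` arguments of Spark's `window` function accept.

```scala
import scala.util.Try

object WindowArgsSketch {
  // A duration argument must be a positive whole number of seconds, e.g. "10".
  private def parseSeconds(arg: String): Option[Int] =
    Try(arg.trim.toInt).toOption.filter(_ > 0)

  // Hypothetical helper: validate the window and optional slide arguments and
  // turn them into interval strings such as "10 seconds".
  def durations(windowArg: String, slideArg: Option[String]): Either[String, (String, String)] =
    for {
      window <- parseSeconds(windowArg)
        .toRight(s"window duration must be a positive number of seconds, got '$windowArg'")
      // With no slide given, fall back to non-overlapping (tumbling) windows.
      slide <- slideArg match {
        case None    => Right(window)
        case Some(s) => parseSeconds(s).toRight(s"slide duration must be a positive number of seconds, got '$s'")
      }
      _ <- Either.cond(slide <= window, (), "slide duration must be less than or equal to window duration")
    } yield (s"$window seconds", s"$slide seconds")
}
```

Input such as `"1 minute"` is rejected up front with a `Left` carrying an error message, rather than reaching the query.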





[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

2016-06-30 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13957#discussion_r69248961
  
--- Diff: examples/src/main/python/sql/streaming/structured_network_wordcount.py ---
@@ -16,7 +16,7 @@
 #
 
 """
- Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ Counts words in UTF8 encoded, '\n' delimited text received from the network.
--- End diff --

good catch!





[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

2016-06-29 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13957#discussion_r69053315
  
--- Diff: examples/src/main/scala/org/apache/spark/examples/sql/streaming/EventTimeWindow.scala ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.sql.streaming
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.types.{DoubleType, TimestampType}
+
+/**
+ * Computes the average signal from IoT device readings over a sliding window of
+ * configurable duration. The readings are received over the network and must be
+ * UTF8-encoded and separated by '\n'.
+ *
+ * A single reading should take the format
+ * , 
+ *
+ * Usage: EventTimeWindow   
+ *   
+ *  and  describe the TCP server that Structured Streaming would connect to
+ * receive data.
+ *  gives the size of window, specified as integer number of seconds, minutes,
+ * or days, e.g. "1 minute", "2 seconds"
+ *  gives the amount of time successive windows are offset from one another,
+ * given in the same units as above
+ * ( and  must be enclosed by quotes to ensure that
+ * they are processed as individual arguments)
+ *
+ * To run this on your local machine, you need to first run a Netcat server
+ *`$ nc -lk `
+ * and then run the example
+ *`$ bin/run-example sql.streaming.EventTimeWindow
+ *localhost   `
+ *
+ * Type device readings in the format given above into Netcat.
+ *
+ * An example sequence of device readings:
+ * dev0,7.0
+ * dev1,8.0
+ * dev0,5.0
+ * dev1,3.0
+ */
+object EventTimeWindow {
+
+  def main(args: Array[String]) {
+if (args.length < 4) {
+  System.err.println("Usage: EventTimeWindow   " +
+" ")
+  System.exit(1)
+}
+
+val host = args(0)
+val port = args(1).toInt
+val windowSize = args(2)
+val slideSize = args(3)
+
+val spark = SparkSession
+  .builder
+  .appName("EventTimeWindow")
+  .getOrCreate()
+
+// Create DataFrame representing the stream of input readings from connection to host:port
+val lines = spark.readStream
+  .format("socket")
+  .option("host", host)
+  .option("port", port)
+  .option("includeTimestamp", true)
+  .load()
+
+// Split the readings into their individual components
+val splitLines = lines.select(
+  split(lines.col("value"), ",").alias("pieces"),
--- End diff --

As I said offline, this is a very complex example. Just do a streaming windowed word count instead.
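To make the suggested replacement concrete, here is a plain-Scala sketch of what a sliding-window word count computes. It illustrates the semantics, not Spark's implementation. It assumes event times in epoch seconds, windows aligned to time 0 and half-open as `[start, end)`, and `0 < slide <= size`. `SlidingWindowSketch` and its members are hypothetical names. Each event lands in every window that covers its timestamp, so with a 60-second window sliding every 30 seconds, each word is counted in two windows.

```scala
object SlidingWindowSketch {
  // A half-open window [start, end), in epoch seconds.
  final case class Window(start: Long, end: Long)

  // Every window of length `size`, starting at a multiple of `slide`, that contains t.
  // Assumes 0 < slide <= size.
  def windowsFor(t: Long, size: Long, slide: Long): List[Window] =
    Iterator
      .iterate(t - java.lang.Math.floorMod(t, slide))(_ - slide) // latest aligned start <= t, then earlier ones
      .takeWhile(start => start > t - size)                     // stop once a window would end at or before t
      .map(start => Window(start, start + size))
      .toList
      .reverse

  // Windowed word count: one count per (window, word) pair.
  def windowedCounts(events: Seq[(String, Long)], size: Long, slide: Long): Map[(Window, String), Int] =
    events
      .flatMap { case (word, t) => windowsFor(t, size, slide).map(w => (w, word)) }
      .groupBy(identity)
      .map { case (key, hits) => key -> hits.size }
}
```

On a streaming Dataset whose columns are named `word` and `time`, the corresponding aggregation would be along the lines of `groupBy(window($"time", windowDuration, slideDuration), $"word").count()`.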






[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

2016-06-29 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13957#discussion_r69052686
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
@@ -92,7 +99,12 @@ class TextSocketSource(host: String, port: Int, sqlContext: SQLContext)
 val endIdx = end.asInstanceOf[LongOffset].offset.toInt + 1
 val data = synchronized { lines.slice(startIdx, endIdx) }
 import sqlContext.implicits._
-data.toDF("value")
--- End diff --

Add unit tests for these cases in TextSocketStreamSuite.
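The tests need to cover two cases. With `includeTimestamp` enabled, each row should carry the line and its timestamp; without it, each row should carry only the line. The sketch below uses a hypothetical in-memory stand-in, `LineBufferSketch`, rather than the real `TextSocketSource` or `TextSocketStreamSuite`, and calls neither. It keeps the `end + 1` slice bound shown in the diff. It assumes the start offset is handled the same way, so a batch covers lines `(start, end]`.

```scala
import java.sql.Timestamp
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for a socket source's line buffer (not TextSocketSource).
class LineBufferSketch(includeTimestamp: Boolean) {
  private val lines = ListBuffer.empty[(String, Timestamp)]

  def add(line: String, ts: Timestamp): Unit = synchronized { lines += ((line, ts)) }

  // Offsets are 0-based line indices; a batch covers lines (start, end].
  // The end + 1 bound follows the diff; the start handling is an assumption.
  def getBatch(start: Option[Long], end: Long): Seq[Seq[Any]] = {
    val startIdx = start.map(_.toInt + 1).getOrElse(0)
    val endIdx = end.toInt + 1
    val data = synchronized { lines.slice(startIdx, endIdx).toList }
    // One column ("value") by default, two ("value", "timestamp") when enabled.
    data.map { case (value, ts) =>
      if (includeTimestamp) Seq[Any](value, ts) else Seq[Any](value)
    }
  }
}

// The two cases a test would pin down, written as plain assertions.
object LineBufferSketchChecks {
  def run(): Unit = {
    val ts = Timestamp.valueOf("2016-06-29 10:00:00")

    val withTs = new LineBufferSketch(includeTimestamp = true)
    Seq("a", "b", "c").foreach(withTs.add(_, ts))
    assert(withTs.getBatch(None, 1) == Seq(Seq("a", ts), Seq("b", ts)))
    assert(withTs.getBatch(Some(1), 2) == Seq(Seq("c", ts)))

    val withoutTs = new LineBufferSketch(includeTimestamp = false)
    Seq("a", "b").foreach(withoutTs.add(_, ts))
    assert(withoutTs.getBatch(None, 1) == Seq(Seq("a"), Seq("b")))
  }
}
```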





[GitHub] spark pull request #13957: [SPARK-16114] [SQL] structured streaming event ti...

2016-06-29 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/13957#discussion_r69052427
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---
@@ -67,7 +71,9 @@ class TextSocketSource(host: String, port: Int, sqlContext: SQLContext)
   return
 }
 TextSocketSource.this.synchronized {
-  lines += line
+  lines += ((line,
+new SimpleDateFormat("-MM-dd HH:mm:ss")
--- End diff --

no need to create a format object every time. 
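Here is a minimal sketch of the fix, which builds the formatter once and reuses it for every line. It assumes a `yyyy-MM-dd HH:mm:ss` pattern and `Locale.US`. A hypothetical `TimestampTaggingSketch` object stands in for wherever the source keeps its shared state. `SimpleDateFormat` is not thread-safe, so the shared instance is only touched while holding a lock. In the diff, the append already happens inside a `synchronized` block, which provides that lock.

```scala
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.{Calendar, Locale}

object TimestampTaggingSketch {
  // Built once and shared; SimpleDateFormat is not thread-safe, so it is only
  // used inside this object's lock.
  private val dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)

  // Pair a received line with the current wall-clock time. The format/valueOf
  // round trip drops the sub-second part.
  def tag(line: String): (String, Timestamp) = synchronized {
    (line, Timestamp.valueOf(dateFormat.format(Calendar.getInstance().getTime())))
  }
}
```

On Java 8 and later, `java.time.format.DateTimeFormatter` is immutable and thread-safe, which would remove the need to guard the formatter with a lock.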

