[GitHub] spark pull request #17444: [SPARK-19876][SS] Follow up: Refactored BatchComm...

2017-03-27 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/17444#discussion_r108311230
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala
 ---
@@ -45,33 +45,39 @@ import org.apache.spark.sql.SparkSession
 class BatchCommitLog(sparkSession: SparkSession, path: String)
   extends HDFSMetadataLog[String](sparkSession, path) {
 
+  import BatchCommitLog._
+
+  def add(batchId: Long): Unit = {
+super.add(batchId, EMPTY_JSON)
+  }
+
+  override def add(batchId: Long, metadata: String): Boolean = {
+throw new UnsupportedOperationException(
--- End diff --

What if we want metadata down the road? What's the problem with supporting 
this for cases other than null?
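
For illustration, a minimal sketch of the alternative being asked about: keep the
no-argument convenience method, but let callers supply real metadata instead of
rejecting it. EMPTY_JSON comes from the diff above; the null check is an assumption
about how invalid input might be handled.

    def add(batchId: Long): Unit = add(batchId, EMPTY_JSON)

    override def add(batchId: Long, metadata: String): Boolean = {
      require(metadata != null, "commit log metadata must not be null")
      super.add(batchId, metadata)
    }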





[GitHub] spark pull request #17219: [SPARK-19876][SS][WIP] OneTime Trigger Executor

2017-03-20 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/17219#discussion_r107045633
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala ---
@@ -38,6 +38,51 @@ sealed trait Trigger
 
 /**
  * :: Experimental ::
+ * A trigger that runs a query once then terminates
+ *
+ * Scala Example:
+ * {{{
+ *   df.write.trigger(OneTime)
+ * }}}
+ *
+ * Java Example:
+ * {{{
+ *   df.write.trigger(OneTime.create())
+ * }}}
+ *
+ * @since 2.2.0
+ */
+@Experimental
+@InterfaceStability.Evolving
+case class OneTime() extends Trigger
+
+/**
+ * :: Experimental ::
+ * Used to create [[OneTime]] triggers for [[StreamingQuery]]s.
--- End diff --

The explanation of OneTime trigger is given in the OneTime class 
definition. Does that suffice, or should the explanation be reiterated in the 
object definition?

If it should be reiterated, then I can do the same with ProcessingTime.
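
Either way, a short usage sketch of the trigger under discussion, assuming the
case-class form shown in the diff above (the released API later exposes this via
Trigger.Once()); paths are illustrative:

    val query = df.writeStream
      .format("parquet")
      .option("path", "/tmp/onetime-output")
      .option("checkpointLocation", "/tmp/onetime-checkpoint")
      .trigger(OneTime())   // process whatever is available once, then stop
      .start()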





[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

2017-03-10 Thread tcondie
GitHub user tcondie opened a pull request:

https://github.com/apache/spark/pull/17246

[SPARK-19906][SS][DOCS] Documentation describing how to write queries to 
Kafka

## What changes were proposed in this pull request?

Add documentation that describes how to write streaming and batch queries 
to Kafka.
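
As a rough sketch of one pattern the new documentation covers (not the doc text
itself), a streaming query writing to Kafka looks like this; servers, topic, and
checkpoint path are illustrative:

    val query = df
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("topic", "topic1")
      .option("checkpointLocation", "/tmp/kafka-sink-checkpoint")
      .start()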

@zsxwing @tdas 

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tcondie/spark kafka-write-docs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17246.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17246


commit 172d4505e5583c541e4644b1eeb12f853bf638cd
Author: Tyson Condie 
Date:   2017-03-10T19:14:33Z

update







[GitHub] spark pull request #17231: [SPARK-19891][SS] Await Batch Lock notified on st...

2017-03-09 Thread tcondie
GitHub user tcondie opened a pull request:

https://github.com/apache/spark/pull/17231

[SPARK-19891][SS] Await Batch Lock notified on stream execution exit

## What changes were proposed in this pull request?

We need to notify the await-batch lock when the stream exits early, e.g., when an 
exception has been thrown. 
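
A rough sketch of the idea, with the lock declared locally for illustration (the
names mirror StreamExecution's awaitBatchLock/awaitBatchLockCondition, but treat
them as assumptions):

    import java.util.concurrent.locks.ReentrantLock

    val awaitBatchLock = new ReentrantLock()
    val awaitBatchLockCondition = awaitBatchLock.newCondition()

    def runBatches(): Unit = ???   // stand-in for the micro-batch loop

    try {
      runBatches()
    } finally {
      awaitBatchLock.lock()
      try {
        // Wake any threads blocked waiting for a batch, even if the loop
        // exited because an exception was thrown.
        awaitBatchLockCondition.signalAll()
      } finally {
        awaitBatchLock.unlock()
      }
    }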

## How was this patch tested?

Current tests that throw exceptions at runtime will finish faster as a 
result of this update.

@zsxwing 

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tcondie/spark kafka-writer

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17231.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17231


commit d371758538f659cbcf604e591110665cfca4f216
Author: Tyson Condie 
Date:   2017-01-20T01:53:03Z

add kafka relation and refactor kafka source

commit b6c3055f2c21050cb9e203213651e02779d724bc
Author: Tyson Condie 
Date:   2017-01-20T02:29:27Z

update

commit 4c81812e93907219af69dd57e0f4f5ebaf92b262
Author: Tyson Condie 
Date:   2017-01-20T18:24:07Z

update

commit ab02a4c631f9fc0ecd8528d85c61fe3c5de64040
Author: Tyson Condie 
Date:   2017-01-20T19:53:24Z

single kafka provider for both stream and batch

commit e6b57edb0958649062749cb9a0f7cda74f0b2829
Author: Tyson Condie 
Date:   2017-01-24T00:32:56Z

added uninterruptible thread version of kafka offset reader

commit ff94ed803474448f6bb388f8933e6ec091fc24a1
Author: Tyson Condie 
Date:   2017-01-24T00:44:31Z

added uninterruptible thread version of kafka offset reader

commit f8fd34cf0c2da4f1b9c793e4e021c23a923a0285
Author: Tyson Condie 
Date:   2017-01-24T01:13:16Z

update tests

commit 41271e25896387262c2c3a5b4fad6ba48feb9121
Author: Tyson Condie 
Date:   2017-01-24T18:32:26Z

resolve conflicts in KafakSource

commit 74d96fc9049a0a0fb6de6d011eb896b7d7c32b30
Author: Tyson Condie 
Date:   2017-01-24T18:45:12Z

update comments

commit d31fc8104bf94c82bc6fa1c099def9ca16fec93a
Author: Tyson Condie 
Date:   2017-01-25T00:53:11Z

address comments from @zsxwing

commit 1db1649201361bcede52997ec8c2f0610a55da8b
Author: Tyson Condie 
Date:   2017-01-25T01:04:06Z

update

commit 3b0d48b53c1b7a992557c6a42910a4681a84865e
Author: Tyson Condie 
Date:   2017-01-25T18:05:55Z

Merge branch 'master' of https://github.com/apache/spark into SPARK-18682

commit a5b02691ddbaef6c8092881ea15b582d9137be71
Author: Tyson Condie 
Date:   2017-01-26T21:32:00Z

address comments from @zsxwing

commit c08c01fd21fc53d3db3504a257ab6bb6115cd462
Author: Tyson Condie 
Date:   2017-01-27T16:33:29Z

late binding offsets

commit 79d335e697ba5af3f02f524d39c16e61d2cc73d9
Author: Tyson Condie 
Date:   2017-01-27T18:44:50Z

update to late binding logic

commit a44b365bba4bcdb7cd56d16c31d0447de9ed5aa3
Author: Tyson Condie 
Date:   2017-01-27T19:00:50Z

Merge branch 'SPARK-18682' into kafka-writer

commit 51291e36fad5f3511ce1d3afc17e2e714dffe2d3
Author: Tyson Condie 
Date:   2017-01-27T20:21:36Z

remove kafka log4j debug

commit b597cf1bf2135659de1e25bb7d41aeeac26f87f9
Author: Tyson Condie 
Date:   2017-01-27T20:22:33Z

remove kafka log4j debug

commit 84b32c55787290415016186d0c53543661b12851
Author: Tyson Condie 
Date:   2017-01-30T21:12:48Z

Merge branch 'SPARK-18682' into kafka-writer

commit f5ae3012ec37be0c4a6208ba00438592cd3aa791
Author: Tyson Condie 
Date:   2017-01-31T20:17:18Z

update

commit 2487a7260ec11496aed0135a723e156376c6ff31
Author: Tyson Condie 
Date:   2017-01-31T22:29:24Z

address comments from @zsxwing

commit 789d3afc0163b93aa0f859354221ea7e2374f74a
Author: Tyson Condie 
Date:   2017-01-31T23:12:43Z

update

commit 56a06e7dc75a72148fca498b41d7adf7c41e4cfe
Author: Tyson Condie 
Date:   2017-02-01T22:02:18Z

Merge branch 'SPARK-18682' into kafka-writer

commit e74473b28b160633ab6545fae208981096fe367c
Author: Tyson Condie 
Date:   2017-02-02T00:58:01Z

update

commit 73df054e87eb755bc0b25ee15e7fdd501cd0c10c
Author: Tyson Condie 
Date:   2017-02-02T22:43:39Z

update

commit 5b48fc65ac08e8ed4a09edd0d346990d40d042e0
Author: Tyson Condie 
Date:   2017-02-03T01:20:03Z

address comments from @tdas

commit 57760094591439525d3a53d0c5845a0da7e9b8eb
Author: Tyson Condie 
Date:   2017-02-03T19:46:00Z

address feedback from @tdas and @sxwing

commit 63d453f6078897d4049640d009aa0722b1f93908
Author: Tyson Condie 
Date:   2017-02-03T20:00:13Z

update merge

commit 3c4eecf3cd529c45c7b7a1bf053c7ea92e4caa50
Author: Tyson Condie 
Date:   2017-02-03T23:16:03Z

update

commit b0611e48f71be67e1481cb0754e7f49f06f73dc2
Author: Tyson Condie 
Date:   2017-02

[GitHub] spark pull request #17219: [SPARK-19876][SS][WIP] OneTime Trigger Executor

2017-03-08 Thread tcondie
GitHub user tcondie opened a pull request:

https://github.com/apache/spark/pull/17219

[SPARK-19876][SS][WIP] OneTime Trigger Executor

## What changes were proposed in this pull request?

An additional trigger and trigger executor that execute a single trigger only. 
One can use this OneTime trigger to gain more control over the scheduling of 
triggers. 

In addition, this patch requires an optimization to StreamExecution that logs a 
commit record at the end of successfully processing a batch. This new commit log 
is used to determine the next batch (offsets) to process after a restart, instead 
of using the offset log itself; relying on the offset log alone would always 
re-process the previously logged batch, which would not permit a OneTime trigger 
feature.  
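
Not the PR's code, just a self-contained sketch of the restart decision it
describes, where the two Option arguments stand in for the latest entries of the
offset log and the new commit log:

    def nextBatchToRun(
        latestOffsetBatch: Option[Long],
        latestCommittedBatch: Option[Long]): Long = {
      (latestOffsetBatch, latestCommittedBatch) match {
        case (Some(offsetId), Some(commitId)) if commitId == offsetId =>
          offsetId + 1   // last planned batch was committed: construct the next one
        case (Some(offsetId), _) =>
          offsetId       // last planned batch was never committed: re-run it
        case (None, _) =>
          0L             // empty offset log: fresh query
      }
    }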

## How was this patch tested?

A number of existing tests have been revised. These tests all assumed that 
when restarting a stream, the last batch in the offset log is to be 
re-processed. Given that we now have a commit log that will tell us if that 
last batch was processed successfully, the results/assumptions of those tests 
needed to be revised accordingly. 

In addition, a OneTime trigger test was added to StreamingQuerySuite, which 
tests:
- The semantics of OneTime trigger (i.e., on start, execute a single batch, 
then stop).
- The case when the commit log was not able to successfully log the 
completion of a batch before restart, which would mean that we should fall back 
to what's in the offset log.
- A OneTime trigger execution that results in an exception being thrown.

@marmbrus @tdas @zsxwing 

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tcondie/spark stream-commit

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17219.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17219


commit 08a36f8c5a833da1deb5db99dc620ba4e98d67a1
Author: Tyson Condie 
Date:   2017-03-06T19:05:58Z

update

commit 0e21d0e829a134d050b1c881745dbcfca8986378
Author: Tyson Condie 
Date:   2017-03-07T00:46:30Z

Merge branch 'master' of https://github.com/apache/spark into stream-commit

commit 9b8abb445725828e73e392ee58ffa844c2506ca3
Author: Tyson Condie 
Date:   2017-03-08T18:55:57Z

update existing tests

commit 682eb1a3987c0f481de4bebf72926f14816d7607
Author: Tyson Condie 
Date:   2017-03-09T00:10:06Z

add onetime trigger test







[GitHub] spark pull request #17043: [SPARK-19719][SS] Kafka writer for both structure...

2017-02-23 Thread tcondie
GitHub user tcondie opened a pull request:

https://github.com/apache/spark/pull/17043

[SPARK-19719][SS] Kafka writer for both structured streaming and batch 
queries

## What changes were proposed in this pull request?

Add a new Kafka Sink and Kafka Relation for writing streaming and batch 
queries, respectively, to Apache Kafka. 
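
A sketch of the batch-write half (the streaming sink is analogous via
writeStream); broker address and topic are illustrative:

    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1")
      .option("topic", "topic1")
      .save()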

@tdas @zsxwing 

## How was this patch tested?

KafkaSinkSuite.scala contains tests for writing both streaming and batch 
queries to Kafka.

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tcondie/spark kafka-writer

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17043.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17043


commit d371758538f659cbcf604e591110665cfca4f216
Author: Tyson Condie 
Date:   2017-01-20T01:53:03Z

add kafka relation and refactor kafka source

commit b6c3055f2c21050cb9e203213651e02779d724bc
Author: Tyson Condie 
Date:   2017-01-20T02:29:27Z

update

commit 4c81812e93907219af69dd57e0f4f5ebaf92b262
Author: Tyson Condie 
Date:   2017-01-20T18:24:07Z

update

commit ab02a4c631f9fc0ecd8528d85c61fe3c5de64040
Author: Tyson Condie 
Date:   2017-01-20T19:53:24Z

single kafka provider for both stream and batch

commit e6b57edb0958649062749cb9a0f7cda74f0b2829
Author: Tyson Condie 
Date:   2017-01-24T00:32:56Z

added uninterruptible thread version of kafka offset reader

commit ff94ed803474448f6bb388f8933e6ec091fc24a1
Author: Tyson Condie 
Date:   2017-01-24T00:44:31Z

added uninterruptible thread version of kafka offset reader

commit f8fd34cf0c2da4f1b9c793e4e021c23a923a0285
Author: Tyson Condie 
Date:   2017-01-24T01:13:16Z

update tests

commit 41271e25896387262c2c3a5b4fad6ba48feb9121
Author: Tyson Condie 
Date:   2017-01-24T18:32:26Z

resolve conflicts in KafakSource

commit 74d96fc9049a0a0fb6de6d011eb896b7d7c32b30
Author: Tyson Condie 
Date:   2017-01-24T18:45:12Z

update comments

commit d31fc8104bf94c82bc6fa1c099def9ca16fec93a
Author: Tyson Condie 
Date:   2017-01-25T00:53:11Z

address comments from @zsxwing

commit 1db1649201361bcede52997ec8c2f0610a55da8b
Author: Tyson Condie 
Date:   2017-01-25T01:04:06Z

update

commit 3b0d48b53c1b7a992557c6a42910a4681a84865e
Author: Tyson Condie 
Date:   2017-01-25T18:05:55Z

Merge branch 'master' of https://github.com/apache/spark into SPARK-18682

commit a5b02691ddbaef6c8092881ea15b582d9137be71
Author: Tyson Condie 
Date:   2017-01-26T21:32:00Z

address comments from @zsxwing

commit c08c01fd21fc53d3db3504a257ab6bb6115cd462
Author: Tyson Condie 
Date:   2017-01-27T16:33:29Z

late binding offsets

commit 79d335e697ba5af3f02f524d39c16e61d2cc73d9
Author: Tyson Condie 
Date:   2017-01-27T18:44:50Z

update to late binding logic

commit a44b365bba4bcdb7cd56d16c31d0447de9ed5aa3
Author: Tyson Condie 
Date:   2017-01-27T19:00:50Z

Merge branch 'SPARK-18682' into kafka-writer

commit 51291e36fad5f3511ce1d3afc17e2e714dffe2d3
Author: Tyson Condie 
Date:   2017-01-27T20:21:36Z

remove kafka log4j debug

commit b597cf1bf2135659de1e25bb7d41aeeac26f87f9
Author: Tyson Condie 
Date:   2017-01-27T20:22:33Z

remove kafka log4j debug

commit 84b32c55787290415016186d0c53543661b12851
Author: Tyson Condie 
Date:   2017-01-30T21:12:48Z

Merge branch 'SPARK-18682' into kafka-writer

commit f5ae3012ec37be0c4a6208ba00438592cd3aa791
Author: Tyson Condie 
Date:   2017-01-31T20:17:18Z

update

commit 2487a7260ec11496aed0135a723e156376c6ff31
Author: Tyson Condie 
Date:   2017-01-31T22:29:24Z

address comments from @zsxwing

commit 789d3afc0163b93aa0f859354221ea7e2374f74a
Author: Tyson Condie 
Date:   2017-01-31T23:12:43Z

update

commit 56a06e7dc75a72148fca498b41d7adf7c41e4cfe
Author: Tyson Condie 
Date:   2017-02-01T22:02:18Z

Merge branch 'SPARK-18682' into kafka-writer

commit e74473b28b160633ab6545fae208981096fe367c
Author: Tyson Condie 
Date:   2017-02-02T00:58:01Z

update

commit 73df054e87eb755bc0b25ee15e7fdd501cd0c10c
Author: Tyson Condie 
Date:   2017-02-02T22:43:39Z

update

commit 5b48fc65ac08e8ed4a09edd0d346990d40d042e0
Author: Tyson Condie 
Date:   2017-02-03T01:20:03Z

address comments from @tdas

commit 57760094591439525d3a53d0c5845a0da7e9b8eb
Author: Tyson Condie 
Date:   2017-02-03T19:46:00Z

address feedback from @tdas and @sxwing

commit 63d453f6078897d4049640d009aa0722b1f93908
Author: Tyson Condie 
Date:   2017-02-03T20:00:13Z

update merge

commit 3c4eecf3cd529c45c7b7a1bf053c7ea92e4caa50
Author: Tyson Condie 
Date:   2017-02-03T23:16:03Z

update

commit b0611e48f71be67e1481cb0754e7f49f06f73dc2
Author: Ty

[GitHub] spark pull request #16918: [SPARK-19584] [SS] [DOCS] update structured strea...

2017-02-14 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/16918#discussion_r101168119
  
--- Diff: docs/structured-streaming-kafka-integration.md ---
@@ -187,50 +306,68 @@ The following options must be set for the Kafka 
source.
 The following configurations are optional:
 
 
-Option | value | default | meaning
+Option | value | default | query type | meaning
 
   startingOffsets
-  earliest, latest, or json string
-  {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}
+  "earliest", "latest" (streaming only), or json string
+  """ {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """
   
-  latest
+  "latest" for streaming, "earliest" for batch
+  streaming and batch
   The start point when a query is started, either "earliest" which is 
from the earliest offsets,
   "latest" which is just from the latest offsets, or a json string 
specifying a starting offset for
   each TopicPartition.  In the json, -2 as an offset can be used to refer 
to earliest, -1 to latest.
-  Note: This only applies when a new Streaming query is started, and that 
resuming will always pick
-  up from where the query left off. Newly discovered partitions during a 
query will start at
+  Note: For Batch queries, latest (either implicitly or by using -1 in 
json) is not allowed.
--- End diff --

Agreed. The previous version had the "S" capitalized, so I wasn't sure whether 
that was a convention or not. I'll make the correction.





[GitHub] spark pull request #16918: [SPARK-19584] [SS] [DOCS] update structured strea...

2017-02-13 Thread tcondie
GitHub user tcondie opened a pull request:

https://github.com/apache/spark/pull/16918

[SPARK-19584] [SS] [DOCS] update structured streaming documentation around 
batch mode

## What changes were proposed in this pull request?

Revision to structured-streaming-kafka-integration.md to reflect new Batch 
query specification and options. 

@zsxwing @tdas

## How was this patch tested?

N/A

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tcondie/spark kafka-docs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16918.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16918


commit debe111da3cf1981a4c666554eba669be248f9d4
Author: Tyson Condie 
Date:   2017-02-14T00:01:06Z

update structured streaming documentation around batch mode







[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-03 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99382639
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaTopicPartitionOffsetReader.scala
 ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.{Executors, ThreadFactory}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
KafkaConsumer}
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.types._
+import org.apache.spark.util.{ThreadUtils, UninterruptibleThread}
+
+/**
+ * This class uses Kafka's own [[KafkaConsumer]] API to read data offsets 
from Kafka.
+ * The [[ConsumerStrategy]] class defines which Kafka topics and 
partitions should be read
+ * by this source. These strategies directly correspond to the different 
consumption options
+ * in. This class is designed to return a configured [[KafkaConsumer]] 
that is used by the
+ * [[KafkaSource]] to query for the offsets. See the docs on
+ * [[org.apache.spark.sql.kafka010.ConsumerStrategy]]
+ * for more details.
+ *
+ * Note: This class is not ThreadSafe
+ */
+private[kafka010] class KafkaTopicPartitionOffsetReader(
--- End diff --

I'll roll back. My thought was also to indicate that it's being used to read 
TopicPartition(s).





[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-03 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99381274
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsets.scala
 ---
@@ -19,14 +19,31 @@ package org.apache.spark.sql.kafka010
 
 import org.apache.kafka.common.TopicPartition
 
-/*
- * Values that can be specified for config startingOffsets
+/**
+ * Values that can be specified to configure starting,
+ * ending, and specific offsets.
  */
-private[kafka010] sealed trait StartingOffsets
+private[kafka010] sealed trait KafkaOffsets
--- End diff --

I went with KafkaOffsetRangeLimit





[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-02 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99244144
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
 ---
@@ -135,7 +136,28 @@ private[kafka010] class KafkaSourceRDD(
 } else {
   new NextIterator[ConsumerRecord[Array[Byte], Array[Byte]]]() {
 val consumer = CachedKafkaConsumer.getOrCreate(
-  range.topic, range.partition, executorKafkaParams)
+range.topic, range.partition, executorKafkaParams, 
reuseKafkaConsumer)
+if (range.fromOffset < 0 || range.untilOffset < 0) {
--- End diff --

Reworked it. Let me know what you think.





[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-02 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99240245
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -334,14 +334,15 @@ private[kafka010] object CachedKafkaConsumer extends 
Logging {
   def getOrCreate(
   topic: String,
   partition: Int,
-  kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer = 
synchronized {
+  kafkaParams: ju.Map[String, Object],
+  reuse: Boolean): CachedKafkaConsumer = synchronized {
--- End diff --

Reuse an existing consumer. I changed the name to reuseExistingIfPresent.





[GitHub] spark issue #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-27 Thread tcondie
Github user tcondie commented on the issue:

https://github.com/apache/spark/pull/16686
  
jenkins retest this please





[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-27 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r98237559
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.kafka.common.TopicPartition
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
+
+class KafkaRelationSuite extends QueryTest with BeforeAndAfter with 
SharedSQLContext {
+
+  import testImplicits._
+
+  private val topicId = new AtomicInteger(0)
+
+  private var testUtils: KafkaTestUtils = _
+
+  private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
+
+  private def assignString(topic: String, partitions: Iterable[Int]): 
String = {
+JsonUtils.partitions(partitions.map(p => new TopicPartition(topic, p)))
+  }
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+testUtils = new KafkaTestUtils
+testUtils.setup()
+  }
+
+  override def afterAll(): Unit = {
+if (testUtils != null) {
+  testUtils.teardown()
+  testUtils = null
+  super.afterAll()
+}
+  }
+
+  test("explicit earliest to latest offsets") {
+val topic = newTopic()
+testUtils.createTopic(topic, partitions = 3)
+testUtils.sendMessages(topic, (0 to 9).map(_.toString).toArray, 
Some(0))
+testUtils.sendMessages(topic, (10 to 19).map(_.toString).toArray, 
Some(1))
+testUtils.sendMessages(topic, Array("20"), Some(2))
+
+// Specify explicit earliest and latest offset values
+val reader = spark
+  .read
+  .format("kafka")
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("subscribe", topic)
+  .option("startingOffsets", "earliest")
+  .option("endingOffsets", "latest")
+  .load()
+var df = reader.selectExpr("CAST(value AS STRING)")
+checkAnswer(df, (0 to 20).map(_.toString).toDF)
+
+// "latest" should late bind to the current (latest) offset in the 
reader
+testUtils.sendMessages(topic, (21 to 29).map(_.toString).toArray, 
Some(2))
--- End diff --

This no longer holds now that we're binding in the executor, right?





[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-24 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r97620911
  
--- Diff: 
external/kafka-0-10-sql/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
 ---
@@ -1 +1 @@
-org.apache.spark.sql.kafka010.KafkaSourceProvider
+org.apache.spark.sql.kafka010.KafkaProvider
--- End diff --

That's true, but the revised Provider not only provides a Source but also a 
Relation, hence the decision to rename it to something more general. It's not 
clear whether this outweighs the risks you've pointed out. @tdas @zsxwing 





[GitHub] spark issue #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-24 Thread tcondie
Github user tcondie commented on the issue:

https://github.com/apache/spark/pull/16686
  
retest this please





[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-23 Thread tcondie
GitHub user tcondie opened a pull request:

https://github.com/apache/spark/pull/16686

[SPARK-18682][SS] Batch Source for Kafka

## What changes were proposed in this pull request?

Today, you can start a stream that reads from Kafka. However, given Kafka's 
configurable retention period, sometimes you might just want to read all of the 
data that is available now. As such, we should add a version that works with 
spark.read as well.
The options should be the same as for the streaming Kafka source, with the 
following differences:
- startingOffsets should default to earliest, and should not allow latest 
(which would always be empty).
- endingOffsets should also be allowed and should default to latest; the same 
assign JSON format as startingOffsets should also be accepted.
- It would be really good if things like .limit(n) were enough to prevent all 
the data from being read (this might just work).
A sketch of the resulting batch read is shown below.
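
A sketch of the batch read described above, using the option values from this
description (in the JSON form, -2 means earliest and -1 means latest); broker
address and topic are illustrative:

    val df = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1")
      .option("subscribe", "topicA")
      .option("startingOffsets", "earliest")                      // the batch default
      .option("endingOffsets", """{"topicA":{"0":50,"1":-1}}""")  // or "latest" (the default)
      .load()
      .selectExpr("CAST(value AS STRING)")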

## How was this patch tested?

KafkaRelationSuite was added for testing batch queries via KafkaUtils. 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tcondie/spark SPARK-18682

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16686.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16686









[GitHub] spark pull request #16219: [SPARK-18790][SS] Keep a general offset history o...

2016-12-09 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/16219#discussion_r91805757
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
 ---
@@ -303,7 +303,6 @@ private[state] class HDFSBackedStateStoreProvider(
   val mapFromFile = readSnapshotFile(version).getOrElse {
 val prevMap = loadMap(version - 1)
 val newMap = new MapType(prevMap)
-newMap.putAll(prevMap)
--- End diff --

new MapType(prevMap) makes a call that is equivalent to newMap.putAll(prevMap), 
so the explicit newMap.putAll(prevMap) is redundant work. 
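
A tiny illustration of the point, using java.util.HashMap as a stand-in for the
provider's MapType alias (an assumption made only for this example):

    import java.util.{HashMap => JHashMap}

    val prevMap = new JHashMap[String, String]()
    prevMap.put("k", "v")
    // The copy constructor already performs the equivalent of putAll(prevMap).
    val newMap = new JHashMap[String, String](prevMap)
    assert(newMap.get("k") == "v")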





[GitHub] spark pull request #16219: [SPARK-18790][SS] Keep a general offset history o...

2016-12-08 Thread tcondie
GitHub user tcondie opened a pull request:

https://github.com/apache/spark/pull/16219

[SPARK-18790][SS] Keep a general offset history of stream batches

## What changes were proposed in this pull request?

Instead of only keeping the minimum number of offsets around, we should 
keep enough information to allow us to roll back n batches and re-execute the 
stream starting from a given point. In particular, we should create a config in 
SQLConf, spark.sql.streaming.retainedBatches, that defaults to 100, and ensure 
that we keep enough log files in the following places to roll back the 
specified number of batches:
- the offsets that are present in each batch
- versions of the state store
- the file lists stored for the FileStreamSource
- the metadata log stored by the FileStreamSink

@marmbrus @zsxwing 

## How was this patch tested?

The following tests were added.

### StreamExecution offset metadata
Test added to StreamingQuerySuite that ensures offset metadata is garbage 
collected according to minBatchesToRetain.

### CompactibleFileStreamLog
Tests added in CompactibleFileStreamLogSuite to ensure that logs are purged 
starting before the first compaction file that precedes the current batch id - 
minBatchesToRetain.  


Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tcondie/spark offset_hist

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16219.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16219


commit 2d96af3ad4fe3d07cd80c727d2527de1d0ba3c57
Author: Tyson Condie 
Date:   2016-12-08T19:02:33Z

revised log history maintenence based on minBatchesToRetain configuration 
parameter

commit fc1557eb178d070814776ffaa6c14a8cb48ea83a
Author: Tyson Condie 
Date:   2016-12-08T20:42:57Z

add test for metadata garbage collection based on minBatchesToRetain







[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-28 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89873586
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.io.File
+
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.execution.streaming.{MemoryStream, 
StreamExecutionMetadata}
+import org.apache.spark.sql.functions._
+import org.apache.spark.util.{SystemClock, Utils}
+
+class StreamExecutionMetadataSuite extends StreamTest {
+
+  private def newMetadataDir =
+Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+
+  test("stream execution metadata") {
+assert(StreamExecutionMetadata(0, 0) ===
+  StreamExecutionMetadata("""{}"""))
+assert(StreamExecutionMetadata(1, 0) ===
+  StreamExecutionMetadata("""{"batchWatermarkMs":1}"""))
+assert(StreamExecutionMetadata(0, 2) ===
+  StreamExecutionMetadata("""{"batchTimestampMs":2}"""))
+assert(StreamExecutionMetadata(1, 2) ===
+  StreamExecutionMetadata(
+"""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
+  }
+
+  test("metadata is recovered from log when query is restarted") {
+import testImplicits._
+val clock = new SystemClock()
+val ms = new MemoryStream[Long](0, sqlContext)
+val df = ms.toDF().toDF("a")
+val checkpointLoc = newMetadataDir
+val checkpointDir = new File(checkpointLoc, "complete")
+checkpointDir.mkdirs()
+assert(checkpointDir.exists())
+val tableName = "test"
+// Query that prunes timestamps less than current_timestamp, making
+// it easy to use for ensuring that a batch is re-processed with the
+// timestamp used when it was first processed.
+def startQuery: StreamingQuery = {
+  df.groupBy("a")
+.count()
+.where('a >= current_timestamp().cast("long"))
+.writeStream
+.format("memory")
+.queryName(tableName)
+.option("checkpointLocation", checkpointLoc)
+.outputMode("complete")
+.start()
+}
+// no exception here
+val t1 = clock.getTimeMillis() + 60L * 1000L
+val t2 = clock.getTimeMillis() + 60L * 1000L + 1000L
+val q = startQuery
+ms.addData(t1, t2)
+q.processAllAvailable()
+
+checkAnswer(
+  spark.table(tableName),
+  Seq(Row(t1, 1), Row(t2, 1))
+)
+
+q.stop()
+Thread.sleep(60L * 1000L + 5000L) // Expire t1 and t2
--- End diff --

I should also say that I'm not too concerned by the nondeterministic system 
clock issues, since the batchTimestamp is recorded prior to running the query. 
Therefore, as long as the query gets planned within 10 seconds, we're good.





[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...

2016-11-28 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89856411
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.io.File
+
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.execution.streaming.{MemoryStream, 
StreamExecutionMetadata}
+import org.apache.spark.sql.functions._
+import org.apache.spark.util.{SystemClock, Utils}
+
+class StreamExecutionMetadataSuite extends StreamTest {
+
+  private def newMetadataDir =
+Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+
+  test("stream execution metadata") {
+assert(StreamExecutionMetadata(0, 0) ===
+  StreamExecutionMetadata("""{}"""))
+assert(StreamExecutionMetadata(1, 0) ===
+  StreamExecutionMetadata("""{"batchWatermarkMs":1}"""))
+assert(StreamExecutionMetadata(0, 2) ===
+  StreamExecutionMetadata("""{"batchTimestampMs":2}"""))
+assert(StreamExecutionMetadata(1, 2) ===
+  StreamExecutionMetadata(
+"""{"batchWatermarkMs":1,"batchTimestampMs":2}"""))
+  }
+
+  test("metadata is recovered from log when query is restarted") {
+import testImplicits._
+val clock = new SystemClock()
+val ms = new MemoryStream[Long](0, sqlContext)
+val df = ms.toDF().toDF("a")
+val checkpointLoc = newMetadataDir
+val checkpointDir = new File(checkpointLoc, "complete")
+checkpointDir.mkdirs()
+assert(checkpointDir.exists())
+val tableName = "test"
+// Query that prunes timestamps less than current_timestamp, making
+// it easy to use for ensuring that a batch is re-processed with the
+// timestamp used when it was first processed.
+def startQuery: StreamingQuery = {
+  df.groupBy("a")
+.count()
+.where('a >= current_timestamp().cast("long"))
+.writeStream
+.format("memory")
+.queryName(tableName)
+.option("checkpointLocation", checkpointLoc)
+.outputMode("complete")
+.start()
+}
+// no exception here
+val t1 = clock.getTimeMillis() + 60L * 1000L
+val t2 = clock.getTimeMillis() + 60L * 1000L + 1000L
+val q = startQuery
+ms.addData(t1, t2)
+q.processAllAvailable()
+
+checkAnswer(
+  spark.table(tableName),
+  Seq(Row(t1, 1), Row(t2, 1))
+)
+
+q.stop()
+Thread.sleep(60L * 1000L + 5000L) // Expire t1 and t2
--- End diff --

@zsxwing I don't see an obvious way to pass a StreamManualClock in 
DataStreamWriter.start(). Should I be taking an entirely different approach?





[GitHub] spark pull request #15949: [SPARK-18339] [SQL] Don't push down current_times...

2016-11-22 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89208121
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
 ---
@@ -72,6 +72,26 @@ case class CurrentTimestamp() extends LeafExpression 
with CodegenFallback {
 }
 
 /**
+ * Expression representing the current batch time, which is used by 
StreamExecution to
+ * 1. prevent optimizer from pushing this expression below a stateful 
operator
+ * 2. allow IncrementalExecution to substitute this expression with a 
Literal(timestamp)
+ *
+ * There is no code generation since this expression should be replaced 
with a literal.
+ */
+case class CurrentBatchTimestamp(timestamp: SQLTimestamp) extends 
LeafExpression
--- End diff --

Both Nondeterministic and Unevaluable have final 'eval' methods, so the two 
cannot be mixed in together, and I need Nondeterministic to avoid optimizer pushdown.





[GitHub] spark pull request #15949: [SPARK-18339] [SQL] Don't push down current_times...

2016-11-21 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89022903
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -38,6 +40,26 @@ import org.apache.spark.sql.streaming._
 import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
 
 /**
+ * Contains metadata associated with a stream execution. This information 
is
+ * persisted to the offset log via the OffsetSeq metadata field. Current
+ * information contained in this object includes:
+ *
+ * 1. currentEventTimeWatermark: The current eventTime watermark, used to
 + * bound the lateness of data that will be processed.
+ * 2. currentBatchTimestamp: The current batch processing timestamp
+ */
+case class StreamExecutionMetadata(
+var currentEventTimeWatermark: Long = 0,
+var currentBatchTimestamp: Long = 0) {
--- End diff --

Ryan indicated we use SQLTimestamp to indicate microseconds. I changed the 
name of the watermark to currentEventTimeWatermarkMillis based on your 
suggestion.





[GitHub] spark pull request #15949: [SPARK-18339] [SQL] Don't push down current_times...

2016-11-21 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r89017673
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -288,7 +310,11 @@ class StreamExecution(
 logInfo(s"Resuming streaming query, starting with batch $batchId")
 currentBatchId = batchId
 availableOffsets = nextOffsets.toStreamProgress(sources)
-logDebug(s"Found possibly uncommitted offsets $availableOffsets")
+streamExecutionMetadata = 
StreamExecutionMetadata(nextOffsets.metadata.getOrElse(
+  throw new IllegalStateException("OffsetLog does not contain 
current batch timestamp!")
--- End diff --

Adding {} doesn't yield code that compiles/typechecks.





[GitHub] spark pull request #15949: [SPARK-18339] [SQL] Don't push down current_times...

2016-11-21 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/15949#discussion_r88905225
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -422,6 +432,7 @@ class StreamExecution(
 val replacementMap = AttributeMap(replacements)
 val triggerLogicalPlan = withNewSources transformAllExpressions {
   case a: Attribute if replacementMap.contains(a) => replacementMap(a)
+  case t: CurrentTimestamp => 
CurrentBatchTimestamp(currentBatchTimestamp)
--- End diff --

Good question. The purpose of the dedicated expression is to avoid having 
it pushed down below a stateful operator (e.g., aggregation) during 
optimization.





[GitHub] spark pull request #15949: [Spark-18339] [SQL] Don't push down current_times...

2016-11-20 Thread tcondie
GitHub user tcondie opened a pull request:

https://github.com/apache/spark/pull/15949

[Spark-18339] [SQL] Don't push down current_timestamp for filters in 
StructuredStreaming

## What changes were proposed in this pull request?

For the following workflow:
1. I have a column called time which is at minute-level precision in a streaming 
DataFrame.
2. I want to perform groupBy time, count.
3. Then I want my MemorySink to only have the last 30 minutes of counts, which I 
express as
.where('time >= current_timestamp().cast("long") - 30 * 60)
What happens is that the `filter` gets pushed down before the aggregation, so the 
filter is applied to the source data of the aggregation instead of to the result 
of the aggregation (where I actually want to filter).
I guess the main issue here is that `current_timestamp` is non-deterministic in 
the streaming context, so the filter shouldn't be pushed down. Whether this 
requires us to store the `current_timestamp` for each trigger of the streaming 
job is something to discuss. The workflow is sketched below.
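
The workflow above written out as a sketch; eventsDF and the column/table names
are illustrative, and an existing SparkSession named spark is assumed:

    import org.apache.spark.sql.functions.current_timestamp
    import spark.implicits._   // for the 'time column syntax

    // eventsDF: a streaming DataFrame with a "time" column in epoch seconds
    val counts = eventsDF
      .groupBy("time")
      .count()
      .where('time >= current_timestamp().cast("long") - 30 * 60)  // keep the last 30 minutes

    counts.writeStream
      .format("memory")
      .queryName("recent_counts")
      .outputMode("complete")
      .start()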

@brkyvz @zsxwing @tdas 

## How was this patch tested?

A test was added to StreamingAggregationSuite ensuring the above use case 
is handled. The test injects a stream of time values (in seconds) into a query 
that runs in complete mode and only outputs the (count) aggregation results for 
the past 10 seconds. 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tcondie/spark SPARK-18339

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15949.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15949


commit 729ecedab4b61804a9717cadbd4f2c7b6aa50176
Author: Tyson Condie 
Date:   2016-11-18T23:26:25Z

added CurrentBatchTimestamp

commit b8a1f71bef2aa4c11c08178b2250bd995e952601
Author: Tyson Condie 
Date:   2016-11-18T23:35:27Z

update comment

commit 8f0a27329ea711d1936c2df11a310129e22eb9b5
Author: Tyson Condie 
Date:   2016-11-20T20:41:49Z

add test for filtering time-based aggregation







[GitHub] spark pull request #15924: Spark-18498 [SQL] Revise HDFSMetadataLog API for ...

2016-11-17 Thread tcondie
GitHub user tcondie opened a pull request:

https://github.com/apache/spark/pull/15924

Spark-18498 [SQL] Revise HDFSMetadataLog API for better testing

## What changes were proposed in this pull request?

Revise HDFSMetadataLog API such that metadata object serialization and 
final batch file write are separated. This will allow serialization checks 
without worrying about batch file name formats. 
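
A small, self-contained sketch of why the separation helps testing (the trait and
method names here are assumptions for illustration, not necessarily the PR's exact
API): serialization becomes its own step that can be round-trip checked without
creating real batch files.

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}

    trait MetadataSerde[T] {
      def serialize(metadata: T, out: OutputStream): Unit
      def deserialize(in: InputStream): T
    }

    object StringSerde extends MetadataSerde[String] {
      def serialize(metadata: String, out: OutputStream): Unit =
        out.write(metadata.getBytes("UTF-8"))
      def deserialize(in: InputStream): String =
        scala.io.Source.fromInputStream(in, "UTF-8").mkString
    }

    // Round-trip check, independent of any batch file naming scheme.
    val buf = new ByteArrayOutputStream()
    StringSerde.serialize("v1", buf)
    assert(StringSerde.deserialize(new ByteArrayInputStream(buf.toByteArray)) == "v1")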

## How was this patch tested?

Existing tests already ensure this API faithfully supports core 
functionality, i.e., creation of batch files.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tcondie/spark SPARK-18498

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15924.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15924


commit 8e3d7051673f30863a938e8b06299b8aec227886
Author: Tyson Condie 
Date:   2016-11-18T01:30:21Z

revise HDFSMetadataLog API for better testing







[GitHub] spark pull request #15852: Spark-18187 [SQL] CompactibleFileStreamLog should...

2016-11-16 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/15852#discussion_r88362276
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
 ---
@@ -63,7 +63,60 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : 
ClassTag](
 
   protected def isDeletingExpiredLog: Boolean
 
-  protected def compactInterval: Int
+  protected def defaultCompactInterval: Int
+
+  protected final lazy val compactInterval: Int = {
--- End diff --

FileStreamSourceLog uses compactInterval in multiple places. Please advise?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15852: Spark-18187 [SQL] CompactibleFileStreamLog should...

2016-11-16 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/15852#discussion_r88359775
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
 ---
@@ -63,7 +63,60 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : 
ClassTag](
 
   protected def isDeletingExpiredLog: Boolean
 
-  protected def compactInterval: Int
+  protected def defaultCompactInterval: Int
+
+  protected final lazy val compactInterval: Int = {
+// SPARK-18187: "compactInterval" can be set by user via 
defaultCompactInterval.
+// If there are existing log entries, then we should ensure a 
compatible compactInterval
+// is used, irrespective of the defaultCompactInterval. There are 
three cases:
+//
+// 1. If there is no '.compact' file, we can use the default setting 
directly.
+// 2. If there are two or more '.compact' files, we use the interval 
of batch id suffix with
+// '.compact' as compactInterval. It is unclear whether this case will 
ever happen in the
+// current code, since only the latest '.compact' file is retained 
i.e., others are garbage
+// collected.
+// 3. If there is only one '.compact' file, then we must find a 
compact interval
+// that is compatible with (i.e., a divisor of) the previous compact 
file, and that
+// faithfully tries to represent the revised default compact interval 
i.e., is at least
+// as large as possible.
+// e.g., if defaultCompactInterval is 5 (and previous compact interval 
could have
+// been any 2,3,4,6,12), then a log could be: 11.compact, 12, 13, in 
which case
+// will ensure that the new compactInterval = 6 > 5 and (11 + 1) % 6 
== 0
+val compactibleBatchIds = fileManager.list(metadataPath, 
batchFilesFilter)
+  .filter(f => 
f.getPath.toString.endsWith(CompactibleFileStreamLog.COMPACT_FILE_SUFFIX))
+  .map(f => pathToBatchId(f.getPath))
+  .sorted
+  .reverse
+
+// Case 1
+var interval = defaultCompactInterval
+if (compactibleBatchIds.length >= 2) {
+  // Case 2
+  val latestCompactBatchId = compactibleBatchIds(0)
+  val previousCompactBatchId = compactibleBatchIds(1)
+  interval = (latestCompactBatchId - previousCompactBatchId).toInt
+  logInfo(s"Compact interval case 2 = $interval")
--- End diff --

This was debugging info that I was going to remove.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15852: Spark 18187

2016-11-11 Thread tcondie
GitHub user tcondie opened a pull request:

https://github.com/apache/spark/pull/15852

Spark 18187

## What changes were proposed in this pull request?
CompactibleFileStreamLog relies on "compactInterval" to detect a compaction 
batch. If "compactInterval" is reset by the user, CompactibleFileStreamLog will 
return the wrong answer, resulting in data loss. This PR provides a way to check 
the validity of 'compactInterval' and to calculate an appropriate value.

## How was this patch tested?
When restarting a stream, we change 
'spark.sql.streaming.fileSource.log.compactInterval' to a value different from 
the one used by the previous run.
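
A hedged sketch of that restart scenario (paths, schema, and interval values 
are illustrative):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types._

    // Restart against the same checkpoint, but with a different compact interval
    // than the previous run (say the previous run used the default of 10).
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("compact-interval-restart")
      .config("spark.sql.streaming.fileSource.log.compactInterval", "5")
      .getOrCreate()

    val schema = new StructType().add("value", StringType)

    val query = spark.readStream
      .schema(schema)
      .json("/data/in")
      .writeStream
      .format("parquet")
      .option("checkpointLocation", "/chk/files")  // reuse the existing checkpoint and file source log
      .start("/data/out")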

The primary solution to this issue was given by @uncleGen. 
Added extensions include an additional metadata field in OffsetSeq and 
changes to the CompactibleFileStreamLog APIs. @zsxwing 


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tcondie/spark spark-18187

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15852.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15852


commit 65395dddb505f6084db471430da1486d75a77e2a
Author: genmao.ygm 
Date:   2016-11-09T08:21:09Z

SPARK-18187: CompactibleFileStreamLog should not rely on "compactInterval" 
to detect a compaction batch

commit d556933e0f039d661989e07f381aff185c9fac1b
Author: genmao.ygm 
Date:   2016-11-09T08:24:53Z

comment update

commit 8b56f70b2dffd69dbc37007e923f3d5a56fce039
Author: genmao.ygm 
Date:   2016-11-09T08:34:11Z

revert

commit 4a7e28c4e372caa3b16b979273577bd6aa2c11f3
Author: genmao.ygm 
Date:   2016-11-09T08:35:13Z

unit test - compacat metadata log
change compactInterval from 4 to 5

commit 23e1baf454bde511ed1963a27f6492100823d496
Author: genmao.ygm 
Date:   2016-11-09T09:34:15Z

bug fix: /zero

commit 7d37e08026eaa1364e8a4fb10fb7cfb93cb51229
Author: Tyson Condie 
Date:   2016-11-11T00:50:02Z

Merge branch 'SPARK-18187' of https://github.com/uncleGen/spark into 
spark-18187

commit d3f7bbf32d0debba24853a38eb48bfcdcdb517be
Author: Tyson Condie 
Date:   2016-11-11T00:52:24Z

Merge branch 'master' of https://github.com/apache/spark into spark-18187

commit 6901eacdddf235db4ba91a0903ce8826978d778a
Author: Tyson Condie 
Date:   2016-11-11T18:16:41Z

extend offset seq to include metadata




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15626: SPARK-17829 [SQL] Stable format for offset log

2016-10-28 Thread tcondie
Github user tcondie commented on the issue:

https://github.com/apache/spark/pull/15626
  
All logs now uniformly marshal their entries to JSON. Each log has a 
particular way of organizing its JSON entries:
HDFSMetadataLog: 
  Each batch ID is a separate file.
  All log entries within a batch ID are serialized on one line.

OffsetSeqLog:
  Each batch ID is a separate file. 
  Log entries (i.e., Offsets) are separated by a newline character, 
i.e., one offset entry per line.

CompactibleFileStreamLog, FileStreamSinkLog, and FileStreamSourceLog:
  No changes to the log file format, i.e., log entries are separated by a 
newline character. 
  Log entry -> JSON serialization is now done in CompactibleFileStreamLog 
directly.
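
For illustration, here is a hedged example of what a single OffsetSeqLog batch 
file might then contain for a query reading from one Kafka source and one 
long-offset source (offsets and topic name are made up, and any version header 
the log writes is omitted), i.e., one JSON-serialized offset per source, one 
per line:

    {"topic-A":{"0":100,"1":250}}
    375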


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15626: SPARK-17829 [SQL] Stable format for offset log

2016-10-28 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/15626#discussion_r85615504
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala 
---
@@ -23,4 +23,16 @@ package org.apache.spark.sql.execution.streaming
  * ordering of two [[Offset]] instances.  We do assume that if two offsets 
are `equal` then no
  * new data has arrived.
  */
-trait Offset extends Serializable {}
+abstract class Offset {
+
+  /**
+   * A JSON-serialized representation of an Offset that is
+   * used for saving offsets to the offset log.
+   *
+   * @return JSON string encoding
+   */
+  def json: String
+}
+
+/** Used when loading */
--- End diff --

Added.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15626: SPARK-17829 [SQL] Stable format for offset log

2016-10-28 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/15626#discussion_r85615514
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
 ---
@@ -44,9 +45,14 @@ import org.apache.spark.util.UninterruptibleThread
  * Note: [[HDFSMetadataLog]] doesn't support S3-like file systems as they 
don't guarantee listing
  * files in a directory always shows the latest files.
  */
-class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: 
String)
+class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, 
path: String)
   extends MetadataLog[T] with Logging {
 
+  private implicit val formats = Serialization.formats(NoTypeHints)
+
+  /** Needed to serialize type T into JSON */
+  private implicit val manifest = 
Manifest.classType[T](implicitly[ClassTag[T]].runtimeClass)
--- End diff --

Done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15626: SPARK-17829 [SQL] Stable format for offset log

2016-10-28 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/15626#discussion_r85615490
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
 ---
@@ -0,0 +1,62 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.streaming
+
+
+import java.io.{InputStream, OutputStream}
+import java.nio.charset.StandardCharsets._
+
+import scala.io.{Source => IOSource}
+
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.sql.SparkSession
+
+class OffsetSeqLog(offsetLogVersion: String, sparkSession: SparkSession, 
path: String)
+  extends HDFSMetadataLog[OffsetSeq](sparkSession, path) {
+
+  override protected def deserialize(in: InputStream): OffsetSeq = {
+// called inside a try-finally where the underlying stream is closed 
in the caller
+
--- End diff --

right.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15626: SPARK-17829 [SQL] Stable format for offset log

2016-10-28 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/15626#discussion_r85615458
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -713,6 +713,8 @@ class StreamExecution(
 }
 
 object StreamExecution {
+  val OFFSET_LOG_VERSION = "v1"
--- End diff --

I removed this from StreamExecution. I was initially copying the approach 
in CompactibleFileStreamLog, but realized that the notion of a version need not 
be parameterized here. I'm still retaining the notion of a version, which now 
resides in the OffsetSeqLog companion object, as per your suggestion. 
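
A minimal sketch of that placement (the constant name is illustrative):

    object OffsetSeqLog {
      // Version of the offset log format, kept with the log itself rather than
      // in StreamExecution.
      private[streaming] val VERSION = "v1"
    }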


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15626: SPARK-17829 [SQL] Stable format for offset log

2016-10-28 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/15626#discussion_r85613884
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.File
+
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.test.SharedSQLContext
+
+class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
+
+  private implicit val formats = Serialization.formats(NoTypeHints)
+
+  testWithUninterruptibleThread("OffsetSeqLogSuite: basic") {
+withTempDir { temp =>
+  val dir = new File(temp, "dir") // use non-existent directory to 
test whether log make the dir
+  val metadataLog = new OffsetSeqLog("v1", spark, dir.getAbsolutePath)
+  val batch0 = OffsetSeq.fill(LongOffset(0))
+  val batch1 = OffsetSeq.fill(LongOffset(1), LongOffset(2))
+
+  val batch0Serialized = OffsetSeq.fill(batch0.offsets.map(_.map(o =>
+SerializedOffset(o.json))).flatten: _*)
+
+  val batch1Serialized = OffsetSeq.fill(batch1.offsets.map(_.map(o =>
+SerializedOffset(o.json))).flatten: _*)
+
+  assert(metadataLog.add(0, batch0))
+  assert(metadataLog.getLatest() === Some(0 -> batch0Serialized))
+  assert(metadataLog.get(0) === Some(batch0Serialized))
+
+  assert(metadataLog.add(1, batch1))
+  assert(metadataLog.get(0) === Some(batch0Serialized))
+  assert(metadataLog.get(1) === Some(batch1Serialized))
+  assert(metadataLog.getLatest() === Some(1 -> batch1Serialized))
+  assert(metadataLog.get(None, Some(1)) ===
+ Array(0 -> batch0Serialized, 1 -> batch1Serialized))
+
+  // Adding the same batch does nothing
+  metadataLog.add(1, OffsetSeq.fill(LongOffset(3)))
+  assert(metadataLog.get(0) === Some(batch0Serialized))
+  assert(metadataLog.get(1) === Some(batch1Serialized))
+  assert(metadataLog.getLatest() === Some(1 -> batch1Serialized))
+  assert(metadataLog.get(None, Some(1)) ===
+ Array(0 -> batch0Serialized, 1 -> batch1Serialized))
--- End diff --

Got it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15626: SPARK-17829 [SQL] Stable format for offset log

2016-10-28 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/15626#discussion_r85613714
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.File
+
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.test.SharedSQLContext
+
+class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
+
+  private implicit val formats = Serialization.formats(NoTypeHints)
+
+  testWithUninterruptibleThread("OffsetSeqLogSuite: basic") {
--- End diff --

Got it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15626: SPARK-17829 [SQL] Stable format for offset log

2016-10-28 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/15626#discussion_r85613616
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.File
+
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.test.SharedSQLContext
+
+class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext {
+
+  private implicit val formats = Serialization.formats(NoTypeHints)
+
+  testWithUninterruptibleThread("OffsetSeqLogSuite: basic") {
+withTempDir { temp =>
+  val dir = new File(temp, "dir") // use non-existent directory to 
test whether log make the dir
+  val metadataLog = new OffsetSeqLog("v1", spark, dir.getAbsolutePath)
+  val batch0 = OffsetSeq.fill(LongOffset(0))
+  val batch1 = OffsetSeq.fill(LongOffset(1), LongOffset(2))
+
+  val batch0Serialized = OffsetSeq.fill(batch0.offsets.map(_.map(o =>
+SerializedOffset(o.json))).flatten: _*)
+
+  val batch1Serialized = OffsetSeq.fill(batch1.offsets.map(_.map(o =>
--- End diff --

indeed!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15626: SPARK-17829 [SQL] Stable format for offset log

2016-10-28 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/15626#discussion_r85609921
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
 ---
@@ -37,14 +39,19 @@ import org.apache.spark.sql.SparkSession
  * compact log files every 10 batches by default into a big file. When
  * doing a compaction, it will read all old log files and merge them with 
the new batch.
  */
-abstract class CompactibleFileStreamLog[T: ClassTag](
+abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
--- End diff --

It's required by json4s that T be an AnyRef. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15626: SPARK-17829 [SQL] Stable format for offset log

2016-10-28 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/15626#discussion_r85609758
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala
 ---
@@ -36,4 +37,9 @@ class KafkaSourceOffsetSuite extends OffsetSuite {
   compare(
 one = KafkaSourceOffset(("t", 0, 1L)),
 two = KafkaSourceOffset(("t", 0, 2L), ("t", 1, 1L)))
+
+  compare(
--- End diff --

I have extended the KafkaSourceOffsetSuite to include more thorough 
evaluations of json serialization/deserialization, including to/from 
OffsetSeqLog.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15626: SPARK-17829 [SQL] Stable format for offset log

2016-10-26 Thread tcondie
Github user tcondie commented on the issue:

https://github.com/apache/spark/pull/15626
  
@koeninger Thanks for pointing this out. I have revised the serialization 
procedure to use org.apache.spark.sql.kafka010.JsonUtils.
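
For reference, a hedged sketch of the revised approach (JsonUtils is 
package-private, so this assumes code living in the 
org.apache.spark.sql.kafka010 package; topic and offset values are made up):

    package org.apache.spark.sql.kafka010

    import org.apache.kafka.common.TopicPartition

    object OffsetJsonExample {
      def main(args: Array[String]): Unit = {
        val offsets = Map(
          new TopicPartition("t", 0) -> 1L,
          new TopicPartition("t", 1) -> 2L)

        // Serialize with the shared Kafka helpers instead of ad-hoc json4s writes.
        val json: String = JsonUtils.partitionOffsets(offsets)   // e.g. {"t":{"0":1,"1":2}}
        val roundTrip: Map[TopicPartition, Long] = JsonUtils.partitionOffsets(json)
        assert(roundTrip == offsets)
      }
    }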


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15626: SPARK-17829 [SQL] Stable format for offset log

2016-10-25 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/15626#discussion_r85026112
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala 
---
@@ -23,4 +23,16 @@ package org.apache.spark.sql.execution.streaming
  * ordering of two [[Offset]] instances.  We do assume that if two offsets 
are `equal` then no
  * new data has arrived.
  */
-trait Offset extends Serializable {}
+abstract class Offset {
+  def json: String
+}
+
+/** Used when loading */
+class SerializedOffset(override val json: String) extends Offset
--- End diff --

Makes sense.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15626: SPARK-17829 [SQL] Stable format for offset log

2016-10-25 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/15626#discussion_r85026079
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala 
---
@@ -23,4 +23,16 @@ package org.apache.spark.sql.execution.streaming
  * ordering of two [[Offset]] instances.  We do assume that if two offsets 
are `equal` then no
  * new data has arrived.
  */
-trait Offset extends Serializable {}
+abstract class Offset {
+  def json: String
+}
+
+/** Used when loading */
--- End diff --

Got it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15626: SPARK-17829 [SQL] Stable format for offset log

2016-10-25 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/15626#discussion_r85025740
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
 ---
@@ -22,8 +22,18 @@ package org.apache.spark.sql.execution.streaming
  */
 case class LongOffset(offset: Long) extends Offset {
 
+  override val json = offset.toString
+
   def +(increment: Long): LongOffset = new LongOffset(offset + increment)
   def -(decrement: Long): LongOffset = new LongOffset(offset - decrement)
 
   override def toString: String = s"#$offset"
 }
+
+object LongOffset {
+
+  def apply(serialized: Offset) : LongOffset = new 
LongOffset(serialized.json.toLong)
+
+}
--- End diff --

Can you please clarify best practices w.r.t. extra lines? I didn't see 
anything in the code style guide.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15626: SPARK-17829 [SQL] Stable format for offset log

2016-10-25 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/15626#discussion_r85025208
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
 ---
@@ -51,4 +60,8 @@ private[kafka010] object KafkaSourceOffset {
   def apply(offsetTuples: (String, Int, Long)*): KafkaSourceOffset = {
 KafkaSourceOffset(offsetTuples.map { case(t, p, o) => (new 
TopicPartition(t, p), o) }.toMap)
   }
+
+  def apply(serialized: Offset): KafkaSourceOffset = {
--- End diff --

Sounds good.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15626: SPARK-17829 [SQL] Stable format for offset log

2016-10-25 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/15626#discussion_r85025029
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
 ---
@@ -27,13 +28,21 @@ import org.apache.spark.sql.execution.streaming.Offset
  */
 private[kafka010]
 case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, 
Long]) extends Offset {
+  import org.json4s.jackson.Serialization.{write}
--- End diff --

Sounds good.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15626: SPARK-17829 [SQL] Stable format for offset log

2016-10-25 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/15626#discussion_r85024877
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala
 ---
@@ -22,7 +22,10 @@ package org.apache.spark.sql.execution.streaming
  * [[Source]]s that are present in a streaming query. This is similar to 
simplified, single-instance
  * vector clock that must progress linearly forward.
  */
-case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset {
+case class OffsetSeq(offsets: Seq[Option[Offset]]) {
--- End diff --

I renamed it to avoid the class being confused with a type of Offset. Does that 
make sense, or should I take an alternative approach?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15626: SPARK-17829 [SQL] Stable format for offset log

2016-10-25 Thread tcondie
GitHub user tcondie opened a pull request:

https://github.com/apache/spark/pull/15626

SPARK-17829 [SQL] Stable format for offset log

## What changes were proposed in this pull request?

Currently we use Java serialization for the WAL that stores the offsets 
contained in each batch. This has two main issues:
1. It can break across Spark releases (though this is not the only thing 
preventing us from upgrading a running query).
2. It is unnecessarily opaque to the user.
I'd propose we require offsets to provide a user-readable serialization and 
use that instead. JSON is probably a good option.

## How was this patch tested?

Tests were added for KafkaSourceOffset in KafkaSourceOffsetSuite and for 
LongOffset in OffsetSuite.

Please review 
https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark before 
opening a pull request.

@zsxwing @marmbrus 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tcondie/spark spark-8360

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15626.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15626


commit 60a71fae0decaebaa4f869b78283295b6491992a
Author: Tyson Condie 
Date:   2016-10-21T23:53:34Z

initial version of offse json serialization

commit 52431141ce4c7efbf82da5a204ba77d16c03f16d
Author: Tyson Condie 
Date:   2016-10-22T00:26:51Z

remove CompositeOffsetSuite

commit 3ccdc5c00b7043570a3567560f1f9ffaeb1ba688
Author: Tyson Condie 
Date:   2016-10-24T22:54:53Z

update

commit 16c6cea8f91e16dc39c2a0796295f233d33054c4
Author: Tyson Condie 
Date:   2016-10-24T23:00:38Z

update test parameters to avoid test name conflict




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org