[GitHub] spark pull request #17444: [SPARK-19876][SS] Follow up: Refactored BatchComm...
Github user tcondie commented on a diff in the pull request: https://github.com/apache/spark/pull/17444#discussion_r108311230

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala ---
@@ -45,33 +45,39 @@ import org.apache.spark.sql.SparkSession
 class BatchCommitLog(sparkSession: SparkSession, path: String)
   extends HDFSMetadataLog[String](sparkSession, path) {
+  import BatchCommitLog._
+
+  def add(batchId: Long): Unit = {
+    super.add(batchId, EMPTY_JSON)
+  }
+
+  override def add(batchId: Long, metadata: String): Boolean = {
+    throw new UnsupportedOperationException(
--- End diff --

What if we want metadata down the road? What's the problem with supporting this for cases other than null?
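For readers skimming the digest, here is a self-contained sketch of the pattern that diff introduces. MetadataLogSketch is a simplified stand-in for HDFSMetadataLog; only the method shapes and EMPTY_JSON come from the hunk above, and everything else (the map-backed storage, the error message) is illustrative.

{{{
// Simplified stand-in for HDFSMetadataLog, illustration only.
class MetadataLogSketch[T] {
  private val entries = scala.collection.mutable.Map[Long, T]()
  def add(batchId: Long, metadata: T): Boolean = entries.put(batchId, metadata).isEmpty
}

object BatchCommitLogSketch {
  val EMPTY_JSON = "{}"
}

class BatchCommitLogSketch extends MetadataLogSketch[String] {
  import BatchCommitLogSketch._

  // The only entry point callers use today: commit a batch with a fixed empty payload.
  def add(batchId: Long): Unit = super.add(batchId, EMPTY_JSON)

  // The two-argument form is rejected so that future metadata support has to be an
  // explicit, reviewed change rather than an accidental one.
  override def add(batchId: Long, metadata: String): Boolean =
    throw new UnsupportedOperationException("BatchCommitLog does not accept arbitrary metadata")
}
}}}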
[GitHub] spark pull request #17219: [SPARK-19876][SS][WIP] OneTime Trigger Executor
Github user tcondie commented on a diff in the pull request: https://github.com/apache/spark/pull/17219#discussion_r107045633

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala ---
@@ -38,6 +38,51 @@ sealed trait Trigger
 /**
  * :: Experimental ::
+ * A trigger that runs a query once then terminates
+ *
+ * Scala Example:
+ * {{{
+ *   df.write.trigger(OneTime)
+ * }}}
+ *
+ * Java Example:
+ * {{{
+ *   df.write.trigger(OneTime.create())
+ * }}}
+ *
+ * @since 2.2.0
+ */
+@Experimental
+@InterfaceStability.Evolving
+case class OneTime() extends Trigger
+
+/**
+ * :: Experimental ::
+ * Used to create [[OneTime]] triggers for [[StreamingQuery]]s.
--- End diff --

The explanation of the OneTime trigger is given in the OneTime class definition. Does that suffice, or should the explanation be reiterated in the object definition? If it should be reiterated, then I can do the same with ProcessingTime.
[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...
GitHub user tcondie opened a pull request: https://github.com/apache/spark/pull/17246

[SPARK-19906][SS][DOCS] Documentation describing how to write queries to Kafka

## What changes were proposed in this pull request?
Add documentation that describes how to write streaming and batch queries to Kafka.

@zsxwing @tdas

Please review http://spark.apache.org/contributing.html before opening a pull request.

You can merge this pull request into a Git repository by running:
    $ git pull https://github.com/tcondie/spark kafka-write-docs
Alternatively you can review and apply these changes as the patch at:
    https://github.com/apache/spark/pull/17246.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
    This closes #17246

commit 172d4505e5583c541e4644b1eeb12f853bf638cd  Author: Tyson Condie  Date: 2017-03-10T19:14:33Z  update
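As a companion to the PR description, here is roughly the kind of snippet such documentation covers: a batch write of a key/value shaped DataFrame to Kafka. The broker address and topic name are placeholders, and the SparkSession setup is only there to make the sketch self-contained.

{{{
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("kafka-batch-write-example").getOrCreate()
import spark.implicits._

// The Kafka sink expects a 'value' column (and optionally 'key'), as string or binary.
val df = Seq(("k1", "v1"), ("k2", "v2")).toDF("key", "value")

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("topic", "topic1")
  .save()
}}}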
[GitHub] spark pull request #17231: [SPARK-19891][SS] Await Batch Lock notified on st...
GitHub user tcondie opened a pull request: https://github.com/apache/spark/pull/17231 [SPARK-19891][SS] Await Batch Lock notified on stream execution exit ## What changes were proposed in this pull request? We need to notify the await batch lock when the stream exits early e.g., when an exception has been thrown. ## How was this patch tested? Current tests that throw exceptions at runtime will finish faster as a result of this update. @zsxwing Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tcondie/spark kafka-writer Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17231.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17231 commit d371758538f659cbcf604e591110665cfca4f216 Author: Tyson Condie Date: 2017-01-20T01:53:03Z add kafka relation and refactor kafka source commit b6c3055f2c21050cb9e203213651e02779d724bc Author: Tyson Condie Date: 2017-01-20T02:29:27Z update commit 4c81812e93907219af69dd57e0f4f5ebaf92b262 Author: Tyson Condie Date: 2017-01-20T18:24:07Z update commit ab02a4c631f9fc0ecd8528d85c61fe3c5de64040 Author: Tyson Condie Date: 2017-01-20T19:53:24Z single kafka provider for both stream and batch commit e6b57edb0958649062749cb9a0f7cda74f0b2829 Author: Tyson Condie Date: 2017-01-24T00:32:56Z added uninterruptible thread version of kafka offset reader commit ff94ed803474448f6bb388f8933e6ec091fc24a1 Author: Tyson Condie Date: 2017-01-24T00:44:31Z added uninterruptible thread version of kafka offset reader commit f8fd34cf0c2da4f1b9c793e4e021c23a923a0285 Author: Tyson Condie Date: 2017-01-24T01:13:16Z update tests commit 41271e25896387262c2c3a5b4fad6ba48feb9121 Author: Tyson Condie Date: 2017-01-24T18:32:26Z resolve conflicts in KafakSource commit 74d96fc9049a0a0fb6de6d011eb896b7d7c32b30 Author: Tyson Condie Date: 2017-01-24T18:45:12Z update comments commit d31fc8104bf94c82bc6fa1c099def9ca16fec93a Author: Tyson Condie Date: 2017-01-25T00:53:11Z address comments from @zsxwing commit 1db1649201361bcede52997ec8c2f0610a55da8b Author: Tyson Condie Date: 2017-01-25T01:04:06Z update commit 3b0d48b53c1b7a992557c6a42910a4681a84865e Author: Tyson Condie Date: 2017-01-25T18:05:55Z Merge branch 'master' of https://github.com/apache/spark into SPARK-18682 commit a5b02691ddbaef6c8092881ea15b582d9137be71 Author: Tyson Condie Date: 2017-01-26T21:32:00Z address comments from @zsxwing commit c08c01fd21fc53d3db3504a257ab6bb6115cd462 Author: Tyson Condie Date: 2017-01-27T16:33:29Z late binding offsets commit 79d335e697ba5af3f02f524d39c16e61d2cc73d9 Author: Tyson Condie Date: 2017-01-27T18:44:50Z update to late binding logic commit a44b365bba4bcdb7cd56d16c31d0447de9ed5aa3 Author: Tyson Condie Date: 2017-01-27T19:00:50Z Merge branch 'SPARK-18682' into kafka-writer commit 51291e36fad5f3511ce1d3afc17e2e714dffe2d3 Author: Tyson Condie Date: 2017-01-27T20:21:36Z remove kafka log4j debug commit b597cf1bf2135659de1e25bb7d41aeeac26f87f9 Author: Tyson Condie Date: 2017-01-27T20:22:33Z remove kafka log4j debug commit 84b32c55787290415016186d0c53543661b12851 Author: Tyson Condie Date: 2017-01-30T21:12:48Z Merge branch 'SPARK-18682' into kafka-writer commit f5ae3012ec37be0c4a6208ba00438592cd3aa791 Author: Tyson Condie Date: 2017-01-31T20:17:18Z update commit 2487a7260ec11496aed0135a723e156376c6ff31 Author: Tyson Condie Date: 
2017-01-31T22:29:24Z address comments from @zsxwing commit 789d3afc0163b93aa0f859354221ea7e2374f74a Author: Tyson Condie Date: 2017-01-31T23:12:43Z update commit 56a06e7dc75a72148fca498b41d7adf7c41e4cfe Author: Tyson Condie Date: 2017-02-01T22:02:18Z Merge branch 'SPARK-18682' into kafka-writer commit e74473b28b160633ab6545fae208981096fe367c Author: Tyson Condie Date: 2017-02-02T00:58:01Z update commit 73df054e87eb755bc0b25ee15e7fdd501cd0c10c Author: Tyson Condie Date: 2017-02-02T22:43:39Z update commit 5b48fc65ac08e8ed4a09edd0d346990d40d042e0 Author: Tyson Condie Date: 2017-02-03T01:20:03Z address comments from @tdas commit 57760094591439525d3a53d0c5845a0da7e9b8eb Author: Tyson Condie Date: 2017-02-03T19:46:00Z address feedback from @tdas and @sxwing commit 63d453f6078897d4049640d009aa0722b1f93908 Author: Tyson Condie Date: 2017-02-03T20:00:13Z update merge commit 3c4eecf3cd529c45c7b7a1bf053c7ea92e4caa50 Author: Tyson Condie Date: 2017-02-03T23:16:03Z update commit b0611e48f71be67e1481cb0754e7f49f06f73dc2 Author: Tyson Condie Date: 2017-02
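To make the idea in #17231 above concrete, here is an illustrative sketch -- not the actual StreamExecution code -- of why the notification has to live on the failure path as well: anything blocked waiting on the batch lock must be woken up even when the stream thread dies with an exception, which is what the finally block guarantees.

{{{
import java.util.concurrent.locks.ReentrantLock

class StreamLoopSketch {
  private val awaitBatchLock = new ReentrantLock()
  private val batchDone = awaitBatchLock.newCondition()
  @volatile private var terminated = false

  private def signalWaiters(): Unit = {
    awaitBatchLock.lock()
    try batchDone.signalAll() finally awaitBatchLock.unlock()
  }

  def runBatches(runOneBatch: () => Unit): Unit = {
    try {
      while (!terminated) {
        runOneBatch()
        signalWaiters()   // normal path: a batch completed
      }
    } finally {
      terminated = true
      signalWaiters()     // early-exit path: wake waiters even if runOneBatch threw
    }
  }

  def awaitTermination(): Unit = {
    awaitBatchLock.lock()
    try { while (!terminated) batchDone.await() } finally awaitBatchLock.unlock()
  }
}
}}}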
[GitHub] spark pull request #17219: [SPARK-19876][SS][WIP] OneTime Trigger Executor
GitHub user tcondie opened a pull request: https://github.com/apache/spark/pull/17219

[SPARK-19876][SS][WIP] OneTime Trigger Executor

## What changes were proposed in this pull request?
An additional trigger and trigger executor that will execute a single trigger only. One can use this OneTime trigger to have more control over the scheduling of triggers.

In addition, this patch requires an optimization to StreamExecution that logs a commit record at the end of successfully processing a batch. This new commit log will be used to determine the next batch (offsets) to process after a restart, instead of using the offset log itself; using the offset log for this would always re-process the previously logged batch, which would not permit a OneTime trigger feature.

## How was this patch tested?
A number of existing tests have been revised. These tests all assumed that when restarting a stream, the last batch in the offset log is to be re-processed. Given that we now have a commit log that will tell us if that last batch was processed successfully, the results/assumptions of those tests needed to be revised accordingly.

In addition, a OneTime trigger test was added to StreamingQuerySuite, which tests:
- The semantics of the OneTime trigger (i.e., on start, execute a single batch, then stop).
- The case when the commit log was not able to successfully log the completion of a batch before restart, which would mean that we should fall back to what's in the offset log.
- A OneTime trigger execution that results in an exception being thrown.

@marmbrus @tdas @zsxwing

Please review http://spark.apache.org/contributing.html before opening a pull request.

You can merge this pull request into a Git repository by running:
    $ git pull https://github.com/tcondie/spark stream-commit
Alternatively you can review and apply these changes as the patch at:
    https://github.com/apache/spark/pull/17219.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
    This closes #17219

commit 08a36f8c5a833da1deb5db99dc620ba4e98d67a1  Author: Tyson Condie  Date: 2017-03-06T19:05:58Z  update
commit 0e21d0e829a134d050b1c881745dbcfca8986378  Author: Tyson Condie  Date: 2017-03-07T00:46:30Z  Merge branch 'master' of https://github.com/apache/spark into stream-commit
commit 9b8abb445725828e73e392ee58ffa844c2506ca3  Author: Tyson Condie  Date: 2017-03-08T18:55:57Z  update existing tests
commit 682eb1a3987c0f481de4bebf72926f14816d7607  Author: Tyson Condie  Date: 2017-03-09T00:10:06Z  add onetime trigger test
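A hypothetical sketch of the restart decision described above (names are illustrative, not the actual StreamExecution members): the commit log only changes which batch runs first after a restart.

{{{
// If the commit log shows the last planned batch finished, start the next batch;
// otherwise fall back to re-running the batch recorded in the offset log.
def nextBatchToRun(latestOffsetLogBatch: Option[Long], latestCommittedBatch: Option[Long]): Long =
  (latestOffsetLogBatch, latestCommittedBatch) match {
    case (Some(planned), Some(committed)) if committed >= planned => planned + 1
    case (Some(planned), _)                                       => planned
    case (None, _)                                                => 0L
  }
}}}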
[GitHub] spark pull request #17043: [SPARK-19719][SS] Kafka writer for both structure...
GitHub user tcondie opened a pull request: https://github.com/apache/spark/pull/17043 [SPARK-19719][SS] Kafka writer for both structured streaming and batch queires ## What changes were proposed in this pull request? Add a new Kafka Sink and Kafka Relation for writing streaming and batch queries, respectively, to Apache Kafka. @tdas @zsxwing ## How was this patch tested? KafkaSinkSuite.scala contains tests for writing both streaming and batch queries to Kafka. Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tcondie/spark kafka-writer Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17043.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17043 commit d371758538f659cbcf604e591110665cfca4f216 Author: Tyson Condie Date: 2017-01-20T01:53:03Z add kafka relation and refactor kafka source commit b6c3055f2c21050cb9e203213651e02779d724bc Author: Tyson Condie Date: 2017-01-20T02:29:27Z update commit 4c81812e93907219af69dd57e0f4f5ebaf92b262 Author: Tyson Condie Date: 2017-01-20T18:24:07Z update commit ab02a4c631f9fc0ecd8528d85c61fe3c5de64040 Author: Tyson Condie Date: 2017-01-20T19:53:24Z single kafka provider for both stream and batch commit e6b57edb0958649062749cb9a0f7cda74f0b2829 Author: Tyson Condie Date: 2017-01-24T00:32:56Z added uninterruptible thread version of kafka offset reader commit ff94ed803474448f6bb388f8933e6ec091fc24a1 Author: Tyson Condie Date: 2017-01-24T00:44:31Z added uninterruptible thread version of kafka offset reader commit f8fd34cf0c2da4f1b9c793e4e021c23a923a0285 Author: Tyson Condie Date: 2017-01-24T01:13:16Z update tests commit 41271e25896387262c2c3a5b4fad6ba48feb9121 Author: Tyson Condie Date: 2017-01-24T18:32:26Z resolve conflicts in KafakSource commit 74d96fc9049a0a0fb6de6d011eb896b7d7c32b30 Author: Tyson Condie Date: 2017-01-24T18:45:12Z update comments commit d31fc8104bf94c82bc6fa1c099def9ca16fec93a Author: Tyson Condie Date: 2017-01-25T00:53:11Z address comments from @zsxwing commit 1db1649201361bcede52997ec8c2f0610a55da8b Author: Tyson Condie Date: 2017-01-25T01:04:06Z update commit 3b0d48b53c1b7a992557c6a42910a4681a84865e Author: Tyson Condie Date: 2017-01-25T18:05:55Z Merge branch 'master' of https://github.com/apache/spark into SPARK-18682 commit a5b02691ddbaef6c8092881ea15b582d9137be71 Author: Tyson Condie Date: 2017-01-26T21:32:00Z address comments from @zsxwing commit c08c01fd21fc53d3db3504a257ab6bb6115cd462 Author: Tyson Condie Date: 2017-01-27T16:33:29Z late binding offsets commit 79d335e697ba5af3f02f524d39c16e61d2cc73d9 Author: Tyson Condie Date: 2017-01-27T18:44:50Z update to late binding logic commit a44b365bba4bcdb7cd56d16c31d0447de9ed5aa3 Author: Tyson Condie Date: 2017-01-27T19:00:50Z Merge branch 'SPARK-18682' into kafka-writer commit 51291e36fad5f3511ce1d3afc17e2e714dffe2d3 Author: Tyson Condie Date: 2017-01-27T20:21:36Z remove kafka log4j debug commit b597cf1bf2135659de1e25bb7d41aeeac26f87f9 Author: Tyson Condie Date: 2017-01-27T20:22:33Z remove kafka log4j debug commit 84b32c55787290415016186d0c53543661b12851 Author: Tyson Condie Date: 2017-01-30T21:12:48Z Merge branch 'SPARK-18682' into kafka-writer commit f5ae3012ec37be0c4a6208ba00438592cd3aa791 Author: Tyson Condie Date: 2017-01-31T20:17:18Z update commit 2487a7260ec11496aed0135a723e156376c6ff31 Author: Tyson Condie 
Date: 2017-01-31T22:29:24Z address comments from @zsxwing commit 789d3afc0163b93aa0f859354221ea7e2374f74a Author: Tyson Condie Date: 2017-01-31T23:12:43Z update commit 56a06e7dc75a72148fca498b41d7adf7c41e4cfe Author: Tyson Condie Date: 2017-02-01T22:02:18Z Merge branch 'SPARK-18682' into kafka-writer commit e74473b28b160633ab6545fae208981096fe367c Author: Tyson Condie Date: 2017-02-02T00:58:01Z update commit 73df054e87eb755bc0b25ee15e7fdd501cd0c10c Author: Tyson Condie Date: 2017-02-02T22:43:39Z update commit 5b48fc65ac08e8ed4a09edd0d346990d40d042e0 Author: Tyson Condie Date: 2017-02-03T01:20:03Z address comments from @tdas commit 57760094591439525d3a53d0c5845a0da7e9b8eb Author: Tyson Condie Date: 2017-02-03T19:46:00Z address feedback from @tdas and @sxwing commit 63d453f6078897d4049640d009aa0722b1f93908 Author: Tyson Condie Date: 2017-02-03T20:00:13Z update merge commit 3c4eecf3cd529c45c7b7a1bf053c7ea92e4caa50 Author: Tyson Condie Date: 2017-02-03T23:16:03Z update commit b0611e48f71be67e1481cb0754e7f49f06f73dc2 Author: Ty
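For completeness alongside the #17043 description above, here is a sketch of the streaming side of the new sink; 'streamingDf' is assumed to be a streaming DataFrame with string key/value columns, and the broker, topic, and checkpoint path are placeholders.

{{{
streamingDf.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("topic", "topic1")
  .option("checkpointLocation", "/tmp/kafka-sink-checkpoint")
  .start()
}}}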
[GitHub] spark pull request #16918: [SPARK-19584] [SS] [DOCS] update structured strea...
Github user tcondie commented on a diff in the pull request: https://github.com/apache/spark/pull/16918#discussion_r101168119 --- Diff: docs/structured-streaming-kafka-integration.md --- @@ -187,50 +306,68 @@ The following options must be set for the Kafka source. The following configurations are optional: -Optionvaluedefaultmeaning +Optionvaluedefaultquery typemeaning startingOffsets - earliest, latest, or json string - {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} + "earliest", "latest" (streaming only), or json string + """ {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """ - latest + "latest" for streaming, "earliest" for batch + streaming and batch The start point when a query is started, either "earliest" which is from the earliest offsets, "latest" which is just from the latest offsets, or a json string specifying a starting offset for each TopicPartition. In the json, -2 as an offset can be used to refer to earliest, -1 to latest. - Note: This only applies when a new Streaming query is started, and that resuming will always pick - up from where the query left off. Newly discovered partitions during a query will start at + Note: For Batch queries, latest (either implicitly or by using -1 in json) is not allowed. --- End diff -- Agreed. The previous version had "S" caps, so I wasn't sure if this was a convention or not. I'll make the correction. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16918: [SPARK-19584] [SS] [DOCS] update structured strea...
GitHub user tcondie opened a pull request: https://github.com/apache/spark/pull/16918

[SPARK-19584] [SS] [DOCS] update structured streaming documentation around batch mode

## What changes were proposed in this pull request?
Revision to structured-streaming-kafka-integration.md to reflect new Batch query specification and options.

@zsxwing @tdas

## How was this patch tested?
N/A

Please review http://spark.apache.org/contributing.html before opening a pull request.

You can merge this pull request into a Git repository by running:
    $ git pull https://github.com/tcondie/spark kafka-docs
Alternatively you can review and apply these changes as the patch at:
    https://github.com/apache/spark/pull/16918.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
    This closes #16918

commit debe111da3cf1981a4c666554eba669be248f9d4  Author: Tyson Condie  Date: 2017-02-14T00:01:06Z  update structured streaming documentation around batch mode
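To illustrate the batch options the revised page documents, here is a sketch of a batch read with explicit per-partition starting offsets; 'spark' is an existing SparkSession, and the broker, topics, and offsets are placeholders (-2 means earliest and -1 means latest in the JSON form).

{{{
val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "topicA,topicB")
  .option("startingOffsets", """{"topicA":{"0":23,"1":-2},"topicB":{"0":-2}}""")
  .option("endingOffsets", "latest")
  .load()
  .selectExpr("CAST(value AS STRING)")
}}}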
[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka
Github user tcondie commented on a diff in the pull request: https://github.com/apache/spark/pull/16686#discussion_r99382639 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaTopicPartitionOffsetReader.scala --- @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.{util => ju} +import java.util.concurrent.{Executors, ThreadFactory} + +import scala.collection.JavaConverters._ +import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration.Duration +import scala.util.control.NonFatal + +import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer} +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.types._ +import org.apache.spark.util.{ThreadUtils, UninterruptibleThread} + +/** + * This class uses Kafka's own [[KafkaConsumer]] API to read data offsets from Kafka. + * The [[ConsumerStrategy]] class defines which Kafka topics and partitions should be read + * by this source. These strategies directly correspond to the different consumption options + * in. This class is designed to return a configured [[KafkaConsumer]] that is used by the + * [[KafkaSource]] to query for the offsets. See the docs on + * [[org.apache.spark.sql.kafka010.ConsumerStrategy]] + * for more details. + * + * Note: This class is not ThreadSafe + */ +private[kafka010] class KafkaTopicPartitionOffsetReader( --- End diff -- I'll rollback. My thought was to also indicate that it's being used to read TopicPartition(s). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka
Github user tcondie commented on a diff in the pull request: https://github.com/apache/spark/pull/16686#discussion_r99381274

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsets.scala ---
@@ -19,14 +19,31 @@ package org.apache.spark.sql.kafka010
 import org.apache.kafka.common.TopicPartition

-/*
- * Values that can be specified for config startingOffsets
+/**
+ * Values that can be specified to configure starting,
+ * ending, and specific offsets.
  */
-private[kafka010] sealed trait StartingOffsets
+private[kafka010] sealed trait KafkaOffsets
--- End diff --

I went with KafkaOffsetRangeLimit
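A hypothetical sketch of what the renamed hierarchy looks like; apart from KafkaOffsetRangeLimit itself and the three kinds of boundary described in the diff comment, the subtype names and the map key shape are illustrative rather than quoted from the patch.

{{{
sealed trait KafkaOffsetRangeLimit
case object EarliestOffsetRangeLimit extends KafkaOffsetRangeLimit
case object LatestOffsetRangeLimit extends KafkaOffsetRangeLimit
// (topic, partition) -> offset; the real code likely keys on Kafka's TopicPartition
case class SpecificOffsetRangeLimit(partitionOffsets: Map[(String, Int), Long]) extends KafkaOffsetRangeLimit
}}}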
[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka
Github user tcondie commented on a diff in the pull request: https://github.com/apache/spark/pull/16686#discussion_r99244144

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala ---
@@ -135,7 +136,28 @@ private[kafka010] class KafkaSourceRDD(
     } else {
       new NextIterator[ConsumerRecord[Array[Byte], Array[Byte]]]() {
         val consumer = CachedKafkaConsumer.getOrCreate(
-          range.topic, range.partition, executorKafkaParams)
+          range.topic, range.partition, executorKafkaParams, reuseKafkaConsumer)
+        if (range.fromOffset < 0 || range.untilOffset < 0) {
--- End diff --

Reworked it. Let me know what you think.
[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka
Github user tcondie commented on a diff in the pull request: https://github.com/apache/spark/pull/16686#discussion_r99240245

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
@@ -334,14 +334,15 @@ private[kafka010] object CachedKafkaConsumer extends Logging {
   def getOrCreate(
       topic: String,
       partition: Int,
-      kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer = synchronized {
+      kafkaParams: ju.Map[String, Object],
+      reuse: Boolean): CachedKafkaConsumer = synchronized {
--- End diff --

Reuse existing. I changed the name to reuseExistingIfPresent.
[GitHub] spark issue #16686: [SPARK-18682][SS] Batch Source for Kafka
Github user tcondie commented on the issue: https://github.com/apache/spark/pull/16686

jenkins retest this please
[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka
Github user tcondie commented on a diff in the pull request: https://github.com/apache/spark/pull/16686#discussion_r98237559 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala --- @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.util.concurrent.atomic.AtomicInteger + +import org.apache.kafka.common.TopicPartition +import org.scalatest.BeforeAndAfter + +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession} + +class KafkaRelationSuite extends QueryTest with BeforeAndAfter with SharedSQLContext { + + import testImplicits._ + + private val topicId = new AtomicInteger(0) + + private var testUtils: KafkaTestUtils = _ + + private def newTopic(): String = s"topic-${topicId.getAndIncrement()}" + + private def assignString(topic: String, partitions: Iterable[Int]): String = { +JsonUtils.partitions(partitions.map(p => new TopicPartition(topic, p))) + } + + override def beforeAll(): Unit = { +super.beforeAll() +testUtils = new KafkaTestUtils +testUtils.setup() + } + + override def afterAll(): Unit = { +if (testUtils != null) { + testUtils.teardown() + testUtils = null + super.afterAll() +} + } + + test("explicit earliest to latest offsets") { +val topic = newTopic() +testUtils.createTopic(topic, partitions = 3) +testUtils.sendMessages(topic, (0 to 9).map(_.toString).toArray, Some(0)) +testUtils.sendMessages(topic, (10 to 19).map(_.toString).toArray, Some(1)) +testUtils.sendMessages(topic, Array("20"), Some(2)) + +// Specify explicit earliest and latest offset values +val reader = spark + .read + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("subscribe", topic) + .option("startingOffsets", "earliest") + .option("endingOffsets", "latest") + .load() +var df = reader.selectExpr("CAST(value AS STRING)") +checkAnswer(df, (0 to 20).map(_.toString).toDF) + +// "latest" should late bind to the current (latest) offset in the reader +testUtils.sendMessages(topic, (21 to 29).map(_.toString).toArray, Some(2)) --- End diff -- This no longer holds now that we're binding in the executor, right? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka
Github user tcondie commented on a diff in the pull request: https://github.com/apache/spark/pull/16686#discussion_r97620911

--- Diff: external/kafka-0-10-sql/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister ---
@@ -1 +1 @@
-org.apache.spark.sql.kafka010.KafkaSourceProvider
+org.apache.spark.sql.kafka010.KafkaProvider
--- End diff --

That's true, but the revised Provider supplies not only a Source but also a Relation, hence the decision to rename it to something more general. It is not clear whether this outweighs the risks you've pointed out. @tdas @zsxwing
[GitHub] spark issue #16686: [SPARK-18682][SS] Batch Source for Kafka
Github user tcondie commented on the issue: https://github.com/apache/spark/pull/16686

retest this please
[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka
GitHub user tcondie opened a pull request: https://github.com/apache/spark/pull/16686

[SPARK-18682][SS] Batch Source for Kafka

## What changes were proposed in this pull request?
Today, you can start a stream that reads from Kafka. However, given Kafka's configurable retention period, it seems like sometimes you might just want to read all of the data that is available now. As such, we should add a version that works with spark.read as well.

The options should be the same as the streaming Kafka source, with the following differences:
- startingOffsets should default to earliest, and should not allow latest (which would always be empty).
- endingOffsets should also be allowed and should default to latest.
- The same assign json format as startingOffsets should also be accepted.

It would be really good if things like .limit(n) were enough to prevent all the data from being read (this might just work).

## How was this patch tested?
KafkaRelationSuite was added for testing batch queries via KafkaUtils.

You can merge this pull request into a Git repository by running:
    $ git pull https://github.com/tcondie/spark SPARK-18682
Alternatively you can review and apply these changes as the patch at:
    https://github.com/apache/spark/pull/16686.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
    This closes #16686
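A sketch of the default behaviour the proposal describes: with no offset options, a batch read covers everything Kafka currently retains, from earliest to latest. 'spark', the broker address, and the topic are placeholders.

{{{
val allAvailable = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "topic1")
  .load()                          // startingOffsets=earliest, endingOffsets=latest by default
  .selectExpr("CAST(value AS STRING)")

allAvailable.limit(10).show()      // the .limit(n) case the description hopes stays cheap
}}}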
[GitHub] spark pull request #16219: [SPARK-18790][SS] Keep a general offset history o...
Github user tcondie commented on a diff in the pull request: https://github.com/apache/spark/pull/16219#discussion_r91805757

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
@@ -303,7 +303,6 @@ private[state] class HDFSBackedStateStoreProvider(
     val mapFromFile = readSnapshotFile(version).getOrElse {
       val prevMap = loadMap(version - 1)
       val newMap = new MapType(prevMap)
-      newMap.putAll(prevMap)
--- End diff --

new MapType(prevMap) will make a call that is equivalent to newMap.putAll(prevMap). So basically, newMap.putAll(prevMap) is redundant work.
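A small illustration of the point, assuming MapType is a java.util map such as ConcurrentHashMap (this sketch is not the provider's actual code): the copy constructor already copies every entry from its argument, so a following putAll over the same source map repeats the work.

{{{
import java.util.concurrent.ConcurrentHashMap

val prevMap = new ConcurrentHashMap[String, Integer]()
prevMap.put("a", 1)

val newMap = new ConcurrentHashMap[String, Integer](prevMap) // copy constructor already does the putAll
assert(newMap.get("a") == 1)                                  // entry copied without an explicit putAll
}}}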
[GitHub] spark pull request #16219: [SPARK-18790][SS] Keep a general offset history o...
GitHub user tcondie opened a pull request: https://github.com/apache/spark/pull/16219

[SPARK-18790][SS] Keep a general offset history of stream batches

## What changes were proposed in this pull request?
Instead of only keeping the minimum number of offsets around, we should keep enough information to allow us to roll back n batches and re-execute the stream starting from a given point. In particular, we should create a config in SQLConf, spark.sql.streaming.retainedBatches, that defaults to 100 and ensure that we keep enough log files in the following places to roll back the specified number of batches:
- the offsets that are present in each batch
- versions of the state store
- the file lists stored for the FileStreamSource
- the metadata log stored by the FileStreamSink

@marmbrus @zsxwing

## How was this patch tested?
The following tests were added.

### StreamExecution offset metadata
Test added to StreamingQuerySuite that ensures offset metadata is garbage collected according to minBatchesToRetain.

### CompactibleFileStreamLog
Tests added in CompactibleFileStreamLogSuite to ensure that logs are purged starting before the first compaction file that precedes the current batch id - minBatchesToRetain.

Please review http://spark.apache.org/contributing.html before opening a pull request.

You can merge this pull request into a Git repository by running:
    $ git pull https://github.com/tcondie/spark offset_hist
Alternatively you can review and apply these changes as the patch at:
    https://github.com/apache/spark/pull/16219.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
    This closes #16219

commit 2d96af3ad4fe3d07cd80c727d2527de1d0ba3c57  Author: Tyson Condie  Date: 2016-12-08T19:02:33Z  revised log history maintenence based on minBatchesToRetain configuration parameter
commit fc1557eb178d070814776ffaa6c14a8cb48ea83a  Author: Tyson Condie  Date: 2016-12-08T20:42:57Z  add test for metadata garbage collection based on minBatchesToRetain
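The description still uses the earlier proposed name retainedBatches, while the tests it mentions refer to minBatchesToRetain. A sketch of setting that retention knob on an existing SparkSession 'spark', assuming the conf key is spark.sql.streaming.minBatchesToRetain:

{{{
spark.conf.set("spark.sql.streaming.minBatchesToRetain", "100")  // default described above is 100
}}}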
[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...
Github user tcondie commented on a diff in the pull request: https://github.com/apache/spark/pull/15949#discussion_r89873586 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala --- @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import java.io.File + +import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamExecutionMetadata} +import org.apache.spark.sql.functions._ +import org.apache.spark.util.{SystemClock, Utils} + +class StreamExecutionMetadataSuite extends StreamTest { + + private def newMetadataDir = +Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath + + test("stream execution metadata") { +assert(StreamExecutionMetadata(0, 0) === + StreamExecutionMetadata("""{}""")) +assert(StreamExecutionMetadata(1, 0) === + StreamExecutionMetadata("""{"batchWatermarkMs":1}""")) +assert(StreamExecutionMetadata(0, 2) === + StreamExecutionMetadata("""{"batchTimestampMs":2}""")) +assert(StreamExecutionMetadata(1, 2) === + StreamExecutionMetadata( +"""{"batchWatermarkMs":1,"batchTimestampMs":2}""")) + } + + test("metadata is recovered from log when query is restarted") { +import testImplicits._ +val clock = new SystemClock() +val ms = new MemoryStream[Long](0, sqlContext) +val df = ms.toDF().toDF("a") +val checkpointLoc = newMetadataDir +val checkpointDir = new File(checkpointLoc, "complete") +checkpointDir.mkdirs() +assert(checkpointDir.exists()) +val tableName = "test" +// Query that prunes timestamps less than current_timestamp, making +// it easy to use for ensuring that a batch is re-processed with the +// timestamp used when it was first processed. +def startQuery: StreamingQuery = { + df.groupBy("a") +.count() +.where('a >= current_timestamp().cast("long")) +.writeStream +.format("memory") +.queryName(tableName) +.option("checkpointLocation", checkpointLoc) +.outputMode("complete") +.start() +} +// no exception here +val t1 = clock.getTimeMillis() + 60L * 1000L +val t2 = clock.getTimeMillis() + 60L * 1000L + 1000L +val q = startQuery +ms.addData(t1, t2) +q.processAllAvailable() + +checkAnswer( + spark.table(tableName), + Seq(Row(t1, 1), Row(t2, 1)) +) + +q.stop() +Thread.sleep(60L * 1000L + 5000L) // Expire t1 and t2 --- End diff -- I should also say that I'm not too concerned by the indeterministic system clock issues since the batchTimestamp is recorded prior to running the query. Therefore as long as the query gets planned within 10 seconds, we're good. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] spark pull request #15949: [SPARK-18339] [SPARK-18513] [SQL] Don't push down...
Github user tcondie commented on a diff in the pull request: https://github.com/apache/spark/pull/15949#discussion_r89856411 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala --- @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import java.io.File + +import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamExecutionMetadata} +import org.apache.spark.sql.functions._ +import org.apache.spark.util.{SystemClock, Utils} + +class StreamExecutionMetadataSuite extends StreamTest { + + private def newMetadataDir = +Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath + + test("stream execution metadata") { +assert(StreamExecutionMetadata(0, 0) === + StreamExecutionMetadata("""{}""")) +assert(StreamExecutionMetadata(1, 0) === + StreamExecutionMetadata("""{"batchWatermarkMs":1}""")) +assert(StreamExecutionMetadata(0, 2) === + StreamExecutionMetadata("""{"batchTimestampMs":2}""")) +assert(StreamExecutionMetadata(1, 2) === + StreamExecutionMetadata( +"""{"batchWatermarkMs":1,"batchTimestampMs":2}""")) + } + + test("metadata is recovered from log when query is restarted") { +import testImplicits._ +val clock = new SystemClock() +val ms = new MemoryStream[Long](0, sqlContext) +val df = ms.toDF().toDF("a") +val checkpointLoc = newMetadataDir +val checkpointDir = new File(checkpointLoc, "complete") +checkpointDir.mkdirs() +assert(checkpointDir.exists()) +val tableName = "test" +// Query that prunes timestamps less than current_timestamp, making +// it easy to use for ensuring that a batch is re-processed with the +// timestamp used when it was first processed. +def startQuery: StreamingQuery = { + df.groupBy("a") +.count() +.where('a >= current_timestamp().cast("long")) +.writeStream +.format("memory") +.queryName(tableName) +.option("checkpointLocation", checkpointLoc) +.outputMode("complete") +.start() +} +// no exception here +val t1 = clock.getTimeMillis() + 60L * 1000L +val t2 = clock.getTimeMillis() + 60L * 1000L + 1000L +val q = startQuery +ms.addData(t1, t2) +q.processAllAvailable() + +checkAnswer( + spark.table(tableName), + Seq(Row(t1, 1), Row(t2, 1)) +) + +q.stop() +Thread.sleep(60L * 1000L + 5000L) // Expire t1 and t2 --- End diff -- @zsxwing I don't see an obvious way to pass a StreamManualClock in DataStreamWriter.start(). Should I be taking an entirely different approach? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] spark pull request #15949: [SPARK-18339] [SQL] Don't push down current_times...
Github user tcondie commented on a diff in the pull request: https://github.com/apache/spark/pull/15949#discussion_r89208121

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala ---
@@ -72,6 +72,26 @@ case class CurrentTimestamp() extends LeafExpression with CodegenFallback {
 }

 /**
+ * Expression representing the current batch time, which is used by StreamExecution to
+ * 1. prevent optimizer from pushing this expression below a stateful operator
+ * 2. allow IncrementalExecution to substitute this expression with a Literal(timestamp)
+ *
+ * There is no code generation since this expression should be replaced with a literal.
+ */
+case class CurrentBatchTimestamp(timestamp: SQLTimestamp) extends LeafExpression
--- End diff --

Both Nondeterministic and Unevaluable have final 'eval' methods, so both cannot be mixed in, and I need Nondeterministic to avoid optimizer pushdown.
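A simplified illustration of the constraint being described -- these are not the Catalyst traits, just two stand-ins that show why a class cannot mix in two unrelated traits that both declare a final concrete eval:

{{{
trait NondeterministicLike {
  final def eval(): Any = evalInternal()
  protected def evalInternal(): Any
}

trait UnevaluableLike {
  final def eval(): Any = throw new UnsupportedOperationException("Cannot evaluate")
}

// class Both extends NondeterministicLike with UnevaluableLike  // does not compile: conflicting final eval
class CurrentBatchTimestampSketch(ts: Long) extends NondeterministicLike {
  override protected def evalInternal(): Any = ts
}
}}}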
[GitHub] spark pull request #15949: [SPARK-18339] [SQL] Don't push down current_times...
Github user tcondie commented on a diff in the pull request: https://github.com/apache/spark/pull/15949#discussion_r89022903

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -38,6 +40,26 @@ import org.apache.spark.sql.streaming._
 import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}

 /**
+ * Contains metadata associated with a stream execution. This information is
+ * persisted to the offset log via the OffsetSeq metadata field. Current
+ * information contained in this object includes:
+ *
+ * 1. currentEventTimeWatermark: The current eventTime watermark, used to
+ *    bound the lateness of data that will processed.
+ * 2. currentBatchTimestamp: The current batch processing timestamp
+ */
+case class StreamExecutionMetadata(
+    var currentEventTimeWatermark: Long = 0,
+    var currentBatchTimestamp: Long = 0) {
--- End diff --

Ryan indicated we use SQLTimestamp to indicate microseconds. I changed the name of the watermark to currentEventTimeWatermarkMillis based on your suggestion.
[GitHub] spark pull request #15949: [SPARK-18339] [SQL] Don't push down current_times...
Github user tcondie commented on a diff in the pull request: https://github.com/apache/spark/pull/15949#discussion_r89017673

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -288,7 +310,11 @@ class StreamExecution(
         logInfo(s"Resuming streaming query, starting with batch $batchId")
         currentBatchId = batchId
         availableOffsets = nextOffsets.toStreamProgress(sources)
-        logDebug(s"Found possibly uncommitted offsets $availableOffsets")
+        streamExecutionMetadata = StreamExecutionMetadata(nextOffsets.metadata.getOrElse(
+          throw new IllegalStateException("OffsetLog does not contain current batch timestamp!")
--- End diff --

Adding {} doesn't yield code that compiles/typechecks.
[GitHub] spark pull request #15949: [SPARK-18339] [SQL] Don't push down current_times...
Github user tcondie commented on a diff in the pull request: https://github.com/apache/spark/pull/15949#discussion_r88905225

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
@@ -422,6 +432,7 @@ class StreamExecution(
     val replacementMap = AttributeMap(replacements)
     val triggerLogicalPlan = withNewSources transformAllExpressions {
       case a: Attribute if replacementMap.contains(a) => replacementMap(a)
+      case t: CurrentTimestamp => CurrentBatchTimestamp(currentBatchTimestamp)
--- End diff --

Good question. The purpose of the dedicated expression is to avoid having it pushed down below a stateful operator (e.g., aggregation) during optimization.
[GitHub] spark pull request #15949: [Spark-18339] [SQL] Don't push down current_times...
GitHub user tcondie opened a pull request: https://github.com/apache/spark/pull/15949

[Spark-18339] [SQL] Don't push down current_timestamp for filters in StructuredStreaming

## What changes were proposed in this pull request?
For the following workflow:
1. I have a column called time which is at minute-level precision in a Streaming DataFrame.
2. I want to perform groupBy time, count.
3. Then I want my MemorySink to only have the last 30 minutes of counts, and I perform this by .where('time >= current_timestamp().cast("long") - 30 * 60).

What happens is that the `filter` gets pushed down before the aggregation, and the filter happens on the source data for the aggregation instead of the result of the aggregation (where I actually want to filter). I guess the main issue here is that `current_timestamp` is non-deterministic in the streaming context and the filter shouldn't be pushed down. Whether this requires us to store the `current_timestamp` for each trigger of the streaming job is something to discuss.

@brkyvz @zsxwing @tdas

## How was this patch tested?
A test was added to StreamingAggregationSuite ensuring the above use case is handled. The test injects a stream of time values (in seconds) to a query that runs in complete mode and only outputs the (count) aggregation results for the past 10 seconds.

You can merge this pull request into a Git repository by running:
    $ git pull https://github.com/tcondie/spark SPARK-18339
Alternatively you can review and apply these changes as the patch at:
    https://github.com/apache/spark/pull/15949.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
    This closes #15949

commit 729ecedab4b61804a9717cadbd4f2c7b6aa50176  Author: Tyson Condie  Date: 2016-11-18T23:26:25Z  added CurrentBatchTimestamp
commit b8a1f71bef2aa4c11c08178b2250bd995e952601  Author: Tyson Condie  Date: 2016-11-18T23:35:27Z  update comment
commit 8f0a27329ea711d1936c2df11a310129e22eb9b5  Author: Tyson Condie  Date: 2016-11-20T20:41:49Z  add test for filtering time-based aggregation
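The workflow from the description, written out as a sketch; 'events' stands in for the streaming DataFrame with a 'time' column in epoch seconds. The .where() is intended to filter the aggregated counts, which is exactly what an eager pushdown of the filter below the aggregation breaks.

{{{
import org.apache.spark.sql.functions._

val recentCounts = events
  .groupBy("time")
  .count()
  .where(col("time") >= current_timestamp().cast("long") - 30 * 60)  // keep only the last 30 minutes
}}}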
[GitHub] spark pull request #15924: Spark-18498 [SQL] Revise HDFSMetadataLog API for ...
GitHub user tcondie opened a pull request: https://github.com/apache/spark/pull/15924

Spark-18498 [SQL] Revise HDFSMetadataLog API for better testing

## What changes were proposed in this pull request?
Revise the HDFSMetadataLog API so that metadata object serialization and the final batch file write are separated. This will allow serialization checks without worrying about batch file name formats.

## How was this patch tested?
Existing tests already ensure this API faithfully supports the core functionality, i.e., creation of batch files.

You can merge this pull request into a Git repository by running:
    $ git pull https://github.com/tcondie/spark SPARK-18498
Alternatively you can review and apply these changes as the patch at:
    https://github.com/apache/spark/pull/15924.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
    This closes #15924

commit 8e3d7051673f30863a938e8b06299b8aec227886  Author: Tyson Condie  Date: 2016-11-18T01:30:21Z  revise HDFSMetadataLog API for better testing
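A hypothetical shape of the split the description proposes (method names are illustrative, not necessarily the ones in the patch): serialization targets a plain OutputStream so tests can exercise the wire format directly, and add() simply composes serialization with the batch-file write.

{{{
import java.io.{ByteArrayOutputStream, OutputStream}

abstract class HDFSMetadataLogSketch[T] {
  def serialize(metadata: T, out: OutputStream): Unit
  protected def writeBatchFile(batchId: Long, body: Array[Byte]): Boolean

  final def add(batchId: Long, metadata: T): Boolean = {
    val buffer = new ByteArrayOutputStream()
    serialize(metadata, buffer)            // testable on its own, without touching batch file names
    writeBatchFile(batchId, buffer.toByteArray)
  }
}
}}}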
[GitHub] spark pull request #15852: Spark-18187 [SQL] CompactibleFileStreamLog should...
Github user tcondie commented on a diff in the pull request: https://github.com/apache/spark/pull/15852#discussion_r88362276

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala ---
@@ -63,7 +63,60 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
   protected def isDeletingExpiredLog: Boolean

-  protected def compactInterval: Int
+  protected def defaultCompactInterval: Int
+
+  protected final lazy val compactInterval: Int = {
--- End diff --

FileStreamSourceLog uses compactInterval in multiple places. Please advise?
[GitHub] spark pull request #15852: Spark-18187 [SQL] CompactibleFileStreamLog should...
Github user tcondie commented on a diff in the pull request: https://github.com/apache/spark/pull/15852#discussion_r88359775 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala --- @@ -63,7 +63,60 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag]( protected def isDeletingExpiredLog: Boolean - protected def compactInterval: Int + protected def defaultCompactInterval: Int + + protected final lazy val compactInterval: Int = { +// SPARK-18187: "compactInterval" can be set by user via defaultCompactInterval. +// If there are existing log entries, then we should ensure a compatible compactInterval +// is used, irrespective of the defaultCompactInterval. There are three cases: +// +// 1. If there is no '.compact' file, we can use the default setting directly. +// 2. If there are two or more '.compact' files, we use the interval of patch id suffix with +// '.compact' as compactInterval. It is unclear whether this case will ever happen in the +// current code, since only the latest '.compact' file is retained i.e., other are garbage +// collected. +// 3. If there is only one '.compact' file, then we must find a compact interval +// that is compatible with (i.e., a divisor of) the previous compact file, and that +// faithfully tries to represent the revised default compact interval i.e., is at least +// is large if possible. +// e.g., if defaultCompactInterval is 5 (and previous compact interval could have +// been any 2,3,4,6,12), then a log could be: 11.compact, 12, 13, in which case +// will ensure that the new compactInterval = 6 > 5 and (11 + 1) % 6 == 0 +val compactibleBatchIds = fileManager.list(metadataPath, batchFilesFilter) + .filter(f => f.getPath.toString.endsWith(CompactibleFileStreamLog.COMPACT_FILE_SUFFIX)) + .map(f => pathToBatchId(f.getPath)) + .sorted + .reverse + +// Case 1 +var interval = defaultCompactInterval +if (compactibleBatchIds.length >= 2) { + // Case 2 + val latestCompactBatchId = compactibleBatchIds(0) + val previousCompactBatchId = compactibleBatchIds(1) + interval = (latestCompactBatchId - previousCompactBatchId).toInt + logInfo(s"Compact interval case 2 = $interval") --- End diff -- This was debugging info that I was going to remove. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
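The rule quoted in the diff comment above (case 3) can be worked through with a small helper: pick the smallest interval that is at least the new default and divides latestCompactBatchId + 1. This is a sketch of the rule as stated, not the patch's exact code; with an existing 11.compact and a default of 5 it yields 6, matching the example in the comment.

{{{
def compatibleCompactInterval(latestCompactBatchId: Long, defaultCompactInterval: Int): Int = {
  val boundary = latestCompactBatchId + 1              // compactions happen at multiples of the interval
  (defaultCompactInterval to boundary.toInt)
    .find(i => boundary % i == 0)
    .getOrElse(boundary.toInt)                         // fall back to the old compaction boundary itself
}

assert(compatibleCompactInterval(11, 5) == 6)   // (11 + 1) % 6 == 0 and 6 >= 5
}}}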
[GitHub] spark pull request #15852: Spark 18187
GitHub user tcondie opened a pull request: https://github.com/apache/spark/pull/15852 Spark 18187 ## What changes were proposed in this pull request? CompactibleFileStreamLog relies on "compactInterval" to detect a compaction batch. If the "compactInterval" is reset by the user, CompactibleFileStreamLog will return the wrong answer, resulting in data loss. This PR provides a way to check the validity of 'compactInterval', and calculate an appropriate value. ## How was this patch tested? When restarting a stream, we change 'spark.sql.streaming.fileSource.log.compactInterval' to a value different from the former one. The primary solution to this issue was given by @uncleGen. Added extensions include an additional metadata field in OffsetSeq and CompactibleFileStreamLog APIs. @zsxwing You can merge this pull request into a Git repository by running: $ git pull https://github.com/tcondie/spark spark-18187 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/15852.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #15852 commit 65395dddb505f6084db471430da1486d75a77e2a Author: genmao.ygm Date: 2016-11-09T08:21:09Z SPARK-18187: CompactibleFileStreamLog should not rely on "compactInterval" to detect a compaction batch commit d556933e0f039d661989e07f381aff185c9fac1b Author: genmao.ygm Date: 2016-11-09T08:24:53Z comment update commit 8b56f70b2dffd69dbc37007e923f3d5a56fce039 Author: genmao.ygm Date: 2016-11-09T08:34:11Z revert commit 4a7e28c4e372caa3b16b979273577bd6aa2c11f3 Author: genmao.ygm Date: 2016-11-09T08:35:13Z unit test - compacat metadata log change compactInterval from 4 to 5 commit 23e1baf454bde511ed1963a27f6492100823d496 Author: genmao.ygm Date: 2016-11-09T09:34:15Z bug fix: /zero commit 7d37e08026eaa1364e8a4fb10fb7cfb93cb51229 Author: Tyson Condie Date: 2016-11-11T00:50:02Z Merge branch 'SPARK-18187' of https://github.com/uncleGen/spark into spark-18187 commit d3f7bbf32d0debba24853a38eb48bfcdcdb517be Author: Tyson Condie Date: 2016-11-11T00:52:24Z Merge branch 'master' of https://github.com/apache/spark into spark-18187 commit 6901eacdddf235db4ba91a0903ce8826978d778a Author: Tyson Condie Date: 2016-11-11T18:16:41Z extend offset seq to include metadata --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
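To make the failure mode concrete, here is a rough sketch of why detecting compaction batches purely from the configured interval breaks when the setting changes between runs. The modular check mirrors the convention CompactibleFileStreamLog uses for compaction batches, but treat the exact formula as an assumption.

```scala
// Compaction batches are the ones where (batchId + 1) is a multiple of the interval.
def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean =
  (batchId + 1) % compactInterval == 0

// First run with interval 4: batches 3, 7, 11, ... are written as compacted files.
assert(isCompactionBatch(7, 4))
// After restarting with interval 5, batch 7 is no longer treated as a compaction batch,
// so the log looks for a plain batch file that was never written, losing entries.
assert(!isCompactionBatch(7, 5))
```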
[GitHub] spark issue #15626: SPARK-17829 [SQL] Stable format for offset log
Github user tcondie commented on the issue: https://github.com/apache/spark/pull/15626 All logs now uniformly marshal entries to JSON. Each log has a particular way of organizing its JSON entries:
- HDFSMetadataLog: each batch ID is a separate file; all log entries within a batch ID are serialized on one line.
- OffsetSeqLog: each batch ID is a separate file; log entries (i.e., Offsets) are separated by a newline character, i.e., one offset entry per line.
- CompactibleFileStreamLog, FileStreamSinkLog, and FileStreamSourceLog: no changes to the log file format, i.e., log entries are separated by a newline character. Log entry -> JSON serialization is now done in CompactibleFileStreamLog directly.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
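A rough sketch of the OffsetSeqLog layout described above, i.e., one JSON-encoded offset per line within a batch file. This is simplified, not the PR's code; in particular the "-" placeholder for sources without an offset is an assumption for the example.

```scala
import java.io.OutputStream
import java.nio.charset.StandardCharsets.UTF_8

import org.apache.spark.sql.execution.streaming.Offset

// Sketch only: write each source's offset as its JSON form on its own line,
// using "-" as a hypothetical marker for sources that have no offset yet.
def serializeOffsetSeq(offsets: Seq[Option[Offset]], out: OutputStream): Unit = {
  offsets.foreach { offsetOpt =>
    out.write(offsetOpt.map(_.json).getOrElse("-").getBytes(UTF_8))
    out.write('\n')
  }
}
```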
[GitHub] spark pull request #15626: SPARK-17829 [SQL] Stable format for offset log
Github user tcondie commented on a diff in the pull request: https://github.com/apache/spark/pull/15626#discussion_r85615504 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala --- @@ -23,4 +23,16 @@ package org.apache.spark.sql.execution.streaming * ordering of two [[Offset]] instances. We do assume that if two offsets are `equal` then no * new data has arrived. */ -trait Offset extends Serializable {} +abstract class Offset { + + /** + * A JSON-serialized representation of an Offset that is + * used for saving offsets to the offset log. + * + * @return JSON string encoding + */ + def json: String +} + +/** Used when loading */ --- End diff -- Added. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15626: SPARK-17829 [SQL] Stable format for offset log
Github user tcondie commented on a diff in the pull request: https://github.com/apache/spark/pull/15626#discussion_r85615514 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala --- @@ -44,9 +45,14 @@ import org.apache.spark.util.UninterruptibleThread * Note: [[HDFSMetadataLog]] doesn't support S3-like file systems as they don't guarantee listing * files in a directory always shows the latest files. */ -class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String) +class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: String) extends MetadataLog[T] with Logging { + private implicit val formats = Serialization.formats(NoTypeHints) + + /** Needed to serialize type T into JSON */ + private implicit val manifest = Manifest.classType[T](implicitly[ClassTag[T]].runtimeClass) --- End diff -- Done. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
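For context, a small standalone sketch of the json4s pattern the two implicits in the diff above enable: `Serialization.read[T]` needs an implicit `Manifest[T]`, which a generic class can rebuild from its `ClassTag`. The `JsonCodec` and `FileEntry` names below are hypothetical and only illustrate the mechanism.

```scala
import scala.reflect.ClassTag

import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

// Round-trips any AnyRef type T through JSON, mirroring what HDFSMetadataLog needs.
class JsonCodec[T <: AnyRef : ClassTag] {
  private implicit val formats = Serialization.formats(NoTypeHints)
  private implicit val manifest: Manifest[T] =
    Manifest.classType[T](implicitly[ClassTag[T]].runtimeClass)

  def toJson(value: T): String = Serialization.write(value)
  def fromJson(json: String): T = Serialization.read[T](json)
}

case class FileEntry(path: String, batchId: Long)

val codec = new JsonCodec[FileEntry]
val entry = FileEntry("/data/part-00000", 3L)
assert(codec.fromJson(codec.toJson(entry)) == entry)
```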
[GitHub] spark pull request #15626: SPARK-17829 [SQL] Stable format for offset log
Github user tcondie commented on a diff in the pull request: https://github.com/apache/spark/pull/15626#discussion_r85615490 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala --- @@ -0,0 +1,62 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +*http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql.execution.streaming + + +import java.io.{InputStream, OutputStream} +import java.nio.charset.StandardCharsets._ + +import scala.io.{Source => IOSource} + +import org.json4s.NoTypeHints +import org.json4s.jackson.Serialization + +import org.apache.spark.sql.SparkSession + +class OffsetSeqLog(offsetLogVersion: String, sparkSession: SparkSession, path: String) + extends HDFSMetadataLog[OffsetSeq](sparkSession, path) { + + override protected def deserialize(in: InputStream): OffsetSeq = { +// called inside a try-finally where the underlying stream is closed in the caller + --- End diff -- right. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15626: SPARK-17829 [SQL] Stable format for offset log
Github user tcondie commented on a diff in the pull request: https://github.com/apache/spark/pull/15626#discussion_r85615458 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala --- @@ -713,6 +713,8 @@ class StreamExecution( } object StreamExecution { + val OFFSET_LOG_VERSION = "v1" --- End diff -- I removed this from StreamExecution. I was initially copying the approach in CompactibleFileStreamLog, but realized that the notion of a version need not be parameterized here. I'm still retaining the notion of a version, which now resides in the OffsetSeqLog companion object, as per your suggestion. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15626: SPARK-17829 [SQL] Stable format for offset log
Github user tcondie commented on a diff in the pull request: https://github.com/apache/spark/pull/15626#discussion_r85613884 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.io.File + +import org.json4s.NoTypeHints +import org.json4s.jackson.Serialization + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.test.SharedSQLContext + +class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext { + + private implicit val formats = Serialization.formats(NoTypeHints) + + testWithUninterruptibleThread("OffsetSeqLogSuite: basic") { +withTempDir { temp => + val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir + val metadataLog = new OffsetSeqLog("v1", spark, dir.getAbsolutePath) + val batch0 = OffsetSeq.fill(LongOffset(0)) + val batch1 = OffsetSeq.fill(LongOffset(1), LongOffset(2)) + + val batch0Serialized = OffsetSeq.fill(batch0.offsets.map(_.map(o => +SerializedOffset(o.json))).flatten: _*) + + val batch1Serialized = OffsetSeq.fill(batch1.offsets.map(_.map(o => +SerializedOffset(o.json))).flatten: _*) + + assert(metadataLog.add(0, batch0)) + assert(metadataLog.getLatest() === Some(0 -> batch0Serialized)) + assert(metadataLog.get(0) === Some(batch0Serialized)) + + assert(metadataLog.add(1, batch1)) + assert(metadataLog.get(0) === Some(batch0Serialized)) + assert(metadataLog.get(1) === Some(batch1Serialized)) + assert(metadataLog.getLatest() === Some(1 -> batch1Serialized)) + assert(metadataLog.get(None, Some(1)) === + Array(0 -> batch0Serialized, 1 -> batch1Serialized)) + + // Adding the same batch does nothing + metadataLog.add(1, OffsetSeq.fill(LongOffset(3))) + assert(metadataLog.get(0) === Some(batch0Serialized)) + assert(metadataLog.get(1) === Some(batch1Serialized)) + assert(metadataLog.getLatest() === Some(1 -> batch1Serialized)) + assert(metadataLog.get(None, Some(1)) === + Array(0 -> batch0Serialized, 1 -> batch1Serialized)) --- End diff -- Got it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15626: SPARK-17829 [SQL] Stable format for offset log
Github user tcondie commented on a diff in the pull request: https://github.com/apache/spark/pull/15626#discussion_r85613714 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.io.File + +import org.json4s.NoTypeHints +import org.json4s.jackson.Serialization + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.test.SharedSQLContext + +class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext { + + private implicit val formats = Serialization.formats(NoTypeHints) + + testWithUninterruptibleThread("OffsetSeqLogSuite: basic") { --- End diff -- Got it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15626: SPARK-17829 [SQL] Stable format for offset log
Github user tcondie commented on a diff in the pull request: https://github.com/apache/spark/pull/15626#discussion_r85613616 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLogSuite.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.io.File + +import org.json4s.NoTypeHints +import org.json4s.jackson.Serialization + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.test.SharedSQLContext + +class OffsetSeqLogSuite extends SparkFunSuite with SharedSQLContext { + + private implicit val formats = Serialization.formats(NoTypeHints) + + testWithUninterruptibleThread("OffsetSeqLogSuite: basic") { +withTempDir { temp => + val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir + val metadataLog = new OffsetSeqLog("v1", spark, dir.getAbsolutePath) + val batch0 = OffsetSeq.fill(LongOffset(0)) + val batch1 = OffsetSeq.fill(LongOffset(1), LongOffset(2)) + + val batch0Serialized = OffsetSeq.fill(batch0.offsets.map(_.map(o => +SerializedOffset(o.json))).flatten: _*) + + val batch1Serialized = OffsetSeq.fill(batch1.offsets.map(_.map(o => --- End diff -- indeed! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15626: SPARK-17829 [SQL] Stable format for offset log
Github user tcondie commented on a diff in the pull request: https://github.com/apache/spark/pull/15626#discussion_r85609921 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala --- @@ -37,14 +39,19 @@ import org.apache.spark.sql.SparkSession * compact log files every 10 batches by default into a big file. When * doing a compaction, it will read all old log files and merge them with the new batch. */ -abstract class CompactibleFileStreamLog[T: ClassTag]( +abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag]( --- End diff -- It's required by json4s that T be an AnyRef. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
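For readers wondering why the bound is needed: a tiny sketch of the constraint json4s imposes. The `encode` helper and `BatchEntry` type are illustrative names, not project code.

```scala
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

implicit val formats = Serialization.formats(NoTypeHints)

// json4s' write is declared roughly as write[A <: AnyRef](a: A), so a generic log
// over T can only call it if T itself is bounded by AnyRef.
def encode[T <: AnyRef](entry: T): String = Serialization.write(entry)

case class BatchEntry(batchId: Long, path: String)   // hypothetical metadata entry
encode(BatchEntry(0L, "/metadata/0"))                 // fine: a case class is an AnyRef
// encode(42L)                                        // would not compile: Long is an AnyVal
```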
[GitHub] spark pull request #15626: SPARK-17829 [SQL] Stable format for offset log
Github user tcondie commented on a diff in the pull request: https://github.com/apache/spark/pull/15626#discussion_r85609758 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceOffsetSuite.scala --- @@ -36,4 +37,9 @@ class KafkaSourceOffsetSuite extends OffsetSuite { compare( one = KafkaSourceOffset(("t", 0, 1L)), two = KafkaSourceOffset(("t", 0, 2L), ("t", 1, 1L))) + + compare( --- End diff -- I have extended the KafkaSourceOffsetSuite to include more thorough evaluations of json serialization/deserialization, including to/from OffsetSeqLog. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15626: SPARK-17829 [SQL] Stable format for offset log
Github user tcondie commented on the issue: https://github.com/apache/spark/pull/15626 @koeninger Thanks for pointing this out. I have revised the serialization procedure to use org.apache.spark.sql.kafka010.JsonUtils. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
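For reference, roughly how the revised serialization can round-trip the partition map through the connector's JsonUtils. This is a sketch written as if from within the kafka010 package; treat the overloaded method names and the exact JSON layout as assumptions based on the existing connector utilities.

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.kafka010.JsonUtils

// Sketch: serialize the TopicPartition -> offset map to JSON and read it back.
val offsets = Map(
  new TopicPartition("t", 0) -> 1L,
  new TopicPartition("t", 1) -> 5L)

val json = JsonUtils.partitionOffsets(offsets)               // e.g. {"t":{"0":1,"1":5}}
val restored: Map[TopicPartition, Long] = JsonUtils.partitionOffsets(json)
assert(restored == offsets)
```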
[GitHub] spark pull request #15626: SPARK-17829 [SQL] Stable format for offset log
Github user tcondie commented on a diff in the pull request: https://github.com/apache/spark/pull/15626#discussion_r85026112 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala --- @@ -23,4 +23,16 @@ package org.apache.spark.sql.execution.streaming * ordering of two [[Offset]] instances. We do assume that if two offsets are `equal` then no * new data has arrived. */ -trait Offset extends Serializable {} +abstract class Offset { + def json: String +} + +/** Used when loading */ +class SerializedOffset(override val json: String) extends Offset --- End diff -- Makes sense. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15626: SPARK-17829 [SQL] Stable format for offset log
Github user tcondie commented on a diff in the pull request: https://github.com/apache/spark/pull/15626#discussion_r85026079 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala --- @@ -23,4 +23,16 @@ package org.apache.spark.sql.execution.streaming * ordering of two [[Offset]] instances. We do assume that if two offsets are `equal` then no * new data has arrived. */ -trait Offset extends Serializable {} +abstract class Offset { + def json: String +} + +/** Used when loading */ --- End diff -- Got it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15626: SPARK-17829 [SQL] Stable format for offset log
Github user tcondie commented on a diff in the pull request: https://github.com/apache/spark/pull/15626#discussion_r85025740 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala --- @@ -22,8 +22,18 @@ package org.apache.spark.sql.execution.streaming */ case class LongOffset(offset: Long) extends Offset { + override val json = offset.toString + def +(increment: Long): LongOffset = new LongOffset(offset + increment) def -(decrement: Long): LongOffset = new LongOffset(offset - decrement) override def toString: String = s"#$offset" } + +object LongOffset { + + def apply(serialized: Offset) : LongOffset = new LongOffset(serialized.json.toLong) + +} --- End diff -- Can you please clarify best practices w.r.t. extra lines? I didn't see anything in the code style guide. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15626: SPARK-17829 [SQL] Stable format for offset log
Github user tcondie commented on a diff in the pull request: https://github.com/apache/spark/pull/15626#discussion_r85025208 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala --- @@ -51,4 +60,8 @@ private[kafka010] object KafkaSourceOffset { def apply(offsetTuples: (String, Int, Long)*): KafkaSourceOffset = { KafkaSourceOffset(offsetTuples.map { case(t, p, o) => (new TopicPartition(t, p), o) }.toMap) } + + def apply(serialized: Offset): KafkaSourceOffset = { --- End diff -- Sounds good. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15626: SPARK-17829 [SQL] Stable format for offset log
Github user tcondie commented on a diff in the pull request: https://github.com/apache/spark/pull/15626#discussion_r85025029 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala --- @@ -27,13 +28,21 @@ import org.apache.spark.sql.execution.streaming.Offset */ private[kafka010] case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends Offset { + import org.json4s.jackson.Serialization.{write} --- End diff -- Sounds good. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15626: SPARK-17829 [SQL] Stable format for offset log
Github user tcondie commented on a diff in the pull request: https://github.com/apache/spark/pull/15626#discussion_r85024877 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala --- @@ -22,7 +22,10 @@ package org.apache.spark.sql.execution.streaming * [[Source]]s that are present in a streaming query. This is similar to simplified, single-instance * vector clock that must progress linearly forward. */ -case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset { +case class OffsetSeq(offsets: Seq[Option[Offset]]) { --- End diff -- I renamed it to avoid the class being confused with a type of Offset. Does that make sense or should I take an alternative approach? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15626: SPARK-17829 [SQL] Stable format for offset log
GitHub user tcondie opened a pull request: https://github.com/apache/spark/pull/15626 SPARK-17829 [SQL] Stable format for offset log ## What changes were proposed in this pull request? Currently we use Java serialization for the WAL that stores the offsets contained in each batch. This has two main issues:
- It can break across Spark releases (though this is not the only thing preventing us from upgrading a running query).
- It is unnecessarily opaque to the user.
I'd propose we require offsets to provide a user-readable serialization and use that instead. JSON is probably a good option. ## How was this patch tested? Tests were added for KafkaOffset in KafkaOffsetSuite and for LongOffset in OffsetSuite. Please review https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark before opening a pull request. @zsxwing @marmbrus You can merge this pull request into a Git repository by running: $ git pull https://github.com/tcondie/spark spark-8360 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/15626.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #15626 commit 60a71fae0decaebaa4f869b78283295b6491992a Author: Tyson Condie Date: 2016-10-21T23:53:34Z initial version of offse json serialization commit 52431141ce4c7efbf82da5a204ba77d16c03f16d Author: Tyson Condie Date: 2016-10-22T00:26:51Z remove CompositeOffsetSuite commit 3ccdc5c00b7043570a3567560f1f9ffaeb1ba688 Author: Tyson Condie Date: 2016-10-24T22:54:53Z update commit 16c6cea8f91e16dc39c2a0796295f233d33054c4 Author: Tyson Condie Date: 2016-10-24T23:00:38Z update test parameters to avoid test name conflict --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
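To make "user-readable serialization" concrete, here is roughly what the JSON forms look like for the two offsets touched in this PR, as if evaluated from within the respective packages. LongOffset's form follows the diff reviewed above; the Kafka layout shown in the comment is illustrative, not a guaranteed wire format.

```scala
import org.apache.spark.sql.execution.streaming.LongOffset
import org.apache.spark.sql.kafka010.KafkaSourceOffset

// LongOffset serializes as the plain number, so the offset log stays human-readable.
LongOffset(10L).json
// "10"

// KafkaSourceOffset serializes its partition -> offset map as JSON.
KafkaSourceOffset(("topic", 0, 23L), ("topic", 1, 5L)).json
// e.g. {"topic":{"0":23,"1":5}}  (exact field layout depends on the serializer used)
```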