[GitHub] spark issue #19435: [MINOR][SS] "keyWithIndexToNumValues" -> "keyWithIndexTo...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/19435 @tdas @zsxwing would you take a look, thanks
[GitHub] spark issue #19435: [WIP][SS][MINOR] "keyWithIndexToNumValues" -> "keyWithIn...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/19435 @tdas would you take a look, thanks
[GitHub] spark pull request #19435: [WIP][SS][MINOR] "keyWithIndexToNumValues" -> "ke...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/19435#discussion_r142836474

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala ---
@@ -291,7 +291,7 @@ class SymmetricHashJoinStateManager(
   }

   /** A wrapper around a [[StateStore]] that stores [(key, index) -> value]. */
-  private class KeyWithIndexToValueStore extends StateStoreHandler(KeyWithIndexToValuesType) {
+  private class KeyWithIndexToValueStore extends StateStoreHandler(KeyWithIndexToValueType) {
--- End diff --

dropped the 's' from '...Values...'
[GitHub] spark pull request #19435: [WIP][SS][MINOR] "keyWithIndexToNumValues" -> "ke...
GitHub user lw-lin opened a pull request: https://github.com/apache/spark/pull/19435

[WIP][SS][MINOR] "keyWithIndexToNumValues" -> "keyWithIndexToValue"

## What changes were proposed in this pull request?

This PR changes `keyWithIndexToNumValues` to `keyWithIndexToValue`. There will be folders named after `keyWithIndexToNumValues`, so if we ever want to fix this, let's fix it now.

## How was this patch tested?

Existing unit test cases.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/lw-lin/spark keyWithIndex

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19435.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19435

commit c62ba65fcc952de4b244695f32dc41783a2c1631
Author: Liwei Lin <lwl...@gmail.com>
Date: 2017-10-05T02:21:02Z

    Fix
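For context on why renaming before release matters: state store names typically become on-disk directory names under the checkpoint location. The layout sketched below is an assumption for illustration, not a quote of Spark's implementation:

```scala
// Hypothetical layout: if the store name is a path segment of the checkpoint
// directory, the old name would be frozen into every checkpoint written with it.
def storeCheckpointPath(
    checkpointRoot: String,
    operatorId: Long,
    partitionId: Int,
    storeName: String): String =
  s"$checkpointRoot/state/$operatorId/$partitionId/$storeName"

// storeCheckpointPath("/ckpt", 0, 3, "keyWithIndexToValue")
//   == "/ckpt/state/0/3/keyWithIndexToValue"
```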
[GitHub] spark issue #19206: [SPARK-19206][yarn]Client and ApplicationMaster resolveP...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/19206 It says `SPARK-19206` in your PR title, but SPARK-19206 is actually about `Update outdated parameter descriptions in external-kafka module`; so maybe you should reference a different JIRA. That's all that Sean and Marcelo meant.
[GitHub] spark issue #18342: [Spark-21123][Docs][Structured Streaming] Options for fi...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/18342 This lgtm; @zsxwing please also take a look
[GitHub] spark issue #17346: [SPARK-19965][SS] DataFrame batch reader may fail to inf...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/17346 thank you @zsxwing
[GitHub] spark issue #17346: [SPARK-19965][SS] DataFrame batch reader may fail to inf...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/17346 Comments have been addressed -- @zsxwing it'd be great if you could take another look
[GitHub] spark pull request #17346: [SPARK-19965][SS] DataFrame batch reader may fail...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/17346#discussion_r114468906

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala ---
@@ -36,20 +37,27 @@ import org.apache.spark.util.SerializableConfiguration
  * A [[FileIndex]] that generates the list of files to process by recursively listing all the
  * files present in `paths`.
  *
- * @param rootPaths the list of root table paths to scan
+ * @param rootPathsSpecified the list of root table paths to scan (some of which might be
+ *                           filtered out later)
  * @param parameters as set of options to control discovery
  * @param partitionSchema an optional partition schema that will be use to provide types for the
  *                        discovered partitions
  */
 class InMemoryFileIndex(
     sparkSession: SparkSession,
-    override val rootPaths: Seq[Path],
+    rootPathsSpecified: Seq[Path],
     parameters: Map[String, String],
     partitionSchema: Option[StructType],
     fileStatusCache: FileStatusCache = NoopCache)
   extends PartitioningAwareFileIndex(
     sparkSession, parameters, partitionSchema, fileStatusCache) {

+  // Filter out streaming metadata dirs or files such as "/.../_spark_metadata" (the metadata dir)
+  // or "/.../_spark_metadata/0" (a file in the metadata dir). `rootPathsSpecified` might contain
+  // such streaming metadata dir or files, e.g. when after globbing "basePath/*" where "basePath"
+  // is the output of a streaming query.
+  override val rootPaths = rootPathsSpecified.filterNot(FileStreamSink.ancestorIsMetadataDirectory)
--- End diff --

Yeah, you are quite correct! They will be filtered by `InMemoryFileIndex.shouldFilterOut`.
[GitHub] spark pull request #17346: [SPARK-19965][SS] DataFrame batch reader may fail...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/17346#discussion_r114468833

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala ---
@@ -145,6 +147,41 @@ class FileStreamSinkSuite extends StreamTest {
     }
   }

+  test("partitioned writing and batch reading with 'basePath'") {
+    val inputData = MemoryStream[Int]
+    val ds = inputData.toDS()
+
+    val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
--- End diff --

done
[GitHub] spark pull request #17346: [SPARK-19965][SS] DataFrame batch reader may fail...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/17346#discussion_r114468801

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala ---
@@ -53,6 +53,29 @@ object FileStreamSink extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * Returns true if the path is the metadata dir or its ancestor is the metadata dir.
+   * E.g.:
+   *  - ancestorIsMetadataDirectory(/.../_spark_metadata)   => true
+   *  - ancestorIsMetadataDirectory(/.../_spark_metadata/0) => true
+   *  - ancestorIsMetadataDirectory(/a/b/c)                 => false
+   */
+  def ancestorIsMetadataDirectory(path: Path): Boolean = {
+    require(path.isAbsolute, s"$path is required to be absolute")
--- End diff --

switched to `makeQualified`
[GitHub] spark pull request #17346: [SPARK-19965][SS] DataFrame batch reader may fail...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/17346#discussion_r114468821

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala ---
@@ -53,6 +53,29 @@ object FileStreamSink extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * Returns true if the path is the metadata dir or its ancestor is the metadata dir.
+   * E.g.:
+   *  - ancestorIsMetadataDirectory(/.../_spark_metadata)   => true
+   *  - ancestorIsMetadataDirectory(/.../_spark_metadata/0) => true
+   *  - ancestorIsMetadataDirectory(/a/b/c)                 => false
+   */
+  def ancestorIsMetadataDirectory(path: Path): Boolean = {
+    require(path.isAbsolute, s"$path is required to be absolute")
+    var currentPath = path
+    var finished = false
+    while (!finished) {
--- End diff --

fixed. good point!
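To make the loop concrete, here is a minimal runnable sketch of the ancestor walk under discussion, assuming the metadata directory name `_spark_metadata` used by `FileStreamSink`; it illustrates the idea rather than quoting the merged code:

```scala
import org.apache.hadoop.fs.Path

// Walk from the path up through its parents; return true as soon as the path
// itself or any ancestor is the streaming metadata directory.
def ancestorIsMetadataDirectory(path: Path): Boolean = {
  var currentPath: Path = path
  while (currentPath != null) {
    if (currentPath.getName == "_spark_metadata") {
      return true
    }
    currentPath = currentPath.getParent
  }
  false
}

// ancestorIsMetadataDirectory(new Path("/a/_spark_metadata"))   == true
// ancestorIsMetadataDirectory(new Path("/a/_spark_metadata/0")) == true
// ancestorIsMetadataDirectory(new Path("/a/b/c"))               == false
```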
[GitHub] spark pull request #17735: [SPARK-20441][SPARK-20432][SS] Within the same st...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/17735#discussion_r114453379

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -120,6 +141,32 @@ class StreamSuite extends StreamTest {
     assertDF(df)
   }

+  test("Within the same streaming query, one StreamingRelation should only be transformed to one " +
+    "StreamingExecutionRelation") {
+    val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load()
+    var query: StreamExecution = null
+    try {
+      query =
+        df.union(df)
+          .writeStream
+          .format("memory")
+          .queryName("memory")
+          .start()
+          .asInstanceOf[StreamingQueryWrapper]
+          .streamingQuery
+      val executionRelations =
+        query
+          .logicalPlan
--- End diff --

Ah, I see. Fixed. Thanks!
[GitHub] spark issue #17735: [SPARK-20441][SPARK-20432][SS] Within the same streaming...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/17735 Jenkins retest this please
[GitHub] spark issue #17346: [SPARK-19965][SS] DataFrame batch reader may fail to inf...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/17346 @zsxwing would you take a look at your convenience? Thanks!
[GitHub] spark issue #17346: [SPARK-19965][SS] DataFrame batch reader may fail to inf...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/17346 Jenkins retest this please
[GitHub] spark issue #17735: [SPARK-20441][SPARK-20432][SS] Within the same streaming...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/17735 @zsxwing @brkyvz would you take a look at this? thanks!
[GitHub] spark issue #17735: [SPARK-20441][SPARK-20432][SS] Within the same streaming...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/17735 Jenkins retest this please
[GitHub] spark issue #17735: [SPARK-20441][SPARK-20432][SS] Within the same streaming...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/17735 @zsxwing @brkyvz would you take a look at this? thanks!
[GitHub] spark pull request #17812: [WIP][SPARK-][SS] Batch queries with 'Dataset/Dat...
Github user lw-lin closed the pull request at: https://github.com/apache/spark/pull/17812
[GitHub] spark pull request #17812: [WIP][SPARK-][SS] Batch queries with 'Dataset/Dat...
GitHub user lw-lin opened a pull request: https://github.com/apache/spark/pull/17812

[WIP][SPARK-][SS] Batch queries with `Dataset/DataFrame.withWatermark()` does not execute

## What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)

(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/lw-lin/spark batch-watermark

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17812.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17812

commit 872f4f41ffe8ee2503a0879591cf74e90803c53e
Author: Liwei Lin <lwl...@gmail.com>
Date: 2017-04-24T03:20:10Z

    Fix
[GitHub] spark issue #17346: [SPARK-19965][SS] DataFrame batch reader may fail to inf...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/17346 Jenkins retest this please
[GitHub] spark issue #17346: [SPARK-19965][SS] DataFrame batch reader may fail to inf...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/17346 Jenkins retest this please
[GitHub] spark issue #17735: [SPARK-20441][SPARK-20432][SS] Within the same streaming...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/17735 @brkyvz please take another look
[GitHub] spark issue #17735: [SPARK-20441][SPARK-20432][SS] Within the same streaming...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/17735 @zsxwing @brkyvz would you take a look at this? thanks!
[GitHub] spark issue #17346: [SPARK-19965][SS] DataFrame batch reader may fail to inf...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/17346 Rebased to master to resolve conflicts
[GitHub] spark pull request #17735: [WIP][SS] Within the same streaming query, one St...
GitHub user lw-lin opened a pull request: https://github.com/apache/spark/pull/17735

[WIP][SS] Within the same streaming query, one StreamingRelation should only be transformed to one StreamingExecutionRelation

## What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)

(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/lw-lin/spark SPARK-20441

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17735.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17735

commit 018b5fd72eeb528fbae9be367db18a61c4e5f129
Author: Liwei Lin <lwl...@gmail.com>
Date: 2017-04-23T07:24:51Z

    Add tests

commit bf502a75924f5a9586b221b67a10536da8e51e0d
Author: Liwei Lin <lwl...@gmail.com>
Date: 2017-04-23T10:03:28Z

    Fix
[GitHub] spark issue #17346: [SPARK-19965][SS] DataFrame batch reader may fail to inf...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/17346 Jenkins retest this please
[GitHub] spark issue #17346: [SPARK-19965][SS] DataFrame batch reader may fail to inf...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/17346 Jenkins retest this please
[GitHub] spark pull request #17268: [SPARK-19932][SS] Disallow a case that might caus...
Github user lw-lin closed the pull request at: https://github.com/apache/spark/pull/17268
[GitHub] spark issue #17268: [SPARK-19932][SS] Disallow a case that might cause OOM f...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/17268 Thanks for the comments! Closing this.
[GitHub] spark issue #17346: [SPARK-19965][SS] DataFrame batch reader may fail to inf...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/17346 @zsxwing would you take a look at this? Thanks!
[GitHub] spark pull request #17346: [WIP] DataFrame batch reader may fail to infer pa...
GitHub user lw-lin opened a pull request: https://github.com/apache/spark/pull/17346

[WIP] DataFrame batch reader may fail to infer partitions when reading FileStreamSink's output

WIP of SPARK-19965

## What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)

(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/lw-lin/spark filter-metadata

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17346.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17346

commit 19d0d485f0190cc8b1df8bec3a9f3b56bca3883e
Author: Liwei Lin <lwl...@gmail.com>
Date: 2017-03-19T09:43:33Z

    Initial commit
[GitHub] spark pull request #17219: [SPARK-19876][SS][WIP] OneTime Trigger Executor
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/17219#discussion_r106800936

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{InputStream, OutputStream}
+import java.nio.charset.StandardCharsets._
+
+import scala.io.{Source => IOSource}
+
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Used to write log files that represent batch commit points in structured streaming.
+ * A commit log file will be written immediately after the successful completion of a
+ * batch, and before processing the next batch. Here is an execution summary:
+ * - trigger batch 1
+ * - obtain batch 1 offsets and write to offset log
+ * - process batch 1
+ * - write batch 1 to completion log
+ * - trigger batch 2
+ * - obtain batch 2 offsets and write to offset log
+ * - process batch 2
+ * - write batch 2 to completion log
+ *
+ * The current format of the batch completion log is:
+ * line 1: version
+ * line 2: metadata (optional json string)
+ */
+class BatchCommitLog(sparkSession: SparkSession, path: String)
+  extends HDFSMetadataLog[String](sparkSession, path) {
+
+  override protected def deserialize(in: InputStream): String = {
+    // called inside a try-finally where the underlying stream is closed in the caller
+    val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
+    if (!lines.hasNext) {
+      throw new IllegalStateException("Incomplete log file in the offset commit log")
+    }
+    val version = lines.next().trim.toInt
+    if (BatchCommitLog.VERSION < version) {
+      throw new IllegalStateException(s"Incompatible log file version ${version}")
+    }
--- End diff --

nit: now that https://github.com/apache/spark/pull/17070 has been merged, maybe let's use `parseVersion(lines.next(), BatchCommitLog.VERSION)`
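Concretely, the suggested cleanup would make `deserialize` delegate its version check; a sketch under the assumption that `parseVersion(text, maxSupportedVersion)` from #17070 is inherited from `HDFSMetadataLog` and throws on malformed or too-new versions:

```scala
override protected def deserialize(in: InputStream): String = {
  // called inside a try-finally where the underlying stream is closed in the caller
  val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
  if (!lines.hasNext) {
    throw new IllegalStateException("Incomplete log file in the offset commit log")
  }
  parseVersion(lines.next().trim, BatchCommitLog.VERSION)  // replaces the manual check
  // line 2 of the format: the optional metadata json string
  if (lines.hasNext) lines.next() else null
}
```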
[GitHub] spark issue #17327: [SPARK-19721][SS][BRANCH-2.1] Good error message for ver...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/17327 Thanks! Closed this.
[GitHub] spark pull request #17327: [SPARK-19721][SS][BRANCH-2.1] Good error message ...
Github user lw-lin closed the pull request at: https://github.com/apache/spark/pull/17327
[GitHub] spark issue #17070: [SPARK-19721][SS] Good error message for version mismatc...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/17070 @zsxwing sure, please see https://github.com/apache/spark/pull/17327
[GitHub] spark pull request #17327: [SPARK-19721][SS][BRANCH-2.1] Good error message ...
GitHub user lw-lin opened a pull request: https://github.com/apache/spark/pull/17327

[SPARK-19721][SS][BRANCH-2.1] Good error message for version mismatch in log files

## Problem

There are several places where we write out version identifiers in various logs for structured streaming (usually `v1`). However, in the places where we check for this, we throw a confusing error message.

## What changes were proposed in this pull request?

This patch made two major changes:

1. added a `parseVersion(...)` method, and based on this method, fixed the way the following places did version checking (no other place needed this check):

```
HDFSMetadataLog
  - CompactibleFileStreamLog  ----------------> fixed with this patch
    - FileStreamSourceLog  ---> inherited the fix of `CompactibleFileStreamLog`
    - FileStreamSinkLog  -----> inherited the fix of `CompactibleFileStreamLog`
  - OffsetSeqLog  ----------------------------> fixed with this patch
  - anonymous subclass in KafkaSource  -------> fixed with this patch
```

2. changed the type of `FileStreamSinkLog.VERSION`, `FileStreamSourceLog.VERSION` etc. from `String` to `Int`, so that we can identify newer versions via `version > 1` instead of `version != "v1"` -- note this didn't break any backwards compatibility: we are still writing out `"v1"` and reading back `"v1"`.

## Exception message with this patch

```
java.lang.IllegalStateException: Failed to read log file /private/var/folders/nn/82rmvkk568sd8p3p8tb33trwgn/T/spark-86867b65-0069-4ef1-b0eb-d8bd258ff5b8/0. UnsupportedLogVersion: maximum supported log version is v1, but encountered v99. The log file was produced by a newer version of Spark and cannot be read by this version. Please upgrade.
  at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:202)
  at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3$$anonfun$apply$mcV$sp$2.apply(OffsetSeqLogSuite.scala:78)
  at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3$$anonfun$apply$mcV$sp$2.apply(OffsetSeqLogSuite.scala:75)
  at org.apache.spark.sql.test.SQLTestUtils$class.withTempDir(SQLTestUtils.scala:133)
  at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite.withTempDir(OffsetSeqLogSuite.scala:26)
  at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply$mcV$sp(OffsetSeqLogSuite.scala:75)
  at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply(OffsetSeqLogSuite.scala:75)
  at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply(OffsetSeqLogSuite.scala:75)
  at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
  at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
```

## How was this patch tested?

unit tests

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/lw-lin/spark good-msg-2.1

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17327.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17327

commit daabb27aa32cb19c157e19081f6d08ff368bb42b
Author: Liwei Lin <lwl...@gmail.com>
Date: 2017-02-25T03:46:35Z

    Fix
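For reference, such a `parseVersion` helper can look roughly like the following; this is an illustrative reconstruction from the PR description above, not the exact merged code:

```scala
// Parse a "v<N>" version line; reject malformed input and versions newer than
// what this reader supports, with the descriptive message quoted above.
def parseVersion(text: String, maxSupportedVersion: Int): Int = {
  if (text.length > 1 && text.startsWith("v")) {
    val version =
      try text.substring(1).toInt
      catch { case _: NumberFormatException => -1 }
    if (version > 0) {
      if (version > maxSupportedVersion) {
        throw new IllegalStateException(
          s"UnsupportedLogVersion: maximum supported log version is v$maxSupportedVersion, " +
            s"but encountered $text. The log file was produced by a newer version of Spark " +
            "and cannot be read by this version. Please upgrade.")
      }
      return version
    }
  }
  throw new IllegalStateException(s"Failed to read correct log version from $text.")
}
```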
[GitHub] spark issue #17268: [SPARK-19932][SS] Disallow a case that might cause OOM fo...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/17268 Sure; sorry I didn't spell it out, but I meant the same thing :-) @marmbrus now that I've updated this as well as the JIRA, would you mind taking another look? Thanks!
[GitHub] spark pull request #17070: [SPARK-19721][SS] Good error message for version ...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/17070#discussion_r106352628

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala ---
@@ -195,6 +195,11 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
     val input = fileManager.open(batchMetadataFile)
     try {
       Some(deserialize(input))
+    } catch {
+      case ise: IllegalStateException =>
+        // re-throw the exception with the log file path added
+        throw new IllegalStateException(
+          s"Failed to read log file $batchMetadataFile. ${ise.getMessage}")
--- End diff --

done; thanks!
[GitHub] spark issue #17268: [SPARK-19932][SS] Also save event time into StateStore f...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/17268 Thank you @marmbrus for the detailed explanation! > For that reason, I think its safest to require the user to explicitly include the timestamp. Yeah, let me update this in that direction.
[GitHub] spark issue #17299: [SPARK-19817][SS] Make it clear that `timeZone` is a gen...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/17299 This is the fix for the streaming counterpart (i.e. Structured Streaming); @ueshin @gatorsmile would you take a look? Thanks!
[GitHub] spark pull request #17299: [SPARK-19817][SS] Make it clear that `timeZone` i...
GitHub user lw-lin opened a pull request: https://github.com/apache/spark/pull/17299

[SPARK-19817][SS] Make it clear that `timeZone` is a general option in DataStreamReader/Writer

## What changes were proposed in this pull request?

Since the `timeZone` setting can also affect partition values and works for all formats, we should make this clear.

## How was this patch tested?

N/A

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/lw-lin/spark timezone

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17299.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17299

commit 0b13c0850472b5172a9f428f923b99a168a79e00
Author: Liwei Lin <lwl...@gmail.com>
Date: 2017-03-15T02:06:06Z

    Initial commit
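As a usage illustration of the option being documented (the schema and path below are made up for the example; an active `spark` session is assumed):

```scala
import org.apache.spark.sql.types.{StringType, StructType, TimestampType}

val inputSchema = new StructType()
  .add("ts", TimestampType)
  .add("value", StringType)

// `timeZone` is a general option rather than a format-specific one: it affects
// timestamp parsing and also timestamp-derived partition values.
val df = spark.readStream
  .format("json")
  .option("timeZone", "GMT+8")
  .schema(inputSchema)
  .load("/path/to/input")
```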
[GitHub] spark issue #17268: [SPARK-19932][SS] Also save event time into StateStore f...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/17268 @marmbrus thanks for the comments. > In the worst case... it is possible that the result actually ends up with duplicates in it. Ah, could you elaborate? I'm not sure why there might be duplicates, since this patch proposes to include the timestamp column implicitly in the state store values rather than in the state store keys.
[GitHub] spark issue #17268: [SPARK-19932][SS] Also save event time into StateStore f...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/17268 @marmbrus @zsxwing would you take a look at this, thanks!
[GitHub] spark pull request #17268: [WIP][SPARK][SS] Also save event time into StateS...
GitHub user lw-lin opened a pull request: https://github.com/apache/spark/pull/17268

[WIP][SPARK][SS] Also save event time into StateStore for certain cases

## What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)

(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/lw-lin/spark dedup-watermark

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17268.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17268

commit 1cdf29af06418fe6dea6ed9d4b85be62c8b6edcd
Author: Liwei Lin <lwl...@gmail.com>
Date: 2017-03-12T12:28:54Z

    Initial commit
[GitHub] spark issue #17070: [SPARK-19721][SS] Good error message for version mismatc...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/17070 @zsxwing would you take a look when you've got a minute? Thanks!
[GitHub] spark issue #17120: [SPARK-19715][Structured Streaming] Option to Strip Path...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/17120 Jenkins retest this please
[GitHub] spark pull request #17120: [SPARK-19715][Structured Streaming] Option to Str...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/17120#discussion_r105097597

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -79,9 +81,16 @@ class FileStreamSource(
     sourceOptions.maxFileAgeMs
   }

+  private val fileNameOnly = sourceOptions.fileNameOnly
+  if (fileNameOnly) {
+    logWarning("'fileNameOnly' is enabled. Make sure your file names are unique (e.g. using " +
+      "UUID), otherwise, files using the same name will be considered as the same file and causes" +
+      " data lost")
--- End diff --

fixed -- thank you!
[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/17216#discussion_r105081023

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala ---
@@ -71,7 +71,10 @@ object OffsetSeq {
  * @param batchTimestampMs: The current batch processing timestamp.
  *                          Time unit: milliseconds
  */
-case class OffsetSeqMetadata(var batchWatermarkMs: Long = 0, var batchTimestampMs: Long = 0) {
+case class OffsetSeqMetadata(
+    var batchWatermarkMs: Long = 0,
+    var batchTimestampMs: Long = 0,
+    var numShufflePartitions: Int = 0) {
--- End diff --

Hi @kunalkhamar, in case you update `OffsetSeq`'s log version number, the work being done in #17070 might be helpful
[GitHub] spark pull request #17120: [SPARK-19715][Structured Streaming] Option to Str...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/17120#discussion_r105080626

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -309,6 +315,10 @@ object FileStreamSource {
     def size: Int = map.size()

+    /**
+     * Note when `fileNameOnly` is true, each entry would be (file name, timestamp) rather than
+     * (full path, timestamp).
+     */
     def allEntries: Seq[(String, Timestamp)] = {
--- End diff --

deleted :)
[GitHub] spark pull request #17120: [SPARK-19715][Structured Streaming] Option to Str...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/17120#discussion_r105080572

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -75,7 +77,7 @@ class FileStreamSource(
   /** A mapping from a file that we have processed to some timestamp it was last modified. */
   // Visible for testing and debugging in production.
-  val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs)
+  val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs, sourceOptions.fileNameOnly)
--- End diff --

added. thanks!
[GitHub] spark issue #17120: [SPARK-19715][Structured Streaming] Option to Strip Path...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/17120 Jenkins retest this please
[GitHub] spark issue #17070: [SPARK-19721][SS] Good error message for version mismatc...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/17070 Jenkins retest this please
[GitHub] spark pull request #17120: [SPARK-19715][Structured Streaming] Option to Str...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/17120#discussion_r104287612

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
@@ -1253,8 +1253,26 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     assert(map.isNewFile("e", 20))
   }

+  test("SeenFilesMap with fileNameOnly = true") {
+    val map = new SeenFilesMap(maxAgeMs = 10, fileNameOnly = true)
+
+    map.add("file:///a/b/c/d", 5)
+    map.add("file:///a/b/c/e", 5)
+    assert(map.size == 2)
--- End diff --

ah, thanks!
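The behavior the test pins down comes from how entries are keyed; a minimal sketch of that keying, assuming the map reduces a path to its Hadoop `Path.getName` when `fileNameOnly` is set:

```scala
import java.net.URI
import org.apache.hadoop.fs.Path

// With fileNameOnly = true, "file:///a/b/c/d" and "s3://x/y/d" both key as
// "d", so adding the second path overwrites the first entry's timestamp
// instead of creating a new entry.
def seenFilesKey(path: String, fileNameOnly: Boolean): String =
  if (fileNameOnly) new Path(new URI(path)).getName else path

// seenFilesKey("file:///a/b/c/d", fileNameOnly = true)  == "d"
// seenFilesKey("file:///a/b/c/d", fileNameOnly = false) == "file:///a/b/c/d"
```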
[GitHub] spark issue #17120: [SPARK-19715][Structured Streaming] Option to Strip Path...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/17120 Thank you @marmbrus @steveloughran for the feedback. Added some explicit docs. Here's a screenshot of the affected section from the programming guide: ![snip20170304_5](https://cloud.githubusercontent.com/assets/15843379/23575433/06b18bf2-00c7-11e7-9a44-9205fab5d6bc.png) Please take a look again.
[GitHub] spark pull request #17120: [SPARK-19715][Structured Streaming] Option to Str...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/17120#discussion_r104276335

--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1052,10 +1052,18 @@ Here are the details of all the sinks in Spark.
 Append
 path: path to the output directory, must be specified.
+
 maxFilesPerTrigger: maximum number of new files to be considered in every trigger (default: no max)
-latestFirst: whether to processs the latest new files first, useful when there is a large backlog of files(default: false)
-
+latestFirst: whether to processs the latest new files first, useful when there is a large backlog of files (default: false)
+
+fileNameOnly: whether to check new files based on only the filename instead of on the full path (default: false). With this set to `true`, the following files would be considered as the same file, because their filenames, "dataset.txt", are the same:
+
+· "file:///dataset.txt"
+· "s3://a/dataset.txt"
+· "s3n://a/b/dataset.txt"
+· "s3a://a/b/c/dataset.txt"
--- End diff --

the indents of a nested list do not look pretty, so I'm using a dot here
[GitHub] spark issue #17120: [SPARK-19715][Structured Streaming] Option to Strip Path...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/17120 Thank you @marmbrus @steveloughran for the feedback. I've added some explicit docs. Here's a screenshot of the affected section from the programming guide: ![snip20170304_4](https://cloud.githubusercontent.com/assets/15843379/23575388/18ce95ba-00c6-11e7-8624-3fcb74a31d8a.png) Please take a look again.
[GitHub] spark pull request #17120: [SPARK-19715][Structured Streaming] Option to Str...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/17120#discussion_r104276118

--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1052,10 +1052,18 @@ Here are the details of all the sinks in Spark.
     Append
     path: path to the output directory, must be specified.
+
     maxFilesPerTrigger: maximum number of new files to be considered in every trigger (default: no max)
     latestFirst: whether to processs the latest new files first, useful when there is a large backlog of files(default: false)
-
+
+    fileNameOnly: whether to check new files based on only the filename instead of on the full path. With this set to `true`, the following files would be considered as the same file, because their filenames, "dataset.txt", are the same:
+
+    · "file:///dataset.txt"
+    · "s3://a/dataset.txt"
+    · "s3n://a/b/dataset.txt"
+    · "s3a://a/b/c/dataset.txt"
--- End diff --

the indents of a `` do not look pretty, so I'm using a dot here
[GitHub] spark issue #16970: [SPARK-19497][SS]Implement streaming deduplication
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/16970 @uncleGen I think `requiredChildDistribution = ClusteredDistribution(keyExpressions) :: Nil` (please see [here](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L344-L345)) takes care of it.
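To make the point concrete, here is a toy model (plain Scala, not Spark internals) of the guarantee `ClusteredDistribution(keyExpressions)` requests: all rows with the same key are co-partitioned, so deduplicating within each partition is already deduplicating globally.

```scala
// Toy model, NOT Spark code: hash-partitioning by key, which is what
// ClusteredDistribution asks the planner to guarantee for the child plan.
val numPartitions = 4
def partitionOf(key: String): Int = math.floorMod(key.hashCode, numPartitions)

val events = Seq("k1", "k2", "k1", "k3", "k2") // "k1" and "k2" arrive twice
val grouped = events.groupBy(partitionOf)      // all copies of a key co-locate

// per-partition dedup equals global dedup, so no cross-partition state is needed
val dedupedPerPartition = grouped.values.flatMap(_.distinct).toSeq.sorted
assert(dedupedPerPartition == events.distinct.sorted)
```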
[GitHub] spark issue #17070: [SPARK-19721][SS] Good error message for version mismatc...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/17070 @zsxwing would you take a look when you've got a minute? Thanks!
[GitHub] spark issue #17120: [SPARK-19715][Structured Streaming] Option to Strip Path...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/17120 @steveloughran thanks for the comments. @marmbrus @zsxwing it'd be great if you could share some thoughts!
[GitHub] spark pull request #17120: [SPARK-19715][Structured Streaming] Option to Str...
GitHub user lw-lin opened a pull request:

    https://github.com/apache/spark/pull/17120

    [SPARK-19715][Structured Streaming] Option to Strip Paths in FileSource

## What changes were proposed in this pull request?

Today, we compare the whole path when deciding if a file is new in the FileSource for structured streaming. However, this can cause false negatives in the case where the path has changed in a cosmetic way (e.g. changing `s3n` to `s3a`). This patch adds an option `fileNameOnly` that causes the new-file check to be based only on the filename (while still storing the whole path in the log).

## Usage

```scala
spark
  .readStream
  .option("fileNameOnly", true)
  .text("s3n://bucket/dir1/dir2")
  .writeStream
  ...
```

## How was this patch tested?

Added a test case.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/lw-lin/spark filename-only

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/17120.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #17120

commit aeb10d100a24ca644745fb8b26985b584fd5118e
Author: Liwei Lin <lwl...@gmail.com>
Date: 2017-02-28T15:29:17Z

    Add support for `fileNameOnly`
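As a rough sketch of what the option changes (the helper below is hypothetical, not the patch's actual code), a file's identity degrades from its full path to its last path component:

```scala
import org.apache.hadoop.fs.Path

// Hypothetical helper, not the PR's code: with fileNameOnly enabled, a file's
// tracking key is just its last path component, so cosmetic prefix changes
// (s3n:// vs s3a://, extra parent directories) no longer create "new" files.
def trackingKey(path: String, fileNameOnly: Boolean): String =
  if (fileNameOnly) new Path(path).getName else path

val keys = Seq(
  "file:///dataset.txt",
  "s3://a/dataset.txt",
  "s3n://a/b/dataset.txt",
  "s3a://a/b/c/dataset.txt"
).map(trackingKey(_, fileNameOnly = true)).distinct

assert(keys == Seq("dataset.txt")) // all four collapse to one key
```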
[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/16987#discussion_r103604050

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -190,32 +190,31 @@ class FileStreamSource(
     val startTime = System.nanoTime
     var allFiles: Seq[FileStatus] = null
-    if (sourceHasMetadata.isEmpty) {
-      if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) {
-        sourceHasMetadata = Some(true)
-        allFiles = allFilesUsingMetadataLogFileIndex()
-      } else {
-        allFiles = allFilesUsingInMemoryFileIndex()
-        if (allFiles.isEmpty) {
-          // we still cannot decide
+    sourceHasMetadata match {
--- End diff --

simply switched to `sourceHasMetadata match { case ... case ... case ... }`; the actual diff is quite small
[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/16987#discussion_r103603945

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
(quoted hunk and comment truncated in the archive; the full hunk appears under discussion_r103603898 below)
--- End diff --
[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/16987#discussion_r103603935

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
(quoted hunk and comment truncated in the archive; the full hunk appears under discussion_r103603898 below)
--- End diff --
[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/16987#discussion_r103603942

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
(quoted hunk and comment truncated in the archive; the full hunk appears under discussion_r103603898 below)
--- End diff --
[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/16987#discussion_r103603922

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
(quoted hunk and comment truncated in the archive; the full hunk appears under discussion_r103603898 below)
--- End diff --
[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/16987#discussion_r103603898

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
@@ -662,6 +665,154 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }

+  test("read data from outputs of another streaming query") {
+    withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
+      withTempDirs { case (dir, tmp) =>
+        // q1 is a streaming query that reads from memory and writes to text files
+        val q1_source = MemoryStream[String]
+        val q1_checkpointDir = new File(dir, "q1_checkpointDir").getCanonicalPath
+        val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
+        val q1 =
+          q1_source
+            .toDF()
+            .writeStream
+            .option("checkpointLocation", q1_checkpointDir)
+            .format("text")
+            .start(q1_outputDir)
+
+        // q2 is a streaming query that reads q1's text outputs
+        val q2 = createFileStream("text", q1_outputDir).filter($"value" contains "keep")
+
+        def q1AddData(data: String*): StreamAction =
+          Execute { _ =>
+            q1_source.addData(data)
+            q1.processAllAvailable()
+          }
+        def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() }
+
+        testStream(q2)(
+          // batch 0
+          q1AddData("drop1", "keep2"),
+          q2ProcessAllAvailable(),
+          CheckAnswer("keep2"),
+
+          // batch 1
+          Assert {
+            // create a text file that won't be on q1's sink log
+            // thus even if its contents contains "keep", it should NOT appear in q2's answer
+            val shouldNotKeep = new File(q1_outputDir, "should_not_keep.txt")
+            stringToFile(shouldNotKeep, "should_not_keep!!!")
+            shouldNotKeep.exists()
+          },
+          q1AddData("keep3"),
+          q2ProcessAllAvailable(),
+          CheckAnswer("keep2", "keep3"),
+
+          // batch 2: check that things work well when the sink log gets compacted
+          q1AddData("keep4"),
+          Assert {
+            // compact interval is 3, so file "2.compact" should exist
+            new File(q1_outputDir, s"${FileStreamSink.metadataDir}/2.compact").exists()
+          },
+          q2ProcessAllAvailable(),
+          CheckAnswer("keep2", "keep3", "keep4"),
+
+          // stop q1 manually
+          Execute { _ => q1.stop() }
+        )
+      }
+    }
+  }
+
+  test("read partitioned data from outputs of another streaming query") {
--- End diff --

test removed -- Let me think about this write-partition-information thing :) thanks!
[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/16987#discussion_r103603705

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
(same hunk as quoted above, through the line below)
+          // stop q1 manually
--- End diff --

fixed
[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/16987#discussion_r103603671

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala ---
@@ -208,6 +208,11 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
     }
   }

+  /** Execute arbitrary code */
+  case class Execute(val func: StreamExecution => Any) extends StreamAction {
--- End diff --

fixed, thanks!
[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/16987#discussion_r103603693

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
@@ -662,6 +665,154 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }

+  test("read data from outputs of another streaming query") {
+    withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
+      withTempDirs { case (dir, tmp) =>
+        // q1 is a streaming query that reads from memory and writes to text files
+        val q1_source = MemoryStream[String]
--- End diff --

fixed
[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/16987#discussion_r103603687

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
@@ -662,6 +665,154 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }

+  test("read data from outputs of another streaming query") {
+    withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
+      withTempDirs { case (dir, tmp) =>
--- End diff --

fixed
[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/16987#discussion_r103405331

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
@@ -52,10 +52,7 @@ abstract class FileStreamSourceTest
       query.nonEmpty, "Cannot add data when there is no query for finding the active file stream source")

-    val sources = query.get.logicalPlan.collect {
-      case StreamingExecutionRelation(source, _) if source.isInstanceOf[FileStreamSource] =>
-        source.asInstanceOf[FileStreamSource]
-    }
--- End diff --

this common logic is extracted out
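The extracted helper itself is not shown in this hunk; presumably it has roughly this shape (the name is an assumption, based on the pattern deleted above):

```scala
// Assumed shape of the extracted helper (name and placement are guesses, not
// confirmed by the diff): collect every FileStreamSource feeding the query.
def getSourcesFromStreamingQuery(query: StreamExecution): Seq[FileStreamSource] =
  query.logicalPlan.collect {
    case StreamingExecutionRelation(source: FileStreamSource, _) => source
  }
```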
[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/16987#discussion_r103404962

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -159,28 +161,64 @@ class FileStreamSource(
   }

   /**
    * If the source has a metadata log indicating which files should be read, then we should use it.
-   * We figure out whether there exists some metadata log only when user gives a non-glob path.
+   * Only when the user gives a non-glob path will we figure out whether the source has some
+   * metadata log
+   *
+   * None        means we don't know at the moment
+   * Some(true)  means we know for sure the source DOES have metadata
+   * Some(false) means we know for sure the source DOES NOT have metadata
    */
-  private val sourceHasMetadata: Boolean =
-    !SparkHadoopUtil.get.isGlobPath(new Path(path)) &&
-      FileStreamSink.hasMetadata(Seq(path), sparkSession.sessionState.newHadoopConf())
+  @volatile private[sql] var sourceHasMetadata: Option[Boolean] =
+    if (SparkHadoopUtil.get.isGlobPath(new Path(path))) Some(false) else None
+
+  private def allFilesUsingInMemoryFileIndex() = {
+    val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
+    val fileIndex = new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(new StructType))
+    fileIndex.allFiles()
+  }
+
+  private def allFilesUsingMetadataLogFileIndex() = {
+    // Note if `sourceHasMetadata` holds, then `qualifiedBasePath` is guaranteed to be a
+    // non-glob path
+    new MetadataLogFileIndex(sparkSession, qualifiedBasePath).allFiles()
+  }

   /**
    * Returns a list of files found, sorted by their timestamp.
    */
   private def fetchAllFiles(): Seq[(String, Long)] = {
     val startTime = System.nanoTime
-    val catalog =
-      if (sourceHasMetadata) {
-        // Note if `sourceHasMetadata` holds, then `qualifiedBasePath` is guaranteed to be a
-        // non-glob path
-        new MetadataLogFileIndex(sparkSession, qualifiedBasePath)
+
--- End diff --

seems like `sourceHasMetadata match { case ... }` is more appropriate here
[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/16987#discussion_r103404486

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
(same hunk as quoted above)
--- End diff --

then based on `sourceHasMetadata`'s value, we can choose which `FileIndex` should be used. As shown below, `case None` should be dealt with the most care.
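A sketch of that three-way choice, following the names in the diff (simplified, not the verbatim patch):

```scala
// Sketch only. `None` stays None when nothing can be decided yet, so
// detection is retried on the next fetch.
val allFiles: Seq[FileStatus] = sourceHasMetadata match {
  case Some(true)  => allFilesUsingMetadataLogFileIndex()
  case Some(false) => allFilesUsingInMemoryFileIndex()
  case None =>
    if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) {
      sourceHasMetadata = Some(true)
      allFilesUsingMetadataLogFileIndex()
    } else {
      val files = allFilesUsingInMemoryFileIndex()
      if (files.nonEmpty) {
        // data files exist but no metadata dir: surely not a FileStreamSink output
        sourceHasMetadata = Some(false)
      }
      // if files is empty we still cannot decide; leave sourceHasMetadata as None
      files
    }
}
```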
[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/16987#discussion_r103403986

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
(same hunk as quoted above, through the lines below)
+   * None        means we don't know at the moment
+   * Some(true)  means we know for sure the source DOES have metadata
+   * Some(false) means we know for sure the source DOES NOT have metadata
--- End diff --

( some notes here since the changes are not trivial )

here we're using this `sourceHasMetadata` to indicate whether we know for sure the source has metadata, as stated in the source file comments:
- `None` means we don't know at the moment
- `Some(true)` means we know for sure the source DOES have metadata
- `Some(false)` means we know for sure the source DOES NOT have metadata
[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/16987#discussion_r103366158

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -158,12 +158,28 @@ class FileStreamSource(
   }

   /**
+   * If the source has a metadata log indicating which files should be read, then we should use it.
+   * We figure out whether there exists some metadata log only when user gives a non-glob path.
+   */
+  private val sourceHasMetadata: Boolean =
--- End diff --

and add a dedicated test case of course
[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/16987#discussion_r103366082

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -158,12 +158,28 @@ class FileStreamSource(
   }

   /**
+   * If the source has a metadata log indicating which files should be read, then we should use it.
+   * We figure out whether there exists some metadata log only when user gives a non-glob path.
+   */
+  private val sourceHasMetadata: Boolean =
--- End diff --

ah thanks! I was about to change it to a method which would stop detecting once we know for sure whether to use a `MetadataLogFileIndex` or an `InMemoryFileIndex`, and save this information. Will update with the code soon.
[GitHub] spark issue #16987: [SPARK-19633][SS] FileSource read from FileSink
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/16987 Rebased to master and tests updated. @zsxwing would you take another look when you've got a minute?
[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/16987#discussion_r103104036

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
@@ -662,6 +663,101 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }

+  testWithUninterruptibleThread("read data from outputs of another streaming query") {
+    withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
+      withTempDirs { case (dir, tmp) =>
+        val sinkLogDir = new File(dir, FileStreamSink.metadataDir).getCanonicalPath
+        val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, sinkLogDir)
+
+        val fileStream = createFileStream("text", dir.getCanonicalPath)
+        val filtered = fileStream.filter($"value" contains "keep")
+
+        def addIntoSinkLog(batch: Int, fileName: String): Boolean = {
+          val unqualifiedDirPath = new Path(new File(dir, fileName).getCanonicalPath)
+          val fs = unqualifiedDirPath.getFileSystem(sparkContext.hadoopConfiguration)
+          val sinkFileStatus = SinkFileStatus(fs.getFileStatus(unqualifiedDirPath))
+          sinkLog.add(batch, Array(sinkFileStatus))
+        }
+
+        testStream(filtered)(
+          // Create new dir and write to it, should read
+          AddTextFileData("drop1\nkeep2", dir, tmp, Some("file_1")),
+          Assert { addIntoSinkLog(0, "file_1") },
+          CheckAnswer("keep2"),
+
+          // Create "file_2" but DO NOT add it to the log intentionally
+          AddTextFileData("should_not_keep!!!", dir, tmp, Some("file_2")),
+          Assert { new File(dir, "file_2").exists() },
+          AddTextFileData("keep3", dir, tmp, Some("file_3")),
+          Assert { addIntoSinkLog(1, "file_3") },
+          // Should NOT read "file_2"; should read "file_3"
+          CheckAnswer("keep2", "keep3"),
+
+          // Check that things work well when the sink log gets compacted
+          AddTextFileData("keep4", dir, tmp, Some("file_4")),
+          Assert { addIntoSinkLog(2, "file_4") },
+          Assert {
+            // compact interval is 3, so file "2.compact" should exist
+            new File(sinkLogDir, "2" + CompactibleFileStreamLog.COMPACT_FILE_SUFFIX).exists()
+          },
+          CheckAnswer("keep2", "keep3", "keep4"),
+
+          AddTextFileData("keep5", dir, tmp, Some("file_5")),
+          Assert { addIntoSinkLog(3, "file_5") },
+          CheckAnswer("keep2", "keep3", "keep4", "keep5")
+        )
+      }
+    }
+  }
+
+  testWithUninterruptibleThread("read partitioned data from outputs of another streaming query") {
--- End diff --

done
[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/16987#discussion_r103104024

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
@@ -662,6 +663,101 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }

+  testWithUninterruptibleThread("read data from outputs of another streaming query") {
--- End diff --

done; thanks! and good job on https://github.com/apache/spark/pull/16947!
[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/16987#discussion_r103104003

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---
@@ -243,13 +243,20 @@ case class DataSource(
     val path = caseInsensitiveOptions.getOrElse("path", {
       throw new IllegalArgumentException("'path' is not specified")
     })
+    // If we're reading files from outputs of another streaming query, then it does not make
+    // sense to glob files since we would get files from the metadata log.
+    // Thus we would figure out whether there exists some metadata log only when user gives a
+    // non-glob path.
+    val sourceHasMetadata =
+      !SparkHadoopUtil.get.isGlobPath(new Path(path)) && hasMetadata(Seq(path))
--- End diff --

Yea, `hasMetadata` was the reason! Now it lives in `object FileStreamSink` :-D
[GitHub] spark issue #17070: [WIP][SS] Good error message for version mismatch in log...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/17070 @srowen thank you for the comments! I was trying to tackle SPARK-19721; sorry, the summary just said "WIP" without a JIRA number. Adding the JIRA number back.
[GitHub] spark pull request #17070: [WIP][SS] Good error message for version mismatch...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/17070#discussion_r103102481

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala ---
@@ -195,6 +196,11 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
     val input = fileManager.open(batchMetadataFile)
     try {
       Some(deserialize(input))
+    } catch {
+      case ise: IllegalStateException =>
--- End diff --

the low-level exception does not know about the log file's path, and I'm trying to put it into the error message to give users very explicit information
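A sketch of the wrapping this describes (simplified from the diff; the exact message wording is illustrative):

```scala
// Sketch: re-throw the low-level version error with the log file's path
// attached, so users can see exactly which file could not be read.
try {
  Some(deserialize(input))
} catch {
  case ise: IllegalStateException =>
    // re-throw with an addendum naming the batch metadata file being read
    throw new IllegalStateException(
      s"Failed to read log file $batchMetadataFile. ${ise.getMessage}", ise)
}
```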
[GitHub] spark pull request #17070: [WIP][SS] Good error message for version mismatch...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/17070#discussion_r103102420

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala ---
@@ -268,6 +274,37 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
       new FileSystemManager(metadataPath, hadoopConf)
     }
   }
+
+  /**
+   * Parse the log version from the given `text` -- will throw exception when the parsed version
+   * exceeds `maxSupportedVersion`, or when `text` is malformed (such as "xyz", "v", "v-1",
+   * "v123xyz" etc.)
+   */
+  private[sql] def parseVersion(text: String, maxSupportedVersion: Int): Int = {
+    if (text.length > 0 && text(0) == 'v') {
+      val version =
+        try { text.substring(1, text.length).toInt }
+        catch {
+          case _: NumberFormatException =>
+            throw new IllegalStateException(s"Log file was malformed: failed to read correct log " +
+              s"version from $text.")
+        }
+      if (version > 0) {
+        if (version > maxSupportedVersion) {
+          throw new IllegalStateException(s"UnsupportedLogVersion: maximum supported log version " +
+            s"is v${maxSupportedVersion}, but encountered v$version. The log file was produced " +
+            s"by a newer version of Spark and cannot be read by this version. Please upgrade.")
+        }
+        else {
--- End diff --

done
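Read as a contract, the method above behaves like this (illustrative calls, assuming the `parseVersion` signature shown in the diff):

```scala
// Illustrative calls against the parseVersion shown above; not tests from the PR.
parseVersion("v1", maxSupportedVersion = 1)         // returns 1
// parseVersion("v2", maxSupportedVersion = 1)      // throws: log from a newer Spark
// parseVersion("v123xyz", maxSupportedVersion = 1) // throws: malformed version
// parseVersion("xyz", maxSupportedVersion = 1)     // throws: malformed version
```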
[GitHub] spark pull request #17070: [WIP][SS] Good error message for version mismatch...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/17070#discussion_r103102416

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala ---
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.streaming

 import java.io._
+import java.lang.IllegalStateException
--- End diff --

ah, what a simple mistake :-)
[GitHub] spark pull request #17070: [WIP][SS] Good error message for version mismatch...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/17070#discussion_r103102420

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala ---
(same `parseVersion` block as quoted above, through the line below)
+        try { text.substring(1, text.length).toInt }
--- End diff --

done
[GitHub] spark pull request #17070: [WIP][SS] Good error message for version mismatch...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/17070#discussion_r103102394

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---
@@ -100,7 +100,8 @@ private[kafka010] class KafkaSource(
     override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
       out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
       val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
-      writer.write(VERSION)
+      writer.write("v" + VERSION)
--- End diff --

done
[GitHub] spark pull request #17070: [WIP][SS] Good error message for version mismatch...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/17070#discussion_r103102389

--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
@@ -226,7 +226,15 @@ class KafkaSourceSuite extends KafkaSourceTest {
         source.getOffset.get // Read initial offset
       }

-      assert(e.getMessage.contains("Please upgrade your Spark"))
+      Seq(
--- End diff --

done; thanks!
[GitHub] spark pull request #17070: [WIP][SS] Good error message for version mismatch...
GitHub user lw-lin opened a pull request:

    https://github.com/apache/spark/pull/17070

    [WIP][SS] Good error message for version mismatch in log files

## What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)

(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/lw-lin/spark better-msg

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/17070.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #17070

commit dcae69b351ac71bf1554b760aeea6b763b2bf4c0
Author: Liwei Lin <lwl...@gmail.com>
Date: 2017-02-25T03:46:35Z

    Fix
[GitHub] spark issue #16987: [SPARK-19633][SS] FileSource read from FileSink
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/16987 Reopening :-)
[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink
GitHub user lw-lin reopened a pull request:

    https://github.com/apache/spark/pull/16987

    [SPARK-19633][SS] FileSource read from FileSink

## What changes were proposed in this pull request?

Right now the file source always uses `InMemoryFileIndex` to scan files from a given path. But when reading the outputs of another streaming query, the file source should use `MetadataLogFileIndex` to list files from the sink log. This patch adds this support.

## `MetadataLogFileIndex` or `InMemoryFileIndex`

```scala
spark
  .readStream
  .format(...)
  .load("/some/path")
// for a non-glob path:
// - use `MetadataLogFileIndex` when `/some/path/_spark_metadata` exists
// - fall back to `InMemoryFileIndex` otherwise
```

```scala
spark
  .readStream
  .format(...)
  .load("/some/path/*/*")
// for a glob path: always use `InMemoryFileIndex`
```

## How was this patch tested?

two newly added tests

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/lw-lin/spark source-read-from-sink

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16987.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #16987

commit b66d2ccabcae41973bd8af4ed406567dc071ff67
Author: Liwei Lin <lwl...@gmail.com>
Date: 2017-02-18T01:20:18Z

    File Source reads from File Sink
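The non-glob detection can be sketched like this (an illustrative stand-in for `FileStreamSink.hasMetadata`, not the PR's code; `_spark_metadata` is the sink's metadata directory name):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Illustrative stand-in, not the PR's code: a directory is treated as the
// output of a FileStreamSink iff the sink's metadata subdirectory exists.
def looksLikeSinkOutput(path: String, hadoopConf: Configuration): Boolean = {
  val metadataPath = new Path(path, "_spark_metadata")
  val fs = metadataPath.getFileSystem(hadoopConf)
  fs.exists(metadataPath) && fs.getFileStatus(metadataPath).isDirectory
}
```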
[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink
Github user lw-lin closed the pull request at: https://github.com/apache/spark/pull/16987
[GitHub] spark issue #16987: [SPARK-19633][SS] FileSource read from FileSink
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/16987 Using deterministic file names sounds great. Thanks! I'm closing this.
[GitHub] spark issue #16987: [SPARK-19633][SS] FileSource read from FileSink
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/16987 @marmbrus @zsxwing would you take a look at this? Thanks!
[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/16987#discussion_r101917807

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---

@@ -76,12 +76,13 @@ abstract class FileStreamSourceTest
     protected def addData(source: FileStreamSource): Unit
   }

-  case class AddTextFileData(content: String, src: File, tmp: File)
-    extends AddFileData {
+  case class AddTextFileData(
+      content: String, src: File, tmp: File, finalFileName: Option[String] = None)
+    extends AddFileData {
     override def addData(source: FileStreamSource): Unit = {
       val tempFile = Utils.tempFileWith(new File(tmp, "text"))
-      val finalFile = new File(src, tempFile.getName)
+      val finalFile = new File(src, finalFileName.getOrElse(tempFile.getName))

End diff --

This keeps track of the final file name so that a test can check it later.
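A brief usage sketch of the new `finalFileName` parameter in a test (editor's illustration; `fileSourceDF` and the file name are assumptions, though `testStream` and `CheckAnswer` are the suite's existing helpers):

```scala
// Illustrative only: `fileSourceDF` is an assumed streaming DataFrame
// reading from `src`. Fixing the final file name lets a later assertion
// refer to the ingested file deterministically.
testStream(fileSourceDF)(
  AddTextFileData("keep1\nkeep2", src, tmp, finalFileName = Some("file-1.txt")),
  CheckAnswer("keep1", "keep2")
)
```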
[GitHub] spark issue #16987: [WIP][SPARK-][SS] FileSource read from FileSink
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/16987 Jenkins retest this please
[GitHub] spark pull request #16987: [WIP][SPARK-][SS] FileSource read from FileSink
GitHub user lw-lin opened a pull request: https://github.com/apache/spark/pull/16987 [WIP][SPARK-][SS] FileSource read from FileSink

## What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/lw-lin/spark source-read-from-sink

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16987.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16987

commit b66d2ccabcae41973bd8af4ed406567dc071ff67
Author: Liwei Lin <lwl...@gmail.com>
Date: 2017-02-18T01:20:18Z

    File Source reads from File Sink
[GitHub] spark issue #16912: [SPARK-19576] [Core] Task attempt paths exist in output ...
Github user lw-lin commented on the issue: https://github.com/apache/spark/pull/16912 To me, this PR aims to also use the driver to coordinate Hadoop output committing for `saveAsNewAPIHadoopFile` -- the same coordination was added for `saveAsHadoopFile` back in https://github.com/apache/spark/pull/4066. It seems issues have been reported with the current `saveAsNewAPIHadoopFile` -- e.g. in https://github.com/apache/spark/pull/4066 by @matrixlibing -- but this issue only exists prior to 2.2.0. So @JoshRosen, would you share your thoughts on this? Thanks!
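For context, a minimal sketch of the write path under discussion (editor's illustration; the data, Writable types, and output path are assumptions):

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.sql.SparkSession

object NewHadoopApiWriteExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("new-hadoop-api-write").getOrCreate()
    val sc = spark.sparkContext

    // Write through the "new" (org.apache.hadoop.mapreduce) Hadoop API.
    // With speculation enabled, driver-side commit coordination is what
    // should prevent two speculative attempts of the same task from both
    // committing their output -- the behavior this PR discusses.
    val pairs = sc.parallelize(Seq(1L -> "a", 2L -> "b"))
      .map { case (k, v) => (new LongWritable(k), new Text(v)) }
    pairs.saveAsNewAPIHadoopFile[TextOutputFormat[LongWritable, Text]]("/tmp/out")

    spark.stop()
  }
}
```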