[GitHub] spark issue #19435: [MINOR][SS] "keyWithIndexToNumValues" -> "keyWithIndexTo...

2017-10-11 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/19435
  
@tdas @zsxwing would you take a look, thanks





[GitHub] spark issue #19435: [WIP][SS][MINOR] "keyWithIndexToNumValues" -> "keyWithIn...

2017-10-05 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/19435
  
@tdas would you take a look, thanks





[GitHub] spark pull request #19435: [WIP][SS][MINOR] "keyWithIndexToNumValues" -> "ke...

2017-10-04 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19435#discussion_r142836474
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala ---
@@ -291,7 +291,7 @@ class SymmetricHashJoinStateManager(
   }
 
   /** A wrapper around a [[StateStore]] that stores [(key, index) -> value]. */
-  private class KeyWithIndexToValueStore extends StateStoreHandler(KeyWithIndexToValuesType) {
+  private class KeyWithIndexToValueStore extends StateStoreHandler(KeyWithIndexToValueType) {
--- End diff --

dropped the 's' from '...Values...'





[GitHub] spark pull request #19435: [WIP][SS][MINOR] "keyWithIndexToNumValues" -> "ke...

2017-10-04 Thread lw-lin
GitHub user lw-lin opened a pull request:

https://github.com/apache/spark/pull/19435

[WIP][SS][MINOR] "keyWithIndexToNumValues" -> "keyWithIndexToValue"

## What changes were proposed in this pull request?

This PR changes `keyWithIndexToNumValues` to `keyWithIndexToValue`.

On-disk state store folders will be named after `keyWithIndexToNumValues`, so if we ever want to fix this name, let's fix it now.

## How was this patch tested?

existing unit test cases.
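For context, a rough sketch of why the name matters on disk (the layout below is illustrative only, not a guarantee of Spark's checkpoint format): each state store's name becomes a directory component under the checkpoint location, so a typo would be frozen into every existing checkpoint.

```scala
// Illustrative only -- assumed layout, not the documented checkpoint format:
//   <checkpointRoot>/state/<operatorId>/<partitionId>/<storeName>/...
val checkpointRoot = "/tmp/my-query-checkpoint"   // hypothetical checkpoint location
val storeName = "keyWithIndexToValue"             // renamed from "keyWithIndexToNumValues"
val storeDir = s"$checkpointRoot/state/0/0/$storeName"
println(storeDir)  // /tmp/my-query-checkpoint/state/0/0/keyWithIndexToValue
```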

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lw-lin/spark keyWithIndex

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19435.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19435


commit c62ba65fcc952de4b244695f32dc41783a2c1631
Author: Liwei Lin <lwl...@gmail.com>
Date:   2017-10-05T02:21:02Z

Fix







[GitHub] spark issue #19206: [SPARK-19206][yarn]Client and ApplicationMaster resolveP...

2017-09-14 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/19206
  
It says `Spark-19206` in your PR title, but SPARK-19206 is actually about `Update outdated parameter descriptions in external-kafka module`, so maybe you should reference a different JIRA. That's all Sean and Marcelo meant.





[GitHub] spark issue #18342: [Spark-21123][Docs][Structured Streaming] Options for fi...

2017-06-19 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/18342
  
This lgtm; @zsxwing please also take a look





[GitHub] spark issue #17346: [SPARK-19965][SS] DataFrame batch reader may fail to inf...

2017-05-03 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/17346
  
thank you @zsxwing 





[GitHub] spark issue #17346: [SPARK-19965][SS] DataFrame batch reader may fail to inf...

2017-05-03 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/17346
  
Comments have been addressed -- @zsxwing it'd be great if you could take another look





[GitHub] spark pull request #17346: [SPARK-19965][SS] DataFrame batch reader may fail...

2017-05-02 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17346#discussion_r114468906
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala ---
@@ -36,20 +37,27 @@ import org.apache.spark.util.SerializableConfiguration
  * A [[FileIndex]] that generates the list of files to process by recursively listing all the
  * files present in `paths`.
  *
- * @param rootPaths the list of root table paths to scan
+ * @param rootPathsSpecified the list of root table paths to scan (some of which might be
+ *                           filtered out later)
  * @param parameters as set of options to control discovery
  * @param partitionSchema an optional partition schema that will be use to provide types for the
  *                        discovered partitions
  */
 class InMemoryFileIndex(
     sparkSession: SparkSession,
-    override val rootPaths: Seq[Path],
+    rootPathsSpecified: Seq[Path],
     parameters: Map[String, String],
     partitionSchema: Option[StructType],
     fileStatusCache: FileStatusCache = NoopCache)
   extends PartitioningAwareFileIndex(
     sparkSession, parameters, partitionSchema, fileStatusCache) {
 
+  // Filter out streaming metadata dirs or files such as "/.../_spark_metadata" (the metadata dir)
+  // or "/.../_spark_metadata/0" (a file in the metadata dir). `rootPathsSpecified` might contain
+  // such streaming metadata dir or files, e.g. when after globbing "basePath/*" where "basePath"
+  // is the output of a streaming query.
+  override val rootPaths = rootPathsSpecified.filterNot(FileStreamSink.ancestorIsMetadataDirectory)
--- End diff --

Yea, you are quite correct! They will be filtered by `InMemoryFileIndex.shouldFilterOut`.
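For reference, a simplified sketch of the kind of name-based filtering `InMemoryFileIndex.shouldFilterOut` performs (an approximation; the real rule set in Spark is longer):

```scala
// Approximate sketch: skip hidden and metadata-style files/dirs during listing.
def shouldFilterOut(pathName: String): Boolean = {
  pathName.toLowerCase == "_temporary" ||                    // in-progress output dirs
    (pathName.startsWith("_") && !pathName.contains("=")) || // metadata files, but keep "_x=1"-style partition dirs
    pathName.startsWith(".")                                 // hidden files
}
```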





[GitHub] spark pull request #17346: [SPARK-19965][SS] DataFrame batch reader may fail...

2017-05-02 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17346#discussion_r114468833
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala ---
@@ -145,6 +147,41 @@ class FileStreamSinkSuite extends StreamTest {
     }
   }
 
+  test("partitioned writing and batch reading with 'basePath'") {
+    val inputData = MemoryStream[Int]
+    val ds = inputData.toDS()
+
+    val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
--- End diff --

done





[GitHub] spark pull request #17346: [SPARK-19965][SS] DataFrame batch reader may fail...

2017-05-02 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17346#discussion_r114468801
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala ---
@@ -53,6 +53,29 @@ object FileStreamSink extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * Returns true if the path is the metadata dir or its ancestor is the metadata dir.
+   * E.g.:
+   *  - ancestorIsMetadataDirectory(/.../_spark_metadata) => true
+   *  - ancestorIsMetadataDirectory(/.../_spark_metadata/0) => true
+   *  - ancestorIsMetadataDirectory(/a/b/c) => false
+   */
+  def ancestorIsMetadataDirectory(path: Path): Boolean = {
+    require(path.isAbsolute, s"$path is required to be absolute")
--- End diff --

switched to `makeQualified`





[GitHub] spark pull request #17346: [SPARK-19965][SS] DataFrame batch reader may fail...

2017-05-02 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17346#discussion_r114468821
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala ---
@@ -53,6 +53,29 @@ object FileStreamSink extends Logging {
       case _ => false
     }
   }
+
+  /**
+   * Returns true if the path is the metadata dir or its ancestor is the metadata dir.
+   * E.g.:
+   *  - ancestorIsMetadataDirectory(/.../_spark_metadata) => true
+   *  - ancestorIsMetadataDirectory(/.../_spark_metadata/0) => true
+   *  - ancestorIsMetadataDirectory(/a/b/c) => false
+   */
+  def ancestorIsMetadataDirectory(path: Path): Boolean = {
+    require(path.isAbsolute, s"$path is required to be absolute")
+    var currentPath = path
+    var finished = false
+    while (!finished) {
--- End diff --

fixed. good point!
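Putting the two review comments together, a minimal sketch of what the fixed helper might look like (assuming Hadoop's `org.apache.hadoop.fs.Path` API; per the other comment, the real version drops the `require` and qualifies the path via `makeQualified` first):

```scala
import org.apache.hadoop.fs.Path

// Walk up from `path` through its ancestors and return true as soon as one
// component is the streaming metadata directory ("_spark_metadata").
def ancestorIsMetadataDirectory(path: Path): Boolean = {
  var currentPath = path
  while (currentPath != null) {
    if (currentPath.getName == "_spark_metadata") return true
    currentPath = currentPath.getParent  // becomes null once we pass the root
  }
  false
}
```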





[GitHub] spark pull request #17735: [SPARK-20441][SPARK-20432][SS] Within the same st...

2017-05-02 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17735#discussion_r114453379
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -120,6 +141,32 @@ class StreamSuite extends StreamTest {
     assertDF(df)
   }
 
+  test("Within the same streaming query, one StreamingRelation should only be transformed to one " +
+    "StreamingExecutionRelation") {
+    val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load()
+    var query: StreamExecution = null
+    try {
+      query =
+        df.union(df)
+          .writeStream
+          .format("memory")
+          .queryName("memory")
+          .start()
+          .asInstanceOf[StreamingQueryWrapper]
+          .streamingQuery
+      val executionRelations =
+        query
+          .logicalPlan

Ah, I see.
Fixed, thanks!
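The quoted diff cuts off at `.logicalPlan`; a plausible continuation of the assertion (a hypothetical reconstruction, not the verbatim test) would collect the `StreamingExecutionRelation` nodes and check that both branches of the union share a single relation:

```scala
val executionRelations =
  query
    .logicalPlan
    .collect { case r: StreamingExecutionRelation => r }
assert(executionRelations.size === 2)          // `df.union(df)` references the source twice...
assert(executionRelations.distinct.size === 1) // ...but both references resolve to the same relation
```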





[GitHub] spark issue #17735: [SPARK-20441][SPARK-20432][SS] Within the same streaming...

2017-05-01 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/17735
  
Jenkins retest this please





[GitHub] spark issue #17346: [SPARK-19965][SS] DataFrame batch reader may fail to inf...

2017-05-01 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/17346
  
@zsxwing would you take a look at your convenience? Thanks!





[GitHub] spark issue #17346: [SPARK-19965][SS] DataFrame batch reader may fail to inf...

2017-05-01 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/17346
  
Jenkins retest this please





[GitHub] spark issue #17735: [SPARK-20441][SPARK-20432][SS] Within the same streaming...

2017-05-01 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/17735
  
@zsxwing @brkyvz would you take a look at this? thanks!





[GitHub] spark issue #17735: [SPARK-20441][SPARK-20432][SS] Within the same streaming...

2017-05-01 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/17735
  
Jenkins retest this please





[GitHub] spark issue #17735: [SPARK-20441][SPARK-20432][SS] Within the same streaming...

2017-05-01 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/17735
  
@zsxwing @brkyvz would you take a look at this? thanks!





[GitHub] spark pull request #17812: [WIP][SPARK-][SS] Batch queries with 'Dataset/Dat...

2017-04-30 Thread lw-lin
Github user lw-lin closed the pull request at:

https://github.com/apache/spark/pull/17812





[GitHub] spark pull request #17812: [WIP][SPARK-][SS] Batch queries with 'Dataset/Dat...

2017-04-30 Thread lw-lin
GitHub user lw-lin opened a pull request:

https://github.com/apache/spark/pull/17812

[WIP][SPARK-][SS] Batch queries with `Dataset/DataFrame.withWatermark()` do not execute

## What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration 
tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, 
remove this)

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lw-lin/spark batch-watermark

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17812.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17812


commit 872f4f41ffe8ee2503a0879591cf74e90803c53e
Author: Liwei Lin <lwl...@gmail.com>
Date:   2017-04-24T03:20:10Z

Fix







[GitHub] spark issue #17346: [SPARK-19965][SS] DataFrame batch reader may fail to inf...

2017-04-29 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/17346
  
Jenkins retest this please





[GitHub] spark issue #17346: [SPARK-19965][SS] DataFrame batch reader may fail to inf...

2017-04-28 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/17346
  
Jenkins retest this please





[GitHub] spark issue #17735: [SPARK-20441][SPARK-20432][SS] Within the same streaming...

2017-04-27 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/17735
  
@brkyvz please take another look





[GitHub] spark issue #17735: [SPARK-20441][SPARK-20432][SS] Within the same streaming...

2017-04-25 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/17735
  
@zsxwing @brkyvz would you take a look at this? thanks!





[GitHub] spark issue #17346: [SPARK-19965][SS] DataFrame batch reader may fail to inf...

2017-04-23 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/17346
  
Rebased to master to resolve conflicts





[GitHub] spark pull request #17735: [WIP][SS] Within the same streaming query, one St...

2017-04-23 Thread lw-lin
GitHub user lw-lin opened a pull request:

https://github.com/apache/spark/pull/17735

[WIP][SS] Within the same streaming query, one StreamingRelation should 
only be transformed to one StreamingExecutionRelation

## What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration 
tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, 
remove this)

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lw-lin/spark SPARK-20441

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17735.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17735


commit 018b5fd72eeb528fbae9be367db18a61c4e5f129
Author: Liwei Lin <lwl...@gmail.com>
Date:   2017-04-23T07:24:51Z

Add tests

commit bf502a75924f5a9586b221b67a10536da8e51e0d
Author: Liwei Lin <lwl...@gmail.com>
Date:   2017-04-23T10:03:28Z

Fix







[GitHub] spark issue #17346: [SPARK-19965][SS] DataFrame batch reader may fail to inf...

2017-04-06 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/17346
  
Jenkins retest this please





[GitHub] spark issue #17346: [SPARK-19965][SS] DataFrame batch reader may fail to inf...

2017-03-28 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/17346
  
Jenkins retest this please





[GitHub] spark pull request #17268: [SPARK-19932][SS] Disallow a case that might caus...

2017-03-22 Thread lw-lin
Github user lw-lin closed the pull request at:

https://github.com/apache/spark/pull/17268





[GitHub] spark issue #17268: [SPARK-19932][SS] Disallow a case that might cause OOM f...

2017-03-22 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/17268
  
Thanks for the comments! Closing this.





[GitHub] spark issue #17346: [SPARK-19965][SS] DataFrame batch reader may fail to inf...

2017-03-19 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/17346
  
@zsxwing would you take a look at this? Thanks!





[GitHub] spark pull request #17346: [WIP] DataFrame batch reader may fail to infer pa...

2017-03-19 Thread lw-lin
GitHub user lw-lin opened a pull request:

https://github.com/apache/spark/pull/17346

[WIP] DataFrame batch reader may fail to infer partitions when reading 
FileStreamSink's output

WIP of SPARK-19965

## What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration 
tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, 
remove this)

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lw-lin/spark filter-metadata

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17346.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17346


commit 19d0d485f0190cc8b1df8bec3a9f3b56bca3883e
Author: Liwei Lin <lwl...@gmail.com>
Date:   2017-03-19T09:43:33Z

Initial commit







[GitHub] spark pull request #17219: [SPARK-19876][SS][WIP] OneTime Trigger Executor

2017-03-19 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17219#discussion_r106800936
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/BatchCommitLog.scala ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{InputStream, OutputStream}
+import java.nio.charset.StandardCharsets._
+
+import scala.io.{Source => IOSource}
+
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Used to write log files that represent batch commit points in structured streaming.
+ * A commit log file will be written immediately after the successful completion of a
+ * batch, and before processing the next batch. Here is an execution summary:
+ * - trigger batch 1
+ * - obtain batch 1 offsets and write to offset log
+ * - process batch 1
+ * - write batch 1 to completion log
+ * - trigger batch 2
+ * - obtain batch 2 offsets and write to offset log
+ * - process batch 2
+ * - write batch 2 to completion log
+ *
+ *
+ * The current format of the batch completion log is:
+ * line 1: version
+ * line 2: metadata (optional json string)
+ */
+class BatchCommitLog(sparkSession: SparkSession, path: String)
+  extends HDFSMetadataLog[String](sparkSession, path) {
+
+  override protected def deserialize(in: InputStream): String = {
+    // called inside a try-finally where the underlying stream is closed in the caller
+    val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
+    if (!lines.hasNext) {
+      throw new IllegalStateException("Incomplete log file in the offset commit log")
+    }
+    val version = lines.next().trim.toInt
+    if (BatchCommitLog.VERSION < version) {
+      throw new IllegalStateException(s"Incompatible log file version ${version}")
+    }
--- End diff --

nit: now that https://github.com/apache/spark/pull/17070 has been merged, 
maybe let's use `parseVersion(lines.next(), BatchCommitLog.VERSION)`
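A sketch of what the suggested change could look like (assuming the `parseVersion(text: String, maxSupportedVersion: Int): Int` helper from #17070 is visible here; the tail of the method is a hypothetical reconstruction, since the diff above is truncated):

```scala
override protected def deserialize(in: InputStream): String = {
  // called inside a try-finally where the underlying stream is closed in the caller
  val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
  if (!lines.hasNext) {
    throw new IllegalStateException("Incomplete log file in the offset commit log")
  }
  // the helper both parses "v<n>" and raises the standard "unsupported version" error
  parseVersion(lines.next().trim, BatchCommitLog.VERSION)
  // hypothetical: the remaining lines hold the optional metadata json string
  lines.mkString("\n")
}
```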





[GitHub] spark issue #17327: [SPARK-19721][SS][BRANCH-2.1] Good error message for ver...

2017-03-17 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/17327
  
Thanks! Closed this.





[GitHub] spark pull request #17327: [SPARK-19721][SS][BRANCH-2.1] Good error message ...

2017-03-17 Thread lw-lin
Github user lw-lin closed the pull request at:

https://github.com/apache/spark/pull/17327





[GitHub] spark issue #17070: [SPARK-19721][SS] Good error message for version mismatc...

2017-03-16 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/17070
  
@zsxwing sure, please see https://github.com/apache/spark/pull/17327





[GitHub] spark pull request #17327: [SPARK-19721][SS][BRANCH-2.1] Good error message ...

2017-03-16 Thread lw-lin
GitHub user lw-lin opened a pull request:

https://github.com/apache/spark/pull/17327

[SPARK-19721][SS][BRANCH-2.1] Good error message for version mismatch in 
log files

## Problem

There are several places where we write out version identifiers in various logs for structured streaming (usually `v1`). However, in the places where we check this, a version mismatch produces a confusing error message.

## What changes were proposed in this pull request?

This patch made two major changes:
1. added a `parseVersion(...)` method, and used it to fix the way the following places do version checking (no other place needed this checking); a sketch of such a helper appears at the end of this section:
```
HDFSMetadataLog
  - CompactibleFileStreamLog  ---------------> fixed with this patch
    - FileStreamSourceLog  ------------------> inherited the fix of `CompactibleFileStreamLog`
    - FileStreamSinkLog  --------------------> inherited the fix of `CompactibleFileStreamLog`
  - OffsetSeqLog  ---------------------------> fixed with this patch
  - anonymous subclass in KafkaSource  ------> fixed with this patch
```

2. changed the type of `FileStreamSinkLog.VERSION`, `FileStreamSourceLog.VERSION` etc. from `String` to `Int`, so that we can identify newer versions via `version > 1` instead of `version != "v1"`
- note this didn't break any backwards compatibility -- we are still writing out `"v1"` and reading back `"v1"`
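For illustration, a hypothetical sketch of the `parseVersion(...)` helper described above (assumed to live in `HDFSMetadataLog`; names and message text are illustrative, not the merged code):

```scala
// Parse a "v<n>" version marker; fail loudly on unsupported or malformed versions.
def parseVersion(text: String, maxSupportedVersion: Int): Int = {
  if (text.nonEmpty && text(0) == 'v') {
    val version =
      try text.substring(1).toInt
      catch { case _: NumberFormatException => -1 }
    if (version > 0) {
      if (version > maxSupportedVersion) {
        throw new IllegalStateException(
          s"UnsupportedLogVersion: maximum supported log version is v$maxSupportedVersion, " +
            s"but encountered v$version. The log file was produced by a newer version of " +
            "Spark and cannot be read by this version. Please upgrade.")
      }
      return version
    }
  }
  throw new IllegalStateException(
    s"Log file was malformed: failed to read correct log version from $text.")
}
```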

## Exception message with this patch
```
java.lang.IllegalStateException: Failed to read log file /private/var/folders/nn/82rmvkk568sd8p3p8tb33trwgn/T/spark-86867b65-0069-4ef1-b0eb-d8bd258ff5b8/0. UnsupportedLogVersion: maximum supported log version is v1, but encountered v99. The log file was produced by a newer version of Spark and cannot be read by this version. Please upgrade.
  at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:202)
  at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3$$anonfun$apply$mcV$sp$2.apply(OffsetSeqLogSuite.scala:78)
  at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3$$anonfun$apply$mcV$sp$2.apply(OffsetSeqLogSuite.scala:75)
  at org.apache.spark.sql.test.SQLTestUtils$class.withTempDir(SQLTestUtils.scala:133)
  at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite.withTempDir(OffsetSeqLogSuite.scala:26)
  at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply$mcV$sp(OffsetSeqLogSuite.scala:75)
  at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply(OffsetSeqLogSuite.scala:75)
  at org.apache.spark.sql.execution.streaming.OffsetSeqLogSuite$$anonfun$3.apply(OffsetSeqLogSuite.scala:75)
  at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
  at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
```


## How was this patch tested?

unit tests


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lw-lin/spark good-msg-2.1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17327.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17327


commit daabb27aa32cb19c157e19081f6d08ff368bb42b
Author: Liwei Lin <lwl...@gmail.com>
Date:   2017-02-25T03:46:35Z

Fix







[GitHub] spark issue #17268: [SPARK-19932][SS] Disallow a case that might case OOM fo...

2017-03-16 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/17268
  
Sure; sorry I didn't spell it out, but I meant the same thing :-)

@marmbrus now that I've updated this as well as the JIRA, would you mind taking another look? Thanks!





[GitHub] spark pull request #17070: [SPARK-19721][SS] Good error message for version ...

2017-03-16 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17070#discussion_r106352628
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala ---
@@ -195,6 +195,11 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
       val input = fileManager.open(batchMetadataFile)
       try {
         Some(deserialize(input))
+      } catch {
+        case ise: IllegalStateException =>
+          // re-throw the exception with the log file path added
+          throw new IllegalStateException(
+            s"Failed to read log file $batchMetadataFile. ${ise.getMessage}")
--- End diff --

done; thanks!





[GitHub] spark issue #17268: [SPARK-19932][SS] Also save event time into StateStore f...

2017-03-14 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/17268
  
Thank you @marmbrus for the detailed explanation!

> For that reason, I think its safest to require the user to explicitly 
include the timestamp.

Yea, let me update this in this direction.
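For illustration, the direction being agreed on here, as it would surface to users (a sketch; the source, column names, and dedup key are illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("dedup-sketch").getOrCreate()
// hypothetical streaming source with a `guid` and an `eventTime` column
val events = spark.readStream.format("rate").load()
  .withColumnRenamed("timestamp", "eventTime")
  .withColumnRenamed("value", "guid")

// The user explicitly includes the event-time column in the deduplication key,
// so the watermark can bound how long per-key state is kept.
val deduped = events
  .withWatermark("eventTime", "10 minutes")
  .dropDuplicates("guid", "eventTime")
```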





[GitHub] spark issue #17299: [SPARK-19817][SS] Make it clear that `timeZone` is a gen...

2017-03-14 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/17299
  
This is the fix for the streaming counterpart (i.e. Structured Streaming). @ueshin @gatorsmile would you take a look? Thanks!





[GitHub] spark pull request #17299: [SPARK-19817][SS] Make it clear that `timeZone` i...

2017-03-14 Thread lw-lin
GitHub user lw-lin opened a pull request:

https://github.com/apache/spark/pull/17299

[SPARK-19817][SS] Make it clear that `timeZone` is a general option in 
DataStreamReader/Writer

## What changes were proposed in this pull request?

Since the `timeZone` setting can also affect partition values and applies to all formats, we should make that clear.
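For illustration, how the option is passed (a sketch; the path and schema are placeholders, and `spark` is an active SparkSession):

```scala
// `timeZone` is a general option: it affects e.g. how timestamp-typed partition
// values are rendered and parsed, regardless of the underlying data format.
val input = spark.readStream
  .format("json")
  .schema(mySchema)          // placeholder StructType
  .option("timeZone", "UTC") // general option, not json-specific
  .load("/path/to/input")    // placeholder path
```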

## How was this patch tested?

N/A


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lw-lin/spark timezone

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17299.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17299


commit 0b13c0850472b5172a9f428f923b99a168a79e00
Author: Liwei Lin <lwl...@gmail.com>
Date:   2017-03-15T02:06:06Z

Initial commit







[GitHub] spark issue #17268: [SPARK-19932][SS] Also save event time into StateStore f...

2017-03-13 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/17268
  
@marmbrus thanks for the comments.

> In the worst case... it is possible that the result actually ends up with 
duplicates in it.

Ah, could you elaborate?  I'm not sure why there might be duplicates, since 
this patch proposes to include the timestamp column implicitly in the state 
store values rather than in the state store keys.





[GitHub] spark issue #17268: [SPARK-19932][SS] Also save event time into StateStore f...

2017-03-13 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/17268
  
@marmbrus @zsxwing would you take a look at this, thanks!





[GitHub] spark pull request #17268: [WIP][SPARK][SS] Also save event time into StateS...

2017-03-12 Thread lw-lin
GitHub user lw-lin opened a pull request:

https://github.com/apache/spark/pull/17268

[WIP][SPARK][SS] Also save event time into StateStore for certain cases

## What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration 
tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, 
remove this)

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lw-lin/spark dedup-watermark

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17268.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17268


commit 1cdf29af06418fe6dea6ed9d4b85be62c8b6edcd
Author: Liwei Lin <lwl...@gmail.com>
Date:   2017-03-12T12:28:54Z

Initial commit







[GitHub] spark issue #17070: [SPARK-19721][SS] Good error message for version mismatc...

2017-03-09 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/17070
  
@zsxwing would you take a look when you've got a minute? Thanks!





[GitHub] spark issue #17120: [SPARK-19715][Structured Streaming] Option to Strip Path...

2017-03-09 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/17120
  
Jenkins retest this please





[GitHub] spark pull request #17120: [SPARK-19715][Structured Streaming] Option to Str...

2017-03-08 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17120#discussion_r105097597
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -79,9 +81,16 @@ class FileStreamSource(
     sourceOptions.maxFileAgeMs
   }
 
+  private val fileNameOnly = sourceOptions.fileNameOnly
+  if (fileNameOnly) {
+    logWarning("'fileNameOnly' is enabled. Make sure your file names are unique (e.g. using " +
+      "UUID), otherwise, files using the same name will be considered as the same file and causes" +
+      " data lost")
--- End diff --

fixed -- thank you!





[GitHub] spark pull request #17216: [SPARK-19873][SS] Record num shuffle partitions i...

2017-03-08 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17216#discussion_r105081023
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeq.scala ---
@@ -71,7 +71,10 @@ object OffsetSeq {
  * @param batchTimestampMs: The current batch processing timestamp.
  * Time unit: milliseconds
  */
-case class OffsetSeqMetadata(var batchWatermarkMs: Long = 0, var batchTimestampMs: Long = 0) {
+case class OffsetSeqMetadata(
+    var batchWatermarkMs: Long = 0,
+    var batchTimestampMs: Long = 0,
+    var numShufflePartitions: Int = 0) {
--- End diff --

Hi @kunalkhamar, in case you need to update `OffsetSeq`'s log version number, the work being done in #17070 might be helpful





[GitHub] spark pull request #17120: [SPARK-19715][Structured Streaming] Option to Str...

2017-03-08 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17120#discussion_r105080626
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -309,6 +315,10 @@ object FileStreamSource {
 
     def size: Int = map.size()
 
+    /**
+     * Note when `fileNameOnly` is true, each entry would be (file name, timestamp) rather than
+     * (full path, timestamp).
+     */
     def allEntries: Seq[(String, Timestamp)] = {
--- End diff --

deleted :)





[GitHub] spark pull request #17120: [SPARK-19715][Structured Streaming] Option to Str...

2017-03-08 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17120#discussion_r105080572
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala ---
@@ -75,7 +77,7 @@ class FileStreamSource(
 
   /** A mapping from a file that we have processed to some timestamp it was last modified. */
   // Visible for testing and debugging in production.
-  val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs)
+  val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs, sourceOptions.fileNameOnly)
--- End diff --

added. thanks!





[GitHub] spark issue #17120: [SPARK-19715][Structured Streaming] Option to Strip Path...

2017-03-06 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/17120
  
Jenkins retest this please





[GitHub] spark issue #17070: [SPARK-19721][SS] Good error message for version mismatc...

2017-03-06 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/17070
  
Jenkins retest this please





[GitHub] spark pull request #17120: [SPARK-19715][Structured Streaming] Option to Str...

2017-03-04 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17120#discussion_r104287612
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
@@ -1253,8 +1253,26 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     assert(map.isNewFile("e", 20))
   }
 
+  test("SeenFilesMap with fileNameOnly = true") {
+    val map = new SeenFilesMap(maxAgeMs = 10, fileNameOnly = true)
+
+    map.add("file:///a/b/c/d", 5)
+    map.add("file:///a/b/c/e", 5)
+    assert(map.size == 2)
--- End diff --

ah, thanks!
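For illustration, a hypothetical continuation of that test showing the collision `fileNameOnly` introduces (URIs are made up; with `fileNameOnly = true`, entries are keyed by bare file name):

```scala
// two different URIs sharing the file name "dataset.txt" collapse into one entry
map.add("file:///x/y/dataset.txt", 5)
map.add("s3://bucket/z/dataset.txt", 5)
assert(map.size == 3)  // "d", "e", and a single "dataset.txt"
// ...and any path with that file name is no longer considered new
assert(!map.isNewFile("hdfs://nn1/tmp/dataset.txt", 5))
```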





[GitHub] spark issue #17120: [SPARK-19715][Structured Streaming] Option to Strip Path...

2017-03-03 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/17120
  
Thank you @marmbrus @steveloughran for the feedback. Added some explicit 
docs. Here's a screenshot of the affected section from the programming guide:


![snip20170304_5](https://cloud.githubusercontent.com/assets/15843379/23575433/06b18bf2-00c7-11e7-9a44-9205fab5d6bc.png)

Please take a look again.


---



[GitHub] spark pull request #17120: [SPARK-19715][Structured Streaming] Option to Str...

2017-03-03 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17120#discussion_r104276335
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1052,10 +1052,18 @@ Here are the details of all the sinks in Spark.
 Append
 
 path: path to the output directory, must be specified.
+
 maxFilesPerTrigger: maximum number of new files to be 
considered in every trigger (default: no max)
 
-latestFirst: whether to process the latest new files 
first, useful when there is a large backlog of files(default: false)
-
+latestFirst: whether to process the latest new files 
first, useful when there is a large backlog of files (default: false)
+
+fileNameOnly: whether to check new files based on 
only the filename instead of on the full path (default: false). With this set 
to `true`, the following files would be considered as the same file, because 
their filenames, "dataset.txt", are the same:
+
+· "file:///dataset.txt"
+· "s3://a/dataset.txt"
+· "s3n://a/b/dataset.txt"
+· "s3a://a/b/c/dataset.txt"
--- End diff --

the indents of a `<ul/>` do not look pretty, so I'm using a dot here


---



[GitHub] spark issue #17120: [SPARK-19715][Structured Streaming] Option to Strip Path...

2017-03-03 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/17120
  
Thank you @marmbrus @steveloughran for the feedback. I've added some 
explicit docs. Here's a screenshot of the affected section from the programming 
guide:


![snip20170304_4](https://cloud.githubusercontent.com/assets/15843379/23575388/18ce95ba-00c6-11e7-8624-3fcb74a31d8a.png)

Please take a look again.


---



[GitHub] spark pull request #17120: [SPARK-19715][Structured Streaming] Option to Str...

2017-03-03 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17120#discussion_r104276118
  
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1052,10 +1052,18 @@ Here are the details of all the sinks in Spark.
 Append
 
 path: path to the output directory, must be specified.
+
 maxFilesPerTrigger: maximum number of new files to be 
considered in every trigger (default: no max)
 
 latestFirst: whether to process the latest new files 
first, useful when there is a large backlog of files(default: false)
-
+
+fileNameOnly: whether to check new files based on 
only the filename instead of on the full path. With this set to `true`, the 
following files would be considered as the same file, because their filenames, 
"dataset.txt", are the same:
+
+· "file:///dataset.txt"
+· "s3://a/dataset.txt"
+· "s3n://a/b/dataset.txt"
+· "s3a://a/b/c/dataset.txt"
--- End diff --

the indents of a `<ul/>` do not look pretty, so I'm using a dot here


---



[GitHub] spark issue #16970: [SPARK-19497][SS]Implement streaming deduplication

2017-03-02 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/16970
  
@uncleGen I think `requiredChildDistribution = 
ClusteredDistribution(keyExpressions) :: Nil` (please see 
[here](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala#L344-L345))
 takes care of it.
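For reference, the relevant snippet at the linked lines (trimmed to the declaration; this is what makes Catalyst shuffle the input by the dedup keys, so all rows sharing the same keys land in the same partition):

```scala
// In the dedup physical operator: require the child's output to be
// clustered by the key expressions; Catalyst inserts an exchange when
// the child is not already partitioned that way.
override def requiredChildDistribution: Seq[Distribution] =
  ClusteredDistribution(keyExpressions) :: Nil
```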


---



[GitHub] spark issue #17070: [SPARK-19721][SS] Good error message for version mismatc...

2017-03-02 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/17070
  
@zsxwing would you take a look when you've got a minute? Thanks!


---



[GitHub] spark issue #17120: [SPARK-19715][Structured Streaming] Option to Strip Path...

2017-03-01 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/17120
  
@steveloughran thanks for the comments.

@marmbrus @zsxwing it'd be great if you could share some thoughts!


---



[GitHub] spark pull request #17120: [SPARK-19715][Structured Streaming] Option to Str...

2017-03-01 Thread lw-lin
GitHub user lw-lin opened a pull request:

https://github.com/apache/spark/pull/17120

[SPARK-19715][Structured Streaming] Option to Strip Paths in FileSource

## What changes were proposed in this pull request?

Today, we compare the whole path when deciding if a file is new in the 
FileSource for structured streaming. However, this can cause false negatives 
when the path has changed in a cosmetic way (e.g. changing `s3n` 
to `s3a`).

This patch adds an option `fileNameOnly` that causes the new-file check to 
be based only on the filename (but still stores the whole path in the log).

## Usage

```scala
spark
  .readStream
  .option("fileNameOnly", true)
  .text("s3n://bucket/dir1/dir2")
  .writeStream
  ...
```
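
For intuition, here is a self-contained sketch of the filename-only keying this option enables (`entryKey` is an illustrative helper, not the actual Spark implementation):

```scala
// Sketch: with fileNameOnly set, seen-files entries are keyed by the file's
// name rather than its full path, so cosmetic path changes (s3n -> s3a, an
// extra parent directory, ...) do not make a file look new again.
import java.net.URI

def entryKey(path: String, fileNameOnly: Boolean): String =
  if (fileNameOnly) new URI(path).getPath.split('/').last
  else path

// All four resolve to the same key, "dataset.txt":
Seq("file:///dataset.txt",
    "s3://a/dataset.txt",
    "s3n://a/b/dataset.txt",
    "s3a://a/b/c/dataset.txt")
  .map(entryKey(_, fileNameOnly = true))
  .foreach(println)
```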
## How was this patch tested?

Added a test case


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lw-lin/spark filename-only

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17120.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17120


commit aeb10d100a24ca644745fb8b26985b584fd5118e
Author: Liwei Lin <lwl...@gmail.com>
Date:   2017-02-28T15:29:17Z

Add support for `fileNameOnly`




---



[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103604050
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -190,32 +190,31 @@ class FileStreamSource(
 val startTime = System.nanoTime
 
 var allFiles: Seq[FileStatus] = null
-if (sourceHasMetadata.isEmpty) {
-  if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) {
-sourceHasMetadata = Some(true)
-allFiles = allFilesUsingMetadataLogFileIndex()
-  } else {
-allFiles = allFilesUsingInMemoryFileIndex()
-if (allFiles.isEmpty) {
-  // we still cannot decide
+sourceHasMetadata match {
--- End diff --

simply switched to `sourceHasMetadata match { case ... case ... case ... }`;
the actual diff is quite small


---



[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103603945
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ---
@@ -662,6 +665,154 @@ class FileStreamSourceSuite extends 
FileStreamSourceTest {
 }
   }
 
+  test("read data from outputs of another streaming query") {
+withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
+  withTempDirs { case (dir, tmp) =>
+// q1 is a streaming query that reads from memory and writes to 
text files
+val q1_source = MemoryStream[String]
+val q1_checkpointDir = new File(dir, 
"q1_checkpointDir").getCanonicalPath
+val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
+val q1 =
+  q1_source
+.toDF()
+.writeStream
+.option("checkpointLocation", q1_checkpointDir)
+.format("text")
+.start(q1_outputDir)
+
+// q2 is a streaming query that reads q1's text outputs
+val q2 = createFileStream("text", q1_outputDir).filter($"value" 
contains "keep")
+
+def q1AddData(data: String*): StreamAction =
+  Execute { _ =>
+q1_source.addData(data)
+q1.processAllAvailable()
+  }
+def q2ProcessAllAvailable(): StreamAction = Execute { q2 => 
q2.processAllAvailable() }
+
+testStream(q2)(
+  // batch 0
+  q1AddData("drop1", "keep2"),
+  q2ProcessAllAvailable(),
+  CheckAnswer("keep2"),
+
+  // batch 1
+  Assert {
+// create a text file that won't be on q1's sink log
+// thus even if its contents contains "keep", it should NOT 
appear in q2's answer
+val shouldNotKeep = new File(q1_outputDir, 
"should_not_keep.txt")
+stringToFile(shouldNotKeep, "should_not_keep!!!")
+shouldNotKeep.exists()
+  },
+  q1AddData("keep3"),
+  q2ProcessAllAvailable(),
+  CheckAnswer("keep2", "keep3"),
+
+  // batch 2: check that things work well when the sink log gets 
compacted
+  q1AddData("keep4"),
+  Assert {
+// compact interval is 3, so file "2.compact" should exist
+new File(q1_outputDir, 
s"${FileStreamSink.metadataDir}/2.compact").exists()
+  },
+  q2ProcessAllAvailable(),
+  CheckAnswer("keep2", "keep3", "keep4"),
+
+  // stop q1 manually
+  Execute { _ => q1.stop() }
+)
+  }
+}
+  }
+
+  test("read partitioned data from outputs of another streaming query") {
+withTempDirs { case (dir, tmp) =>
+  // q1 is a streaming query that reads from memory and writes to 
partitioned json files
+  val q1_source = MemoryStream[(String, String)]
+  val q1_checkpointDir = new File(dir, 
"q1_checkpointDir").getCanonicalPath
+  val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
+  val q1 =
+q1_source
+  .toDF()
+  .select($"_1" as "partition", $"_2" as "value")
+  .writeStream
+  .option("checkpointLocation", q1_checkpointDir)
+  .partitionBy("partition")
+  .format("json")
+  .start(q1_outputDir)
+
+  // q2 is a streaming query that reads q1's partitioned json outputs
+  val schema = new StructType().add("value", 
StringType).add("partition", StringType)
+  val q2 = createFileStream("json", q1_outputDir, 
Some(schema)).filter($"value" contains "keep")
+
+  def q1AddData(data: (String, String)*): StreamAction =
+Execute { _ =>
+  q1_source.addData(data)
+  q1.processAllAvailable()
+}
+  def q2ProcessAllAvailable(): StreamAction = Execute { q2 => 
q2.processAllAvailable() }
+
+  testStream(q2)(
+// batch 0: append to a new partition=foo, should read value and 
partition
+q1AddData(("foo", "drop1"), ("foo", "keep2")),
+q2ProcessAllAvailable(),
+CheckAnswer(("keep2", "foo")),
+
+// batch 1: append to same partit

[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103603935
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ---
@@ -662,6 +665,154 @@ class FileStreamSourceSuite extends 
FileStreamSourceTest {
 }
   }
 
+  test("read data from outputs of another streaming query") {
+withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
+  withTempDirs { case (dir, tmp) =>
+// q1 is a streaming query that reads from memory and writes to 
text files
+val q1_source = MemoryStream[String]
+val q1_checkpointDir = new File(dir, 
"q1_checkpointDir").getCanonicalPath
+val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
+val q1 =
+  q1_source
+.toDF()
+.writeStream
+.option("checkpointLocation", q1_checkpointDir)
+.format("text")
+.start(q1_outputDir)
+
+// q2 is a streaming query that reads q1's text outputs
+val q2 = createFileStream("text", q1_outputDir).filter($"value" 
contains "keep")
+
+def q1AddData(data: String*): StreamAction =
+  Execute { _ =>
+q1_source.addData(data)
+q1.processAllAvailable()
+  }
+def q2ProcessAllAvailable(): StreamAction = Execute { q2 => 
q2.processAllAvailable() }
+
+testStream(q2)(
+  // batch 0
+  q1AddData("drop1", "keep2"),
+  q2ProcessAllAvailable(),
+  CheckAnswer("keep2"),
+
+  // batch 1
+  Assert {
+// create a text file that won't be on q1's sink log
+// thus even if its contents contains "keep", it should NOT 
appear in q2's answer
+val shouldNotKeep = new File(q1_outputDir, 
"should_not_keep.txt")
+stringToFile(shouldNotKeep, "should_not_keep!!!")
+shouldNotKeep.exists()
+  },
+  q1AddData("keep3"),
+  q2ProcessAllAvailable(),
+  CheckAnswer("keep2", "keep3"),
+
+  // batch 2: check that things work well when the sink log gets 
compacted
+  q1AddData("keep4"),
+  Assert {
+// compact interval is 3, so file "2.compact" should exist
+new File(q1_outputDir, 
s"${FileStreamSink.metadataDir}/2.compact").exists()
+  },
+  q2ProcessAllAvailable(),
+  CheckAnswer("keep2", "keep3", "keep4"),
+
+  // stop q1 manually
+  Execute { _ => q1.stop() }
+)
+  }
+}
+  }
+
+  test("read partitioned data from outputs of another streaming query") {
+withTempDirs { case (dir, tmp) =>
+  // q1 is a streaming query that reads from memory and writes to 
partitioned json files
+  val q1_source = MemoryStream[(String, String)]
+  val q1_checkpointDir = new File(dir, 
"q1_checkpointDir").getCanonicalPath
+  val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
+  val q1 =
+q1_source
+  .toDF()
+  .select($"_1" as "partition", $"_2" as "value")
+  .writeStream
+  .option("checkpointLocation", q1_checkpointDir)
+  .partitionBy("partition")
+  .format("json")
+  .start(q1_outputDir)
+
+  // q2 is a streaming query that reads q1's partitioned json outputs
+  val schema = new StructType().add("value", 
StringType).add("partition", StringType)
+  val q2 = createFileStream("json", q1_outputDir, 
Some(schema)).filter($"value" contains "keep")
+
+  def q1AddData(data: (String, String)*): StreamAction =
+Execute { _ =>
+  q1_source.addData(data)
+  q1.processAllAvailable()
+}
+  def q2ProcessAllAvailable(): StreamAction = Execute { q2 => 
q2.processAllAvailable() }
+
+  testStream(q2)(
+// batch 0: append to a new partition=foo, should read value and 
partition
+q1AddData(("foo", "drop1"), ("foo", "keep2")),
+q2ProcessAllAvailable(),
+CheckAnswer(("keep2", "foo")),
+
+// batch 1: append to same partit

[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103603942
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ---
@@ -662,6 +665,154 @@ class FileStreamSourceSuite extends 
FileStreamSourceTest {
 }
   }
 
+  test("read data from outputs of another streaming query") {
+withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
+  withTempDirs { case (dir, tmp) =>
+// q1 is a streaming query that reads from memory and writes to 
text files
+val q1_source = MemoryStream[String]
+val q1_checkpointDir = new File(dir, 
"q1_checkpointDir").getCanonicalPath
+val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
+val q1 =
+  q1_source
+.toDF()
+.writeStream
+.option("checkpointLocation", q1_checkpointDir)
+.format("text")
+.start(q1_outputDir)
+
+// q2 is a streaming query that reads q1's text outputs
+val q2 = createFileStream("text", q1_outputDir).filter($"value" 
contains "keep")
+
+def q1AddData(data: String*): StreamAction =
+  Execute { _ =>
+q1_source.addData(data)
+q1.processAllAvailable()
+  }
+def q2ProcessAllAvailable(): StreamAction = Execute { q2 => 
q2.processAllAvailable() }
+
+testStream(q2)(
+  // batch 0
+  q1AddData("drop1", "keep2"),
+  q2ProcessAllAvailable(),
+  CheckAnswer("keep2"),
+
+  // batch 1
+  Assert {
+// create a text file that won't be on q1's sink log
+// thus even if its contents contains "keep", it should NOT 
appear in q2's answer
+val shouldNotKeep = new File(q1_outputDir, 
"should_not_keep.txt")
+stringToFile(shouldNotKeep, "should_not_keep!!!")
+shouldNotKeep.exists()
+  },
+  q1AddData("keep3"),
+  q2ProcessAllAvailable(),
+  CheckAnswer("keep2", "keep3"),
+
+  // batch 2: check that things work well when the sink log gets 
compacted
+  q1AddData("keep4"),
+  Assert {
+// compact interval is 3, so file "2.compact" should exist
+new File(q1_outputDir, 
s"${FileStreamSink.metadataDir}/2.compact").exists()
+  },
+  q2ProcessAllAvailable(),
+  CheckAnswer("keep2", "keep3", "keep4"),
+
+  // stop q1 manually
+  Execute { _ => q1.stop() }
+)
+  }
+}
+  }
+
+  test("read partitioned data from outputs of another streaming query") {
+withTempDirs { case (dir, tmp) =>
+  // q1 is a streaming query that reads from memory and writes to 
partitioned json files
+  val q1_source = MemoryStream[(String, String)]
+  val q1_checkpointDir = new File(dir, 
"q1_checkpointDir").getCanonicalPath
+  val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
+  val q1 =
+q1_source
+  .toDF()
+  .select($"_1" as "partition", $"_2" as "value")
+  .writeStream
+  .option("checkpointLocation", q1_checkpointDir)
+  .partitionBy("partition")
+  .format("json")
+  .start(q1_outputDir)
+
+  // q2 is a streaming query that reads q1's partitioned json outputs
+  val schema = new StructType().add("value", 
StringType).add("partition", StringType)
+  val q2 = createFileStream("json", q1_outputDir, 
Some(schema)).filter($"value" contains "keep")
+
+  def q1AddData(data: (String, String)*): StreamAction =
+Execute { _ =>
+  q1_source.addData(data)
+  q1.processAllAvailable()
+}
+  def q2ProcessAllAvailable(): StreamAction = Execute { q2 => 
q2.processAllAvailable() }
+
+  testStream(q2)(
+// batch 0: append to a new partition=foo, should read value and 
partition
+q1AddData(("foo", "drop1"), ("foo", "keep2")),
+q2ProcessAllAvailable(),
+CheckAnswer(("keep2", "foo")),
+
+// batch 1: append to same partit

[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103603922
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ---
@@ -662,6 +665,154 @@ class FileStreamSourceSuite extends 
FileStreamSourceTest {
 }
   }
 
+  test("read data from outputs of another streaming query") {
+withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
+  withTempDirs { case (dir, tmp) =>
+// q1 is a streaming query that reads from memory and writes to 
text files
+val q1_source = MemoryStream[String]
+val q1_checkpointDir = new File(dir, 
"q1_checkpointDir").getCanonicalPath
+val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
+val q1 =
+  q1_source
+.toDF()
+.writeStream
+.option("checkpointLocation", q1_checkpointDir)
+.format("text")
+.start(q1_outputDir)
+
+// q2 is a streaming query that reads q1's text outputs
+val q2 = createFileStream("text", q1_outputDir).filter($"value" 
contains "keep")
+
+def q1AddData(data: String*): StreamAction =
+  Execute { _ =>
+q1_source.addData(data)
+q1.processAllAvailable()
+  }
+def q2ProcessAllAvailable(): StreamAction = Execute { q2 => 
q2.processAllAvailable() }
+
+testStream(q2)(
+  // batch 0
+  q1AddData("drop1", "keep2"),
+  q2ProcessAllAvailable(),
+  CheckAnswer("keep2"),
+
+  // batch 1
+  Assert {
+// create a text file that won't be on q1's sink log
+// thus even if its contents contains "keep", it should NOT 
appear in q2's answer
+val shouldNotKeep = new File(q1_outputDir, 
"should_not_keep.txt")
+stringToFile(shouldNotKeep, "should_not_keep!!!")
+shouldNotKeep.exists()
+  },
+  q1AddData("keep3"),
+  q2ProcessAllAvailable(),
+  CheckAnswer("keep2", "keep3"),
+
+  // batch 2: check that things work well when the sink log gets 
compacted
+  q1AddData("keep4"),
+  Assert {
+// compact interval is 3, so file "2.compact" should exist
+new File(q1_outputDir, 
s"${FileStreamSink.metadataDir}/2.compact").exists()
+  },
+  q2ProcessAllAvailable(),
+  CheckAnswer("keep2", "keep3", "keep4"),
+
+  // stop q1 manually
+  Execute { _ => q1.stop() }
+)
+  }
+}
+  }
+
+  test("read partitioned data from outputs of another streaming query") {
+withTempDirs { case (dir, tmp) =>
+  // q1 is a streaming query that reads from memory and writes to 
partitioned json files
+  val q1_source = MemoryStream[(String, String)]
+  val q1_checkpointDir = new File(dir, 
"q1_checkpointDir").getCanonicalPath
+  val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
+  val q1 =
+q1_source
+  .toDF()
+  .select($"_1" as "partition", $"_2" as "value")
+  .writeStream
+  .option("checkpointLocation", q1_checkpointDir)
+  .partitionBy("partition")
+  .format("json")
+  .start(q1_outputDir)
+
+  // q2 is a streaming query that reads q1's partitioned json outputs
+  val schema = new StructType().add("value", 
StringType).add("partition", StringType)
+  val q2 = createFileStream("json", q1_outputDir, 
Some(schema)).filter($"value" contains "keep")
+
+  def q1AddData(data: (String, String)*): StreamAction =
+Execute { _ =>
+  q1_source.addData(data)
+  q1.processAllAvailable()
+}
+  def q2ProcessAllAvailable(): StreamAction = Execute { q2 => 
q2.processAllAvailable() }
+
+  testStream(q2)(
+// batch 0: append to a new partition=foo, should read value and 
partition
+q1AddData(("foo", "drop1"), ("foo", "keep2")),
+q2ProcessAllAvailable(),
+CheckAnswer(("keep2", "foo")),
+
+// batch 1: append to same partit

[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103603898
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ---
@@ -662,6 +665,154 @@ class FileStreamSourceSuite extends 
FileStreamSourceTest {
 }
   }
 
+  test("read data from outputs of another streaming query") {
+withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
+  withTempDirs { case (dir, tmp) =>
+// q1 is a streaming query that reads from memory and writes to 
text files
+val q1_source = MemoryStream[String]
+val q1_checkpointDir = new File(dir, 
"q1_checkpointDir").getCanonicalPath
+val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
+val q1 =
+  q1_source
+.toDF()
+.writeStream
+.option("checkpointLocation", q1_checkpointDir)
+.format("text")
+.start(q1_outputDir)
+
+// q2 is a streaming query that reads q1's text outputs
+val q2 = createFileStream("text", q1_outputDir).filter($"value" 
contains "keep")
+
+def q1AddData(data: String*): StreamAction =
+  Execute { _ =>
+q1_source.addData(data)
+q1.processAllAvailable()
+  }
+def q2ProcessAllAvailable(): StreamAction = Execute { q2 => 
q2.processAllAvailable() }
+
+testStream(q2)(
+  // batch 0
+  q1AddData("drop1", "keep2"),
+  q2ProcessAllAvailable(),
+  CheckAnswer("keep2"),
+
+  // batch 1
+  Assert {
+// create a text file that won't be on q1's sink log
+// thus even if its contents contains "keep", it should NOT 
appear in q2's answer
+val shouldNotKeep = new File(q1_outputDir, 
"should_not_keep.txt")
+stringToFile(shouldNotKeep, "should_not_keep!!!")
+shouldNotKeep.exists()
+  },
+  q1AddData("keep3"),
+  q2ProcessAllAvailable(),
+  CheckAnswer("keep2", "keep3"),
+
+  // batch 2: check that things work well when the sink log gets 
compacted
+  q1AddData("keep4"),
+  Assert {
+// compact interval is 3, so file "2.compact" should exist
+new File(q1_outputDir, 
s"${FileStreamSink.metadataDir}/2.compact").exists()
+  },
+  q2ProcessAllAvailable(),
+  CheckAnswer("keep2", "keep3", "keep4"),
+
+  // stop q1 manually
+  Execute { _ => q1.stop() }
+)
+  }
+}
+  }
+
+  test("read partitioned data from outputs of another streaming query") {
--- End diff --

test removed -- Let me think about this write partition information thing :)
thanks!


---



[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103603705
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ---
@@ -662,6 +665,154 @@ class FileStreamSourceSuite extends 
FileStreamSourceTest {
 }
   }
 
+  test("read data from outputs of another streaming query") {
+withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
+  withTempDirs { case (dir, tmp) =>
+// q1 is a streaming query that reads from memory and writes to 
text files
+val q1_source = MemoryStream[String]
+val q1_checkpointDir = new File(dir, 
"q1_checkpointDir").getCanonicalPath
+val q1_outputDir = new File(dir, "q1_outputDir").getCanonicalPath
+val q1 =
+  q1_source
+.toDF()
+.writeStream
+.option("checkpointLocation", q1_checkpointDir)
+.format("text")
+.start(q1_outputDir)
+
+// q2 is a streaming query that reads q1's text outputs
+val q2 = createFileStream("text", q1_outputDir).filter($"value" 
contains "keep")
+
+def q1AddData(data: String*): StreamAction =
+  Execute { _ =>
+q1_source.addData(data)
+q1.processAllAvailable()
+  }
+def q2ProcessAllAvailable(): StreamAction = Execute { q2 => 
q2.processAllAvailable() }
+
+testStream(q2)(
+  // batch 0
+  q1AddData("drop1", "keep2"),
+  q2ProcessAllAvailable(),
+  CheckAnswer("keep2"),
+
+  // batch 1
+  Assert {
+// create a text file that won't be on q1's sink log
+// thus even if its contents contains "keep", it should NOT 
appear in q2's answer
+val shouldNotKeep = new File(q1_outputDir, 
"should_not_keep.txt")
+stringToFile(shouldNotKeep, "should_not_keep!!!")
+shouldNotKeep.exists()
+  },
+  q1AddData("keep3"),
+  q2ProcessAllAvailable(),
+  CheckAnswer("keep2", "keep3"),
+
+  // batch 2: check that things work well when the sink log gets 
compacted
+  q1AddData("keep4"),
+  Assert {
+// compact interval is 3, so file "2.compact" should exist
+new File(q1_outputDir, 
s"${FileStreamSink.metadataDir}/2.compact").exists()
+  },
+  q2ProcessAllAvailable(),
+  CheckAnswer("keep2", "keep3", "keep4"),
+
+  // stop q1 manually
--- End diff --

fixed


---



[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103603671
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala ---
@@ -208,6 +208,11 @@ trait StreamTest extends QueryTest with 
SharedSQLContext with Timeouts {
 }
   }
 
+  /** Execute arbitrary code */
+  case class Execute(val func: StreamExecution => Any) extends 
StreamAction {
--- End diff --

fixed, thanks!
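
For reference, a usage sketch of the `Execute` action (mirroring the tests quoted elsewhere in this thread; `filtered`, `dir` and `tmp` are assumed to come from the surrounding suite):

```scala
// Execute runs arbitrary code against the running StreamExecution,
// e.g. to block until all pending data has been processed.
testStream(filtered)(
  AddTextFileData("keep2", dir, tmp),
  Execute { q => q.processAllAvailable() },
  CheckAnswer("keep2")
)
```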


---



[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103603693
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ---
@@ -662,6 +665,154 @@ class FileStreamSourceSuite extends 
FileStreamSourceTest {
 }
   }
 
+  test("read data from outputs of another streaming query") {
+withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
+  withTempDirs { case (dir, tmp) =>
+// q1 is a streaming query that reads from memory and writes to 
text files
+val q1_source = MemoryStream[String]
--- End diff --

fixed


---



[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103603687
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ---
@@ -662,6 +665,154 @@ class FileStreamSourceSuite extends 
FileStreamSourceTest {
 }
   }
 
+  test("read data from outputs of another streaming query") {
+withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
+  withTempDirs { case (dir, tmp) =>
--- End diff --

fixed


---



[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103405331
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ---
@@ -52,10 +52,7 @@ abstract class FileStreamSourceTest
 query.nonEmpty,
 "Cannot add data when there is no query for finding the active 
file stream source")
 
-  val sources = query.get.logicalPlan.collect {
-case StreamingExecutionRelation(source, _) if 
source.isInstanceOf[FileStreamSource] =>
-  source.asInstanceOf[FileStreamSource]
-  }
--- End diff --

this common logic is extracted out


---



[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103404962
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -159,28 +161,64 @@ class FileStreamSource(
 
   /**
* If the source has a metadata log indicating which files should be 
read, then we should use it.
-   * We figure out whether there exists some metadata log only when user 
gives a non-glob path.
+   * Only when the user gives a non-glob path will we figure out whether 
the source has some
+   * metadata log
+   *
+   * None        means we don't know at the moment
+   * Some(true)  means we know for sure the source DOES have metadata
+   * Some(false) means we know for sure the source DOES NOT have metadata
*/
-  private val sourceHasMetadata: Boolean =
-!SparkHadoopUtil.get.isGlobPath(new Path(path)) &&
-  FileStreamSink.hasMetadata(Seq(path), 
sparkSession.sessionState.newHadoopConf())
+  @volatile private[sql] var sourceHasMetadata: Option[Boolean] =
+if (SparkHadoopUtil.get.isGlobPath(new Path(path))) Some(false) else 
None
+
+  private def allFilesUsingInMemoryFileIndex() = {
+val globbedPaths = 
SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
+val fileIndex = new InMemoryFileIndex(sparkSession, globbedPaths, 
options, Some(new StructType))
+fileIndex.allFiles()
+  }
+
+  private def allFilesUsingMetadataLogFileIndex() = {
+// Note if `sourceHasMetadata` holds, then `qualifiedBasePath` is 
guaranteed to be a
+// non-glob path
+new MetadataLogFileIndex(sparkSession, qualifiedBasePath).allFiles()
+  }
 
   /**
* Returns a list of files found, sorted by their timestamp.
*/
   private def fetchAllFiles(): Seq[(String, Long)] = {
 val startTime = System.nanoTime
-val catalog =
-  if (sourceHasMetadata) {
-// Note if `sourceHasMetadata` holds, then `qualifiedBasePath` is 
guaranteed to be a
-// non-glob path
-new MetadataLogFileIndex(sparkSession, qualifiedBasePath)
+
--- End diff --

seems like `sourceHasMetadata match { case ... }` is more appropriate here


---



[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103404486
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -159,28 +161,64 @@ class FileStreamSource(
 
   /**
* If the source has a metadata log indicating which files should be 
read, then we should use it.
-   * We figure out whether there exists some metadata log only when user 
gives a non-glob path.
+   * Only when the user gives a non-glob path will we figure out whether 
the source has some
+   * metadata log
+   *
+   * None        means we don't know at the moment
+   * Some(true)  means we know for sure the source DOES have metadata
+   * Some(false) means we know for sure the source DOES NOT have metadata
*/
-  private val sourceHasMetadata: Boolean =
-!SparkHadoopUtil.get.isGlobPath(new Path(path)) &&
-  FileStreamSink.hasMetadata(Seq(path), 
sparkSession.sessionState.newHadoopConf())
+  @volatile private[sql] var sourceHasMetadata: Option[Boolean] =
+if (SparkHadoopUtil.get.isGlobPath(new Path(path))) Some(false) else 
None
+
+  private def allFilesUsingInMemoryFileIndex() = {
+val globbedPaths = 
SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
+val fileIndex = new InMemoryFileIndex(sparkSession, globbedPaths, 
options, Some(new StructType))
+fileIndex.allFiles()
+  }
+
+  private def allFilesUsingMetadataLogFileIndex() = {
+// Note if `sourceHasMetadata` holds, then `qualifiedBasePath` is 
guaranteed to be a
+// non-glob path
+new MetadataLogFileIndex(sparkSession, qualifiedBasePath).allFiles()
+  }
 
   /**
* Returns a list of files found, sorted by their timestamp.
*/
   private def fetchAllFiles(): Seq[(String, Long)] = {
 val startTime = System.nanoTime
-val catalog =
-  if (sourceHasMetadata) {
-// Note if `sourceHasMetadata` holds, then `qualifiedBasePath` is 
guaranteed to be a
-// non-glob path
-new MetadataLogFileIndex(sparkSession, qualifiedBasePath)
+
--- End diff --

then, based on `sourceHasMetadata`'s value, we can choose which `FileIndex` 
should be used. As shown below, `case None` needs to be handled with the most care.
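
To make that concrete, here is a sketch reconstructed from the hunks quoted in this thread (simplified, not the exact merged code):

```scala
// Branch on the tri-state. Only the None case does detection work, and it
// memoizes the answer as soon as one can be determined.
allFiles = sourceHasMetadata match {
  case Some(true)  => allFilesUsingMetadataLogFileIndex()
  case Some(false) => allFilesUsingInMemoryFileIndex()
  case None =>
    if (FileStreamSink.hasMetadata(Seq(path), hadoopConf)) {
      sourceHasMetadata = Some(true)    // found a sink metadata log
      allFilesUsingMetadataLogFileIndex()
    } else {
      val files = allFilesUsingInMemoryFileIndex()
      if (files.nonEmpty) {
        sourceHasMetadata = Some(false) // plain files, no metadata log
      }
      files // if empty we still cannot decide; try again next batch
    }
}
```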


---



[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-28 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103403986
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -159,28 +161,64 @@ class FileStreamSource(
 
   /**
* If the source has a metadata log indicating which files should be 
read, then we should use it.
-   * We figure out whether there exists some metadata log only when user 
gives a non-glob path.
+   * Only when the user gives a non-glob path will we figure out whether 
the source has some
+   * metadata log
+   *
+   * None        means we don't know at the moment
+   * Some(true)  means we know for sure the source DOES have metadata
+   * Some(false) means we know for sure the source DOES NOT have metadata
--- End diff --

( some notes here since the changes are not trivial )

here we're using `sourceHasMetadata` to indicate whether we know for 
sure the source has metadata, as stated in the source file comments:
- `None` means we don't know at the moment
- `Some(true)` means we know for sure the source DOES have metadata
- `Some(false)` means we know for sure the source DOES NOT have metadata



---



[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-27 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103366158
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -158,12 +158,28 @@ class FileStreamSource(
   }
 
   /**
+   * If the source has a metadata log indicating which files should be 
read, then we should use it.
+   * We figure out whether there exists some metadata log only when user 
gives a non-glob path.
+   */
+  private val sourceHasMetadata: Boolean =
--- End diff --

and add a dedicated test case of course


---



[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-27 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103366082
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
@@ -158,12 +158,28 @@ class FileStreamSource(
   }
 
   /**
+   * If the source has a metadata log indicating which files should be 
read, then we should use it.
+   * We figure out whether there exists some metadata log only when user 
gives a non-glob path.
+   */
+  private val sourceHasMetadata: Boolean =
--- End diff --

ah, thanks! I was about to change it to a method which stops detecting 
once we know for sure whether to use a `MetadataLogFileIndex` or an 
`InMemoryFileIndex`, and saves this information. Will update with this code soon.


---



[GitHub] spark issue #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-26 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/16987
  
Rebased to master and tests updated. @zsxwing would you take another look 
when you've got a minute?


---



[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-26 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103104036
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ---
@@ -662,6 +663,101 @@ class FileStreamSourceSuite extends 
FileStreamSourceTest {
 }
   }
 
+  testWithUninterruptibleThread("read data from outputs of another 
streaming query") {
+withSQLConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3") {
+  withTempDirs { case (dir, tmp) =>
+val sinkLogDir = new File(dir, 
FileStreamSink.metadataDir).getCanonicalPath
+val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, 
spark, sinkLogDir)
+
+val fileStream = createFileStream("text", dir.getCanonicalPath)
+val filtered = fileStream.filter($"value" contains "keep")
+
+def addIntoSinkLog(batch: Int, fileName: String): Boolean = {
+  val unqualifiedDirPath = new Path(new File(dir, 
fileName).getCanonicalPath)
+  val fs = 
unqualifiedDirPath.getFileSystem(sparkContext.hadoopConfiguration)
+  val sinkFileStatus = 
SinkFileStatus(fs.getFileStatus(unqualifiedDirPath))
+  sinkLog.add(batch, Array(sinkFileStatus))
+}
+
+testStream(filtered)(
+  // Create new dir and write to it, should read
+  AddTextFileData("drop1\nkeep2", dir, tmp, Some("file_1")),
+  Assert { addIntoSinkLog(0, "file_1") },
+  CheckAnswer("keep2"),
+
+  // Create "file_2" but DO NOT add it to the log intentionally
+  AddTextFileData("should_not_keep!!!", dir, tmp, Some("file_2")),
+  Assert { new File(dir, "file_2").exists() },
+  AddTextFileData("keep3", dir, tmp, Some("file_3")),
+  Assert { addIntoSinkLog(1, "file_3") },
+  // Should NOT read "file_2"; should read "file_3"
+  CheckAnswer("keep2", "keep3"),
+
+  // Check that things work well when the sink log gets compacted
+  AddTextFileData("keep4", dir, tmp, Some("file_4")),
+  Assert { addIntoSinkLog(2, "file_4") },
+  Assert {
+// compact interval is 3, so file "2.compact" should exist
+new File(sinkLogDir, "2" + 
CompactibleFileStreamLog.COMPACT_FILE_SUFFIX).exists()
+  },
+  CheckAnswer("keep2", "keep3", "keep4"),
+
+  AddTextFileData("keep5", dir, tmp, Some("file_5")),
+  Assert { addIntoSinkLog(3, "file_5") },
+  CheckAnswer("keep2", "keep3", "keep4", "keep5")
+)
+  }
+}
+  }
+
+  testWithUninterruptibleThread("read partitioned data from outputs of 
another streaming query") {
--- End diff --

done


---



[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-26 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103104024
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 ---
@@ -662,6 +663,101 @@ class FileStreamSourceSuite extends 
FileStreamSourceTest {
 }
   }
 
+  testWithUninterruptibleThread("read data from outputs of another 
streaming query") {
--- End diff --

done; thanks! And good job on https://github.com/apache/spark/pull/16947!


---



[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-26 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r103104003
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 ---
@@ -243,13 +243,20 @@ case class DataSource(
 val path = caseInsensitiveOptions.getOrElse("path", {
   throw new IllegalArgumentException("'path' is not specified")
 })
+// If we're reading files from outputs of another streaming query, 
then it does not make
+// sense to glob files since we would get files from the metadata 
log.
+// Thus we would figure out whether there exists some metadata log 
only when user gives a
+// non-glob path.
+val sourceHasMetadata =
+  !SparkHadoopUtil.get.isGlobPath(new Path(path)) && 
hasMetadata(Seq(path))
--- End diff --

Yea `hasMetadata` was the reason! Now it lives in `object FileStreamSink` 
:-D


---



[GitHub] spark issue #17070: [WIP][SS] Good error message for version mismatch in log...

2017-02-26 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/17070
  
@srowen thank you for the comments! I was trying to tackle SPARK-19721; 
sorry, the summary just said "WIP" without a JIRA number. Adding the JIRA 
number back.


---



[GitHub] spark pull request #17070: [WIP][SS] Good error message for version mismatch...

2017-02-26 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17070#discussion_r103102481
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
 ---
@@ -195,6 +196,11 @@ class HDFSMetadataLog[T <: AnyRef : 
ClassTag](sparkSession: SparkSession, path:
   val input = fileManager.open(batchMetadataFile)
   try {
 Some(deserialize(input))
+  } catch {
+case ise: IllegalStateException =>
--- End diff --

the low-level exception does not know about the log file's path, and I'm 
trying to put it into the error message to give users very explicit information
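
In code form the idea looks roughly like this (a sketch matching the hunk quoted above; the exact message wording is illustrative):

```scala
// The parse error thrown deep inside deserialize() has no idea which log
// file was being read, so re-throw with the batch file's path attached.
val input = fileManager.open(batchMetadataFile)
try {
  Some(deserialize(input))
} catch {
  case ise: IllegalStateException =>
    throw new IllegalStateException(
      s"Failed to read log file $batchMetadataFile. ${ise.getMessage}", ise)
} finally {
  input.close()
}
```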


---



[GitHub] spark pull request #17070: [WIP][SS] Good error message for version mismatch...

2017-02-26 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17070#discussion_r103102422
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
 ---
@@ -268,6 +274,37 @@ class HDFSMetadataLog[T <: AnyRef : 
ClassTag](sparkSession: SparkSession, path:
 new FileSystemManager(metadataPath, hadoopConf)
 }
   }
+
+  /**
+   * Parse the log version from the given `text` -- will throw exception 
when the parsed version
+   * exceeds `maxSupportedVersion`, or when `text` is malformed (such as 
"xyz", "v", "v-1",
+   * "v123xyz" etc.)
+   */
+  private[sql] def parseVersion(text: String, maxSupportedVersion: Int): 
Int = {
+if (text.length > 0 && text(0) == 'v') {
+  val version =
+try { text.substring(1, text.length).toInt }
+catch {
+  case _: NumberFormatException =>
+throw new IllegalStateException(s"Log file was malformed: 
failed to read correct log " +
+  s"version from $text.")
+}
+  if (version > 0) {
+if (version > maxSupportedVersion) {
+  throw new IllegalStateException(s"UnsupportedLogVersion: maximum 
supported log version " +
+s"is v${maxSupportedVersion}, but encountered v$version. The 
log file was produced " +
+s"by a newer version of Spark and cannot be read by this 
version. Please upgrade.")
+}
+else {
--- End diff --

done
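
For illustration, the contract described in the doc comment above plays out like this (expected outcomes inferred from that comment, not re-verified against the merged code):

```scala
parseVersion("v1", maxSupportedVersion = 2)   // returns 1
parseVersion("v2", maxSupportedVersion = 2)   // returns 2
parseVersion("v3", maxSupportedVersion = 2)   // throws: log written by a newer Spark
parseVersion("xyz", maxSupportedVersion = 2)  // throws: malformed log file
parseVersion("v-1", maxSupportedVersion = 2)  // throws: malformed log file
```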


---



[GitHub] spark pull request #17070: [WIP][SS] Good error message for version mismatch...

2017-02-26 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17070#discussion_r103102416
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
 ---
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.streaming
 
 import java.io._
+import java.lang.IllegalStateException
--- End diff --

ah, what a simple mistake :-)


---



[GitHub] spark pull request #17070: [WIP][SS] Good error message for version mismatch...

2017-02-26 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17070#discussion_r103102420
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
 ---
@@ -268,6 +274,37 @@ class HDFSMetadataLog[T <: AnyRef : 
ClassTag](sparkSession: SparkSession, path:
 new FileSystemManager(metadataPath, hadoopConf)
 }
   }
+
+  /**
+   * Parse the log version from the given `text` -- will throw exception 
when the parsed version
+   * exceeds `maxSupportedVersion`, or when `text` is malformed (such as 
"xyz", "v", "v-1",
+   * "v123xyz" etc.)
+   */
+  private[sql] def parseVersion(text: String, maxSupportedVersion: Int): 
Int = {
+if (text.length > 0 && text(0) == 'v') {
+  val version =
+try { text.substring(1, text.length).toInt }
--- End diff --

done


---



[GitHub] spark pull request #17070: [WIP][SS] Good error message for version mismatch...

2017-02-26 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17070#discussion_r103102394
  
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---
@@ -100,7 +100,8 @@ private[kafka010] class KafkaSource(
     override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
       out.write(0) // A zero byte is written to support Spark 2.1.0 (SPARK-19517)
       val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
-      writer.write(VERSION)
+      writer.write("v" + VERSION)
--- End diff --

done
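
Since the thread touches the on-disk format, here is a self-contained sketch of the layout being written -- a zero byte for Spark 2.1.0 readers (SPARK-19517), then a "v"-prefixed version line, then the payload. The newline terminator and the JSON payload are assumptions, not quotes from the patch:

```scala
import java.io.{BufferedWriter, OutputStream, OutputStreamWriter}
import java.nio.charset.StandardCharsets

def writeOffsetLog(out: OutputStream, version: Int, offsetJson: String): Unit = {
  out.write(0) // leading zero byte keeps Spark 2.1.0 readers working (SPARK-19517)
  val writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
  writer.write("v" + version + "\n") // assumption: a newline separates version from payload
  writer.write(offsetJson)           // assumption: the offset serializes to a JSON line
  writer.flush()
}
```

A reader can then strip the zero byte, run the first line through `parseVersion`, and fail fast with the improved message when the version is unsupported.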


---



[GitHub] spark pull request #17070: [WIP][SS] Good error message for version mismatch...

2017-02-26 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17070#discussion_r103102389
  
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
@@ -226,7 +226,15 @@ class KafkaSourceSuite extends KafkaSourceTest {
         source.getOffset.get // Read initial offset
       }

-      assert(e.getMessage.contains("Please upgrade your Spark"))
+      Seq(
--- End diff --

done; thanks!
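
The resulting table-driven assertion, roughly -- the message fragments below are drawn from the error text in the `HDFSMetadataLog` diff earlier in this thread, and the exact strings in the merged test may differ:

```scala
Seq(
  "maximum supported log version is v1",
  "produced by a newer version of Spark"
).foreach { fragment =>
  assert(e.getMessage.contains(fragment)) // every expected fragment must appear
}
```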


---



[GitHub] spark pull request #17070: [WIP][SS] Good error message for version mismatch...

2017-02-25 Thread lw-lin
GitHub user lw-lin opened a pull request:

https://github.com/apache/spark/pull/17070

[WIP][SS] Good error message for version mismatch in log files

## What changes were proposed in this pull request?

When a metadata log file was written by a newer version of Spark, the reader currently fails with an obscure error. This PR adds a `parseVersion` helper to `HDFSMetadataLog` that validates the "v"-prefixed version line and throws an `IllegalStateException` with a clear message when the parsed version exceeds the maximum supported one, or when the version string is malformed (e.g. "xyz", "v", "v-1", "v123xyz"). `KafkaSource` is also updated to write its log version with the "v" prefix.

## How was this patch tested?

Existing unit tests, plus updated assertions in `KafkaSourceSuite` covering the new error messages.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lw-lin/spark better-msg

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17070.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17070


commit dcae69b351ac71bf1554b760aeea6b763b2bf4c0
Author: Liwei Lin <lwl...@gmail.com>
Date:   2017-02-25T03:46:35Z

Fix




---



[GitHub] spark issue #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-23 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/16987
  
Reopening :-)


---



[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-23 Thread lw-lin
GitHub user lw-lin reopened a pull request:

https://github.com/apache/spark/pull/16987

[SPARK-19633][SS] FileSource read from FileSink

## What changes were proposed in this pull request?

Right now the file source always uses `InMemoryFileIndex` to scan files from a given path.

But when reading the outputs of another streaming query, the file source should use `MetadataLogFileIndex` to list files from the sink log. This patch adds that support.

## `MetadataLogFileIndex` or `InMemoryFileIndex`
```scala
spark
  .readStream
  .format(...)
  .load("/some/path") // for a non-glob path:
                      //   - use `MetadataLogFileIndex` when `/some/path/_spark_metadata` exists
                      //   - fall back to `InMemoryFileIndex` otherwise
```
```scala
spark
  .readStream
  .format(...)
  .load("/some/path/*/*") // for a glob path: always use `InMemoryFileIndex`
```
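
A minimal sketch of that selection rule -- `useMetadataLogIndex` is a hypothetical helper for illustration, not the patch's actual wiring, which goes through the file source's path resolution:

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Eligible for the sink-log index only if the path is not a glob and
// its `_spark_metadata` subdirectory exists.
def useMetadataLogIndex(fs: FileSystem, userPath: String): Boolean = {
  val isGlob = userPath.exists("{}[]*?\\".contains(_))
  !isGlob && fs.exists(new Path(userPath, "_spark_metadata"))
}
```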

## How was this patch tested?

Two newly added tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lw-lin/spark source-read-from-sink

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16987.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16987


commit b66d2ccabcae41973bd8af4ed406567dc071ff67
Author: Liwei Lin <lwl...@gmail.com>
Date:   2017-02-18T01:20:18Z

File Source reads from File Sink




---



[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-22 Thread lw-lin
Github user lw-lin closed the pull request at:

https://github.com/apache/spark/pull/16987


---



[GitHub] spark issue #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-22 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/16987
  
Using deterministic file names sounds great. Thanks! I'm closing this.


---



[GitHub] spark issue #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-20 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/16987
  
@marmbrus @zsxwing would you take a look at this? thanks!


---



[GitHub] spark pull request #16987: [SPARK-19633][SS] FileSource read from FileSink

2017-02-19 Thread lw-lin
Github user lw-lin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16987#discussion_r101917807
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---
@@ -76,12 +76,13 @@ abstract class FileStreamSourceTest
     protected def addData(source: FileStreamSource): Unit
   }

-  case class AddTextFileData(content: String, src: File, tmp: File)
-    extends AddFileData {
+  case class AddTextFileData(
+      content: String, src: File, tmp: File, finalFileName: Option[String] = None
+    ) extends AddFileData {

     override def addData(source: FileStreamSource): Unit = {
       val tempFile = Utils.tempFileWith(new File(tmp, "text"))
-      val finalFile = new File(src, tempFile.getName)
+      val finalFile = new File(src, finalFileName.getOrElse(tempFile.getName))
--- End diff --

this keeps track of the final file name so that a later assertion can check it
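
A usage sketch of the extended helper, with an illustrative file name and content:

```scala
// Pin the final on-disk name so a later assertion can look up exactly this file:
AddTextFileData("keep1", src, tmp, finalFileName = Some("file-0.txt"))
```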


---



[GitHub] spark issue #16987: [WIP][SPARK-][SS] FileSource read from FileSink

2017-02-19 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/16987
  
Jenkins retest this please


---



[GitHub] spark pull request #16987: [WIP][SPARK-][SS] FileSource read from FileSink

2017-02-18 Thread lw-lin
GitHub user lw-lin opened a pull request:

https://github.com/apache/spark/pull/16987

[WIP][SPARK-][SS] FileSource read from FileSink

## What changes were proposed in this pull request?

Right now the file source always uses `InMemoryFileIndex` to scan files from a given path. When reading the outputs of another streaming query, it should instead list files from the sink's metadata log. This patch adds that support.

## How was this patch tested?

Newly added unit tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lw-lin/spark source-read-from-sink

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16987.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16987


commit b66d2ccabcae41973bd8af4ed406567dc071ff67
Author: Liwei Lin <lwl...@gmail.com>
Date:   2017-02-18T01:20:18Z

File Source reads from File Sink




---



[GitHub] spark issue #16912: [SPARK-19576] [Core] Task attempt paths exist in output ...

2017-02-13 Thread lw-lin
Github user lw-lin commented on the issue:

https://github.com/apache/spark/pull/16912
  
To me this PR aims to also use the driver to coordinate Hadoop output committing for `saveAsNewAPIHadoopFile` -- the same coordination was added for `saveAsHadoopFile` back in https://github.com/apache/spark/pull/4066.

It seems issues have been reported with the current `saveAsNewAPIHadoopFile` -- e.g. in https://github.com/apache/spark/pull/4066 by @matrixlibing. But this issue only exists prior to 2.2.0.

So @JoshRosen would you share your thoughts on this? Thanks!
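
For readers unfamiliar with the mechanism: the driver-side coordinator lets exactly one task attempt per partition commit its output, which is what prevents leftover attempt paths. A toy sketch of that arbitration -- hypothetical names, not Spark's internal API:

```scala
import scala.collection.mutable

class CommitArbiter {
  private val winners = mutable.Map.empty[Int, Int] // partition -> winning attempt

  // First attempt to ask for a partition wins; any later attempt must abort.
  def canCommit(partition: Int, attempt: Int): Boolean = synchronized {
    winners.getOrElseUpdate(partition, attempt) == attempt
  }
}

val arbiter = new CommitArbiter
assert(arbiter.canCommit(partition = 0, attempt = 1))  // first asker commits
assert(!arbiter.canCommit(partition = 0, attempt = 2)) // speculative duplicate is refused
```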


---


