[jira] [Issue Comment Deleted] (SPARK-23050) Structured Streaming with S3 file source duplicates data because of eventual consistency.

2018-11-07 Thread bharath kumar avusherla (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-23050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bharath kumar avusherla updated SPARK-23050:

Comment: was deleted

(was: [~ste...@apache.org], I can start working on it.)

> Structured Streaming with S3 file source duplicates data because of eventual 
> consistency.
> -
>
> Key: SPARK-23050
> URL: https://issues.apache.org/jira/browse/SPARK-23050
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Yash Sharma
>Priority: Major
>
> Spark Structured Streaming with an S3 file source duplicates data because of 
> eventual consistency.
> Reproducing the scenario:
> - Structured Streaming reads from an S3 source and writes back to S3.
> - On completion of a task, Spark tries to commit it by verifying that all of 
> its files have been written to the filesystem 
> ({{ManifestFileCommitProtocol.commitTask}}; a simplified sketch of this check 
> follows the list).
> - [Eventual consistency issue] Spark finds that the file is not present and 
> fails the task: {{org.apache.spark.SparkException: Task failed while writing 
> rows. No such file or directory 
> 's3://path/data/part-00256-65ae782d-e32e-48fb-8652-e1d0defc370b-c000.snappy.parquet'}}
> - By this time the file has eventually become visible in S3.
> - Spark reruns the task and completes it, but gets a new file name this 
> time: {{ManifestFileCommitProtocol.newTaskTempFile}}, 
> part-00256-b62fa7a4-b7e0-43d6-8c38-9705076a7ee1-c000.snappy.parquet.
> - The data is duplicated in the results: the same data is processed twice and 
> written to S3.
> - There is no data duplication if Spark is able to list all 
> committed files and all tasks succeed.
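> Below is a minimal sketch (not the actual Spark source; the method name is 
> hypothetical) of the kind of existence check {{commitTask}} performs using the 
> standard Hadoop {{FileSystem}} API. Under S3 eventual consistency this check 
> can throw even though the file was written moments earlier:
> {code}
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.Path
> 
> // Simplified illustration of verifying that every committed file is visible.
> def verifyCommittedFiles(paths: Seq[String], conf: Configuration): Unit = {
>   paths.foreach { p =>
>     val path = new Path(p)
>     val fs = path.getFileSystem(conf)
>     // Throws java.io.FileNotFoundException if the object is not yet visible,
>     // which fails the task and triggers a retry under a new file name.
>     fs.getFileStatus(path)
>   }
> }
> {code}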
> Code:
> {code}
> query = selected_df.writeStream \
> .format("parquet") \
> .option("compression", "snappy") \
> .option("path", "s3://path/data/") \
> .option("checkpointLocation", "s3://path/checkpoint/") \
> .start()
> {code}
> Same sized duplicate S3 Files:
> {code}
> $ aws s3 ls s3://path/data/ | grep part-00256
> 2018-01-11 03:37:00  17070 
> part-00256-65ae782d-e32e-48fb-8652-e1d0defc370b-c000.snappy.parquet
> 2018-01-11 03:37:10  17070 
> part-00256-b62fa7a4-b7e0-43d6-8c38-9705076a7ee1-c000.snappy.parquet
> {code}
> Exception on S3 listing and task failure:
> {code}
> [Stage 5:>(277 + 100) / 
> 597]18/01/11 03:36:59 WARN TaskSetManager: Lost task 256.0 in stage 5.0 (TID  
> org.apache.spark.SparkException: Task failed while writing rows
>   at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
>   at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
>   at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:108)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
>  Caused by: java.io.FileNotFoundException: No such file or directory 
> 's3://path/data/part-00256-65ae782d-e32e-48fb-8652-e1d0defc370b-c000.snappy.parquet'
>   at 
> com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:816)
>   at 
> com.amazon.ws.emr.hadoop.fs.EmrFileSystem.getFileStatus(EmrFileSystem.java:509)
>   at 
> org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol$$anonfun$4.apply(ManifestFileCommitProtocol.scala:109)
>   at 
> org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol$$anonfun$4.apply(ManifestFileCommitProtocol.scala:109)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitTask(ManifestFileCommitProtocol.scala:109)
>   at 
> 

[jira] [Commented] (SPARK-23050) Structured Streaming with S3 file source duplicates data because of eventual consistency.

2018-08-13 Thread bharath kumar avusherla (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579242#comment-16579242
 ] 

bharath kumar avusherla commented on SPARK-23050:
-

[~ste...@apache.org], I can start working on it.


[jira] [Closed] (SPARK-25052) Is there any possibility that spark structured streaming generate duplicates in the output?

2018-08-09 Thread bharath kumar avusherla (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bharath kumar avusherla closed SPARK-25052.
---

> Is there any possibility that spark structured streaming generate duplicates 
> in the output?
> ---
>
> Key: SPARK-25052
> URL: https://issues.apache.org/jira/browse/SPARK-25052
> Project: Spark
>  Issue Type: Question
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: bharath kumar avusherla
>Priority: Minor
>
> We recently observed that Spark Structured Streaming generated duplicates 
> in the output when reading from a Kafka topic and storing the output to S3 
> (with checkpointing in S3). We ran into this issue twice and it is not 
> reproducible. Has anyone ever faced this kind of issue before? Is 
> this because of S3 eventual consistency?






[jira] [Comment Edited] (SPARK-23050) Structured Streaming with S3 file source duplicates data because of eventual consistency.

2018-08-08 Thread bharath kumar avusherla (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16573700#comment-16573700
 ] 

bharath kumar avusherla edited comment on SPARK-23050 at 8/8/18 6:51 PM:
-

Is there any way we can prevent this from happening?

We also recently observed the same issue when reading from a Kafka topic and 
storing the output to S3 (with checkpointing in S3) using Spark Structured 
Streaming 2.3.0.


was (Author: abharath9):
Is there any way we can prevent this from happening?

We also recently observed the same issue when reading from a Kafka topic and 
storing the output to S3 (with checkpointing in S3). And we are using Spark 
Structured Streaming 2.3.0.


[jira] [Commented] (SPARK-23050) Structured Streaming with S3 file source duplicates data because of eventual consistency.

2018-08-08 Thread bharath kumar avusherla (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16573700#comment-16573700
 ] 

bharath kumar avusherla commented on SPARK-23050:
-

Is there any way we can prevent this from happening?

We also recently observed the same issue when reading from a Kafka topic and 
storing the output to S3 (with checkpointing in S3). And we are using Spark 
Structured Streaming 2.3.0.


[jira] [Commented] (SPARK-25052) Is there any possibility that spark structured streaming generate duplicates in the output?

2018-08-07 Thread bharath kumar avusherla (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16572608#comment-16572608
 ] 

bharath kumar avusherla commented on SPARK-25052:
-

I also thought about it, hence I created it as a question. Anyhow, I will send 
the question to the mailing list.







[jira] [Created] (SPARK-25052) Is there any possibility that spark structured streaming generate duplicates in the output?

2018-08-07 Thread bharath kumar avusherla (JIRA)
bharath kumar avusherla created SPARK-25052:
---

 Summary: Is there any possibility that spark structured streaming 
generate duplicates in the output?
 Key: SPARK-25052
 URL: https://issues.apache.org/jira/browse/SPARK-25052
 Project: Spark
  Issue Type: Question
  Components: Spark Core
Affects Versions: 2.3.0
Reporter: bharath kumar avusherla


We recently observed that Spark Structured Streaming generated duplicates 
in the output when reading from a Kafka topic and storing the output to S3 
(with checkpointing in S3). We ran into this issue twice and it is not 
reproducible. Has anyone ever faced this kind of issue before? Is this 
because of S3 eventual consistency?






[jira] [Resolved] (SPARK-24476) java.net.SocketTimeoutException: Read timed out under jets3t while running the Spark Structured Streaming

2018-06-15 Thread bharath kumar avusherla (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bharath kumar avusherla resolved SPARK-24476.
-
Resolution: Fixed

> java.net.SocketTimeoutException: Read timed out under jets3t while running 
> the Spark Structured Streaming
> -
>
> Key: SPARK-24476
> URL: https://issues.apache.org/jira/browse/SPARK-24476
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0
>Reporter: bharath kumar avusherla
>Priority: Minor
> Attachments: socket-timeout-exception
>
>
> We are working on a streaming application that uses Spark Structured 
> Streaming with checkpointing in S3. When we start the application, it runs 
> just fine for some time and then crashes with the error mentioned below. The 
> amount of time it runs successfully varies: sometimes it runs for 2 days 
> without any issues and then crashes, sometimes it crashes after 4 or 24 hours.
> Our streaming application joins (left and inner) multiple sources from Kafka 
> as well as S3 and an Aurora database.
> Can you please let us know how to solve this problem?
> Is it possible to somehow tweak the SocketTimeout time?
> Here, I'm pasting a few lines of the complete exception log below. The 
> complete exception is also attached to the issue.
> *_Exception:_*
> *_Caused by: java.net.SocketTimeoutException: Read timed out_*
>         _at java.net.SocketInputStream.socketRead0(Native Method)_
>         _at java.net.SocketInputStream.read(SocketInputStream.java:150)_
>         _at java.net.SocketInputStream.read(SocketInputStream.java:121)_
>         _at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)_
>         _at sun.security.ssl.InputRecord.read(InputRecord.java:503)_
>         _at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:954)_
>         _at 
> sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1343)_
>         _at 
> sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1371)_
>         _at 
> sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1355)_
>         _at 
> org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:553)_
>         _at 
> org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:412)_
>  
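Regarding the timeout question above: if the s3a connector is used for the S3 
paths (as other comments in this thread describe), its client timeouts can be 
raised through the Hadoop configuration. A minimal sketch with illustrative 
values; the property names assume the hadoop-aws s3a connector, not the 
jets3t-backed s3n connector shown in the stack trace:

{code}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("s3a-timeout-sketch").getOrCreate()

// Socket (read) timeout and connection-establishment timeout, in milliseconds.
spark.sparkContext.hadoopConfiguration.set("fs.s3a.connection.timeout", "200000")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.connection.establish.timeout", "50000")
{code}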






[jira] [Comment Edited] (SPARK-24476) java.net.SocketTimeoutException: Read timed out under jets3t while running the Spark Structured Streaming

2018-06-12 Thread bharath kumar avusherla (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16509708#comment-16509708
 ] 

bharath kumar avusherla edited comment on SPARK-24476 at 6/12/18 2:48 PM:
--

[~ste...@apache.org] Our initial try was to enable speculative execution and 
rerun the job, and it is working fine. Then we replaced s3n with s3a and reran 
the job; it is working fine too. Right now we are running two jobs 
simultaneously with these two different settings and nothing has failed. Do you 
recommend enabling speculative execution even after changing from s3n to s3a?


was (Author: abharath9):
Our initial try was to enable speculative execution and rerun the job, and it is 
working fine. Then we replaced s3n with s3a and reran the job; it is working 
fine too. Right now we are running two jobs simultaneously with these two 
different settings and nothing has failed. Do you recommend enabling speculative 
execution even after changing from s3n to s3a?







[jira] [Commented] (SPARK-24476) java.net.SocketTimeoutException: Read timed out under jets3t while running the Spark Structured Streaming

2018-06-12 Thread bharath kumar avusherla (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16509708#comment-16509708
 ] 

bharath kumar avusherla commented on SPARK-24476:
-

Our initial try was to enable speculative execution and rerun the job, and it is 
working fine. Then we replaced s3n with s3a and reran the job; it is working 
fine too. Right now we are running two jobs simultaneously with these two 
different settings and nothing has failed. Do you recommend enabling speculative 
execution even after changing from s3n to s3a?
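A minimal sketch (illustrative values only) of the two mitigations described in 
this comment, enabling task speculation and pointing the S3 paths at the s3a 
connector instead of s3n:

{code}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("speculation-s3a-sketch")
  .config("spark.speculation", "true")  // re-launch slow or stuck task attempts
  .getOrCreate()

// Hypothetical bucket; s3a:// uses the hadoop-aws S3AFileSystem rather than
// the older jets3t-backed s3n:// connector.
val checkpointLocation = "s3a://my-bucket/checkpoints/"
{code}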







[jira] [Updated] (SPARK-23869) Spark 2.3.0 left outer join not emitting null values instead waiting for the record in other stream

2018-06-10 Thread bharath kumar avusherla (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-23869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bharath kumar avusherla updated SPARK-23869:

Description: 
A left outer join on two streams is not emitting the null outputs; it just 
waits for a matching record to be added to the other stream. A socket stream 
was used to test this. In our case we want to emit the records with null values 
that don't match on id and/or don't fall within the time-range condition.

Details of the watermarks and intervals are:

val ds1Map = ds1
 .selectExpr("Id AS ds1_Id", "ds1_timestamp")
 .withWatermark("ds1_timestamp","10 seconds")

val ds2Map = ds2
 .selectExpr("Id AS ds2_Id", "ds2_timestamp")
 .withWatermark("ds2_timestamp", "20 seconds")

val output = ds1Map.join( ds2Map,
 expr(
 """ ds1_Id = ds2_Id AND ds2_timestamp >= ds1_timestamp AND  ds2_timestamp <= 
ds1_timestamp + interval 1 minutes """),
 "leftOuter")

val query = output.select("*")
 .writeStream

.outputMode(OutputMode.Append)
 .format("console")
 .option("checkpointLocation", ".spark-checkpoints/")
 .start()

query.awaitTermination()

Thank you. 

 

  was:
A left outer join on two streams is not emitting the null outputs; it just 
waits for a matching record to be added to the other stream. A socket stream 
was used to test this. In our case we want to emit the records with null values 
that don't match on id and/or don't fall within the time-range condition.

Details of the watermarks and intervals are:

val ds1Map = ds1
 .selectExpr("Id AS ds1_Id", "ds1_timestamp")
 .withWatermark("ds1_timestamp","10 seconds")

val ds2Map = ds2
 .selectExpr("Id AS ds2_Id", "ds2_timestamp")
 .withWatermark("ds2_timestamp", "20 seconds")

val output = ds1Map.join( ds2Map,
 expr(
 """ ds1_Id = ds2_Id AND ds2_timestamp >= ds1_timestamp AND  ds2_timestamp <= 
ds1_timestamp + interval 1 minutes """),
 "leftOuter")

val query = output.select("*")
 .writeStream

.outputMode(OutputMode.Append)
 .format("console")
 .option("checkpointLocation", "./ewe-spark-checkpoints/")
 .start()

query.awaitTermination()

Thank you. 
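For reference, a self-contained sketch of the join described above, assuming two 
socket sources that emit "id,timestamp" lines (hosts, ports and the checkpoint 
path are hypothetical). Note that for stream-stream left outer joins, unmatched 
rows are emitted with nulls only after the watermark passes the end of the 
join's time range, not as soon as a batch completes:

{code}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

val spark = SparkSession.builder().appName("left-outer-join-sketch").getOrCreate()

// Each socket line is "<id>,<event timestamp>".
def socketSource(port: String, idCol: String, tsCol: String) =
  spark.readStream.format("socket")
    .option("host", "localhost").option("port", port).load()
    .selectExpr(s"split(value, ',')[0] AS $idCol",
                s"CAST(split(value, ',')[1] AS timestamp) AS $tsCol")

val ds1Map = socketSource("9998", "ds1_Id", "ds1_timestamp")
  .withWatermark("ds1_timestamp", "10 seconds")
val ds2Map = socketSource("9999", "ds2_Id", "ds2_timestamp")
  .withWatermark("ds2_timestamp", "20 seconds")

// Equi-join plus a one-minute event-time range condition, as in the report.
val output = ds1Map.join(ds2Map,
  expr("""ds1_Id = ds2_Id AND ds2_timestamp >= ds1_timestamp AND
          ds2_timestamp <= ds1_timestamp + interval 1 minutes"""),
  "leftOuter")

output.writeStream
  .outputMode("append")
  .format("console")
  .option("checkpointLocation", "/tmp/spark-checkpoints/")
  .start()
  .awaitTermination()
{code}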

 


> Spark 2.3.0 left outer join not emitting null values instead waiting for the 
> record in other stream
> ---
>
> Key: SPARK-23869
> URL: https://issues.apache.org/jira/browse/SPARK-23869
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: bharath kumar avusherla
>Priority: Major
>
> A left outer join on two streams is not emitting the null outputs; it just 
> waits for a matching record to be added to the other stream. A socket stream 
> was used to test this. In our case we want to emit the records with null values 
> that don't match on id and/or don't fall within the time-range condition.
> Details of the watermarks and intervals are:
> val ds1Map = ds1
>  .selectExpr("Id AS ds1_Id", "ds1_timestamp")
>  .withWatermark("ds1_timestamp","10 seconds")
> val ds2Map = ds2
>  .selectExpr("Id AS ds2_Id", "ds2_timestamp")
>  .withWatermark("ds2_timestamp", "20 seconds")
> val output = ds1Map.join( ds2Map,
>  expr(
>  """ ds1_Id = ds2_Id AND ds2_timestamp >= ds1_timestamp AND  ds2_timestamp <= 
> ds1_timestamp + interval 1 minutes """),
>  "leftOuter")
> val query = output.select("*")
>  .writeStream
> .outputMode(OutputMode.Append)
>  .format("console")
>  .option("checkpointLocation", ".spark-checkpoints/")
>  .start()
> query.awaitTermination()
> Thank you. 
>  






[jira] [Updated] (SPARK-23869) Spark 2.3.0 left outer join not emitting null values instead waiting for the record in other stream

2018-06-10 Thread bharath kumar avusherla (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-23869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bharath kumar avusherla updated SPARK-23869:

Description: 
A left outer join on two streams is not emitting the null outputs; it just 
waits for a matching record to be added to the other stream. A socket stream 
was used to test this. In our case we want to emit the records with null values 
that don't match on id and/or don't fall within the time-range condition.

Details of the watermarks and intervals are:

val ds1Map = ds1
 .selectExpr("Id AS ds1_Id", "ds1_timestamp")
 .withWatermark("ds1_timestamp","10 seconds")

val ds2Map = ds2
 .selectExpr("Id AS ds2_Id", "ds2_timestamp")
 .withWatermark("ds2_timestamp", "20 seconds")

val output = ds1Map.join( ds2Map,
 expr(
 """ ds1_Id = ds2_Id AND ds2_timestamp >= ds1_timestamp AND  ds2_timestamp <= 
ds1_timestamp + interval 1 minutes """),
 "leftOuter")

val query = output.select("*")
 .writeStream

.outputMode(OutputMode.Append)
 .format("console")
 .option("checkpointLocation", "./spark-checkpoints/")
 .start()

query.awaitTermination()

Thank you. 

 

  was:
A left outer join on two streams is not emitting the null outputs; it just 
waits for a matching record to be added to the other stream. A socket stream 
was used to test this. In our case we want to emit the records with null values 
that don't match on id and/or don't fall within the time-range condition.

Details of the watermarks and intervals are:

val ds1Map = ds1
 .selectExpr("Id AS ds1_Id", "ds1_timestamp")
 .withWatermark("ds1_timestamp","10 seconds")

val ds2Map = ds2
 .selectExpr("Id AS ds2_Id", "ds2_timestamp")
 .withWatermark("ds2_timestamp", "20 seconds")

val output = ds1Map.join( ds2Map,
 expr(
 """ ds1_Id = ds2_Id AND ds2_timestamp >= ds1_timestamp AND  ds2_timestamp <= 
ds1_timestamp + interval 1 minutes """),
 "leftOuter")

val query = output.select("*")
 .writeStream

.outputMode(OutputMode.Append)
 .format("console")
 .option("checkpointLocation", ".spark-checkpoints/")
 .start()

query.awaitTermination()

Thank you. 

 


> Spark 2.3.0 left outer join not emitting null values instead waiting for the 
> record in other stream
> ---
>
> Key: SPARK-23869
> URL: https://issues.apache.org/jira/browse/SPARK-23869
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: bharath kumar avusherla
>Priority: Major
>
> A left outer join on two streams is not emitting the null outputs; it just 
> waits for a matching record to be added to the other stream. A socket stream 
> was used to test this. In our case we want to emit the records with null values 
> that don't match on id and/or don't fall within the time-range condition.
> Details of the watermarks and intervals are:
> val ds1Map = ds1
>  .selectExpr("Id AS ds1_Id", "ds1_timestamp")
>  .withWatermark("ds1_timestamp","10 seconds")
> val ds2Map = ds2
>  .selectExpr("Id AS ds2_Id", "ds2_timestamp")
>  .withWatermark("ds2_timestamp", "20 seconds")
> val output = ds1Map.join( ds2Map,
>  expr(
>  """ ds1_Id = ds2_Id AND ds2_timestamp >= ds1_timestamp AND  ds2_timestamp <= 
> ds1_timestamp + interval 1 minutes """),
>  "leftOuter")
> val query = output.select("*")
>  .writeStream
> .outputMode(OutputMode.Append)
>  .format("console")
>  .option("checkpointLocation", "./spark-checkpoints/")
>  .start()
> query.awaitTermination()
> Thank you. 
>  






[jira] [Updated] (SPARK-24476) java.net.SocketTimeoutException: Read timed out Exception while running the Spark Structured Streaming in 2.3.0

2018-06-06 Thread bharath kumar avusherla (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bharath kumar avusherla updated SPARK-24476:

Description: 
We are working on a streaming application that uses Spark Structured Streaming 
with checkpointing in S3. When we start the application, it runs just fine for 
some time and then crashes with the error mentioned below. The amount of time 
it runs successfully varies: sometimes it runs for 2 days without any issues 
and then crashes, sometimes it crashes after 4 or 24 hours.

Our streaming application joins (left and inner) multiple sources from Kafka as 
well as S3 and an Aurora database.

Can you please let us know how to solve this problem?
Is it possible to somehow tweak the SocketTimeout time?
Here, I'm pasting a few lines of the complete exception log below. The complete 
exception is also attached to the issue.

*_Exception:_*

*_Caused by: java.net.SocketTimeoutException: Read timed out_*

        _at java.net.SocketInputStream.socketRead0(Native Method)_

        _at java.net.SocketInputStream.read(SocketInputStream.java:150)_

        _at java.net.SocketInputStream.read(SocketInputStream.java:121)_

        _at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)_

        _at sun.security.ssl.InputRecord.read(InputRecord.java:503)_

        _at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:954)_

        _at 
sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1343)_

        _at 
sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1371)_

        _at 
sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1355)_

        _at 
org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:553)_

        _at 
org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:412)_

 

  was:
We are working on a streaming application that uses Spark Structured Streaming 
with checkpointing in S3. When we start the application, it runs just fine for 
some time and then crashes with the error mentioned below. The amount of time 
it runs successfully varies: sometimes it runs for 2 days without any issues 
and then crashes, sometimes it crashes after 4 or 24 hours.

Our streaming application joins (left and inner) multiple sources from Kafka as 
well as S3 and an Aurora database.

Can you please let us know how to solve this problem? Is it possible to 
increase the timeout period?

Here, I'm pasting a few lines of the complete exception log below. The complete 
exception is also attached to the issue.

*_Exception:_*

*_Caused by: java.net.SocketTimeoutException: Read timed out_*

        _at java.net.SocketInputStream.socketRead0(Native Method)_

        _at java.net.SocketInputStream.read(SocketInputStream.java:150)_

        _at java.net.SocketInputStream.read(SocketInputStream.java:121)_

        _at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)_

        _at sun.security.ssl.InputRecord.read(InputRecord.java:503)_

        _at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:954)_

        _at 
sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1343)_

        _at 
sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1371)_

        _at 
sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1355)_

        _at 
org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:553)_

        _at 
org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:412)_

 



[jira] [Updated] (SPARK-24476) java.net.SocketTimeoutException: Read timed out Exception while running the Spark Structured Streaming in 2.3.0

2018-06-06 Thread bharath kumar avusherla (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bharath kumar avusherla updated SPARK-24476:

Description: 
We are working on a streaming application that uses Spark Structured Streaming 
with checkpointing in S3. When we start the application, it runs just fine for 
some time and then crashes with the error mentioned below. The amount of time 
it runs successfully varies: sometimes it runs for 2 days without any issues 
and then crashes, sometimes it crashes after 4 or 24 hours.

Our streaming application joins (left and inner) multiple sources from Kafka as 
well as S3 and an Aurora database.

Can you please let us know how to solve this problem? Is it possible to 
increase the timeout period?

Here, I'm pasting a few lines of the complete exception log below. The complete 
exception is also attached to the issue.

*_Exception:_*

*_Caused by: java.net.SocketTimeoutException: Read timed out_*

        _at java.net.SocketInputStream.socketRead0(Native Method)_

        _at java.net.SocketInputStream.read(SocketInputStream.java:150)_

        _at java.net.SocketInputStream.read(SocketInputStream.java:121)_

        _at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)_

        _at sun.security.ssl.InputRecord.read(InputRecord.java:503)_

        _at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:954)_

        _at 
sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1343)_

        _at 
sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1371)_

        _at 
sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1355)_

        _at 
org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:553)_

        _at 
org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:412)_

 

  was:
We are working on a streaming application that uses Spark Structured Streaming 
with checkpointing in S3. When we start the application, it runs just fine for 
some time and then crashes with the error mentioned below. The amount of time 
it runs successfully varies: sometimes it runs for 2 days without any issues 
and then crashes, sometimes it crashes after 4 or 24 hours.

Our streaming application joins (left and inner) multiple sources from Kafka as 
well as S3 and an Aurora database.

Can you please let us know how to solve this problem? Is it possible to 
increase the timeout period?

Here, I'm pasting the complete exception log below.

*_Exception:_*

*_Caused by: java.net.SocketTimeoutException: Read timed out_*

        _at java.net.SocketInputStream.socketRead0(Native Method)_

        _at java.net.SocketInputStream.read(SocketInputStream.java:150)_

        _at java.net.SocketInputStream.read(SocketInputStream.java:121)_

        _at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)_

        _at sun.security.ssl.InputRecord.read(InputRecord.java:503)_

        _at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:954)_

        _at 
sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1343)_

        _at 
sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1371)_

        _at 
sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1355)_

        _at 
org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:553)_

        _at 
org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:412)_

        _at 
org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection(DefaultClientConnectionOperator.java:179)_

        _at 
org.apache.http.impl.conn.AbstractPoolEntry.open(AbstractPoolEntry.java:144)_

        _at 
org.apache.http.impl.conn.AbstractPooledConnAdapter.open(AbstractPooledConnAdapter.java:134)_

        _at 
org.apache.http.impl.client.DefaultRequestDirector.tryConnect(DefaultRequestDirector.java:612)_

        _at 
org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:447)_

        _at 
org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:884)_

        _at 
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)_

        _at 
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55)_

        _at 
org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:334)_

        _at 
org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:281)_

        _at 
org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestHead(RestStorageService.java:942)_

        _at 
org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectImpl(RestStorageService.java:2148)_

        _at 

[jira] [Updated] (SPARK-24476) java.net.SocketTimeoutException: Read timed out Exception while running the Spark Structured Streaming in 2.3.0

2018-06-06 Thread bharath kumar avusherla (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bharath kumar avusherla updated SPARK-24476:

Attachment: socket-timeout-exception

> java.net.SocketTimeoutException: Read timed out Exception while running the 
> Spark Structured Streaming in 2.3.0
> ---
>
> Key: SPARK-24476
> URL: https://issues.apache.org/jira/browse/SPARK-24476
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: bharath kumar avusherla
>Priority: Major
> Attachments: socket-timeout-exception
>
>
> We are working on a streaming application that uses Spark Structured 
> Streaming with checkpointing in S3. When we start the application, it runs 
> just fine for some time and then crashes with the error mentioned below. The 
> amount of time it runs successfully varies: sometimes it runs for 2 days 
> without any issues and then crashes, sometimes it crashes after 4 or 24 hours.
> Our streaming application joins (left and inner) multiple sources from Kafka 
> as well as S3 and an Aurora database.
> Can you please let us know how to solve this problem? Is it possible to 
> increase the timeout period?
> Here, I'm pasting the complete exception log below.
> *_Exception:_*
> *_Caused by: java.net.SocketTimeoutException: Read timed out_*
>         _at java.net.SocketInputStream.socketRead0(Native Method)_
>         _at java.net.SocketInputStream.read(SocketInputStream.java:150)_
>         _at java.net.SocketInputStream.read(SocketInputStream.java:121)_
>         _at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)_
>         _at sun.security.ssl.InputRecord.read(InputRecord.java:503)_
>         _at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:954)_
>         _at 
> sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1343)_
>         _at 
> sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1371)_
>         _at 
> sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1355)_
>         _at 
> org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:553)_
>         _at 
> org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:412)_
>         _at 
> org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection(DefaultClientConnectionOperator.java:179)_
>         _at 
> org.apache.http.impl.conn.AbstractPoolEntry.open(AbstractPoolEntry.java:144)_
>         _at 
> org.apache.http.impl.conn.AbstractPooledConnAdapter.open(AbstractPooledConnAdapter.java:134)_
>         _at 
> org.apache.http.impl.client.DefaultRequestDirector.tryConnect(DefaultRequestDirector.java:612)_
>         _at 
> org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:447)_
>         _at 
> org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:884)_
>         _at 
> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)_
>         _at 
> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55)_
>         _at 
> org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:334)_
>         _at 
> org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:281)_
>         _at 
> org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestHead(RestStorageService.java:942)_
>         _at 
> org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectImpl(RestStorageService.java:2148)_
>         _at 
> org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectDetailsImpl(RestStorageService.java:2075)_
>         _at 
> org.jets3t.service.StorageService.getObjectDetails(StorageService.java:1093)_
>         _at 
> org.jets3t.service.StorageService.getObjectDetails(StorageService.java:548)_
>         _at 
> org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:174)_
>         _at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)_
>         _at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)_
>         _at java.lang.reflect.Method.invoke(Method.java:483)_
>         _at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)_
>         _at 
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)_
>         _at 
> org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)_
>         _at 
> 

[jira] [Created] (SPARK-24476) java.net.SocketTimeoutException: Read timed out Exception while running the Spark Structured Streaming in 2.3.0

2018-06-06 Thread bharath kumar avusherla (JIRA)
bharath kumar avusherla created SPARK-24476:
---

 Summary: java.net.SocketTimeoutException: Read timed out Exception 
while running the Spark Structured Streaming in 2.3.0
 Key: SPARK-24476
 URL: https://issues.apache.org/jira/browse/SPARK-24476
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.3.0
Reporter: bharath kumar avusherla


We are working on a streaming application that uses Spark Structured Streaming 
with checkpointing in S3. When we start the application, it runs just fine for 
some time and then crashes with the error mentioned below. The amount of time 
it runs successfully varies: sometimes it runs for 2 days without any issues 
and then crashes, sometimes it crashes after 4 or 24 hours.

Our streaming application joins (left and inner) multiple sources from Kafka as 
well as S3 and an Aurora database.

Can you please let us know how to solve this problem? Is it possible to 
increase the timeout period?

Here, I'm pasting the complete exception log below.

Exception:

Caused by: java.net.SocketTimeoutException: Read timed out
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.read(SocketInputStream.java:150)
        at java.net.SocketInputStream.read(SocketInputStream.java:121)
        at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
        at sun.security.ssl.InputRecord.read(InputRecord.java:503)
        at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:954)
        at sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1343)
        at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1371)
        at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1355)
        at org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:553)
        at org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:412)
        at org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection(DefaultClientConnectionOperator.java:179)
        at org.apache.http.impl.conn.AbstractPoolEntry.open(AbstractPoolEntry.java:144)
        at org.apache.http.impl.conn.AbstractPooledConnAdapter.open(AbstractPooledConnAdapter.java:134)
        at org.apache.http.impl.client.DefaultRequestDirector.tryConnect(DefaultRequestDirector.java:612)
        at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:447)
        at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:884)
        at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
        at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55)
        at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:334)
        at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:281)
        at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestHead(RestStorageService.java:942)
        at org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectImpl(RestStorageService.java:2148)
        at org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectDetailsImpl(RestStorageService.java:2075)
        at org.jets3t.service.StorageService.getObjectDetails(StorageService.java:1093)
        at org.jets3t.service.StorageService.getObjectDetails(StorageService.java:548)
        at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:174)
        at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:483)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
        at org.apache.hadoop.fs.s3native.$Proxy18.retrieveMetadata(Unknown Source)
        at org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:493)
        at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1437)
        at 

[jira] [Comment Edited] (SPARK-24189) Spark Structured Streaming not working with the Kafka Transactions

2018-05-05 Thread bharath kumar avusherla (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16464802#comment-16464802
 ] 

bharath kumar avusherla edited comment on SPARK-24189 at 5/5/18 3:40 PM:
-

[~mgaido] Yup. I have added the dependency above and used the following code to 
read the data.

val df = spark
 .readStream
 .format("kafka")
 .option("kafka.bootstrap.servers", "localhost:9092")
 .option("subscribe", "test-topic")
 .option("kafka.isolation.level","read_committed")
 .load()

This time it didn't show the warning message, but it is simply not processing 
any records.

Not sure if there is any other option I need to set.


was (Author: abharath9):
Yup. I have added the dependency above and used the following code to read the 
data.

val df = spark
 .readStream
 .format("kafka")
 .option("kafka.bootstrap.servers", "localhost:9092")
 .option("subscribe", "test-topic")
 .option("kafka.isolation.level","read_committed")
 .load()

This time it didn't show the warning message, but it is simply not processing 
any records.

> Spark Structured Streaming not working with the Kafka Transactions
> --
>
> Key: SPARK-24189
> URL: https://issues.apache.org/jira/browse/SPARK-24189
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: bharath kumar avusherla
>Priority: Major
>
> I was trying to read a Kafka transactional topic using Spark Structured 
> Streaming 2.3.0 with the Kafka option isolation-level = "read_committed", but 
> Spark reads the data immediately, without waiting for the data in the topic to 
> be committed. The Spark documentation says Structured Streaming supports Kafka 
> version 0.10 or higher. I am using the command below to test the scenario.
> val df = spark
>  .readStream
>  .format("kafka")
>  .option("kafka.bootstrap.servers", "localhost:9092")
>  .option("subscribe", "test-topic")
>  .option("isolation-level","read_committed")
>  .load()
> Can you please let me know whether transactional reads are supported in Spark 
> 2.3.0 Structured Streaming, or whether I am missing anything.
>  
> Thank you.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24189) Spark Structured Streaming not working with the Kafka Transactions

2018-05-05 Thread bharath kumar avusherla (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16464802#comment-16464802
 ] 

bharath kumar avusherla commented on SPARK-24189:
-

Yup. I have added above dependency and used following code to read the data.

val df = spark
 .readStream
 .format("kafka")
 .option("kafka.bootstrap.servers", "localhost:9092")
 .option("subscribe", "test-topic")
 *.option("kafka.isolation.level","read_committed")*
 .load()

This time it didn't show the warning message. But simply not processing any 
records. 

> Spark Structured Streaming not working with the Kafka Transactions
> --
>
> Key: SPARK-24189
> URL: https://issues.apache.org/jira/browse/SPARK-24189
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: bharath kumar avusherla
>Priority: Major
>
> I was trying to read a Kafka transactional topic using Spark Structured 
> Streaming 2.3.0 with the Kafka option isolation-level = "read_committed", but 
> Spark reads the data immediately, without waiting for the data in the topic to 
> be committed. The Spark documentation says Structured Streaming supports Kafka 
> version 0.10 or higher. I am using the command below to test the scenario.
> val df = spark
>  .readStream
>  .format("kafka")
>  .option("kafka.bootstrap.servers", "localhost:9092")
>  .option("subscribe", "test-topic")
>  .option("isolation-level","read_committed")
>  .load()
> Can you please let me know whether transactional reads are supported in Spark 
> 2.3.0 Structured Streaming, or whether I am missing anything.
>  
> Thank you.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24189) Spark Structured Streaming not working with the Kafka Transactions

2018-05-05 Thread bharath kumar avusherla (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16464795#comment-16464795
 ] 

bharath kumar avusherla commented on SPARK-24189:
-

[~mgaido] I'm using the Maven dependency below, which is the 0.10 Kafka 
connector.

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
  <version>2.3.0</version>
</dependency>

It was mentioned in the documentation that it supports 0.10 or higher.

> Spark Structured Streaming not working with the Kafka Transactions
> --
>
> Key: SPARK-24189
> URL: https://issues.apache.org/jira/browse/SPARK-24189
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: bharath kumar avusherla
>Priority: Major
>
> I was trying to read a Kafka transactional topic using Spark Structured 
> Streaming 2.3.0 with the Kafka option isolation-level = "read_committed", but 
> Spark reads the data immediately, without waiting for the data in the topic to 
> be committed. The Spark documentation says Structured Streaming supports Kafka 
> version 0.10 or higher. I am using the command below to test the scenario.
> val df = spark
>  .readStream
>  .format("kafka")
>  .option("kafka.bootstrap.servers", "localhost:9092")
>  .option("subscribe", "test-topic")
>  .option("isolation-level","read_committed")
>  .load()
> Can you please let me know whether transactional reads are supported in Spark 
> 2.3.0 Structured Streaming, or whether I am missing anything.
>  
> Thank you.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-24189) Spark Structured Streaming not working with the Kafka Transactions

2018-05-05 Thread bharath kumar avusherla (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16464781#comment-16464781
 ] 

bharath kumar avusherla edited comment on SPARK-24189 at 5/5/18 2:23 PM:
-

[~mgaido] I have tried both ways:

.option("kafka.isolation.level","read_committed") and

.option("kafka.isolation-level","read_committed"). But it gives me the 
following warning and then starts executing anyway:

WARN  ConsumerConfig:186 - The configuration isolation.level = 
read_committed was supplied but isn't a known config.


was (Author: abharath9):
I have tried both ways:

.option("kafka.isolation.level","read_committed") and

.option("kafka.isolation-level","read_committed"). But it gives me the 
following warning and then starts executing anyway:

WARN  ConsumerConfig:186 - The configuration isolation.level = 
read_committed was supplied but isn't a known config.

> Spark Structured Streaming not working with the Kafka Transactions
> --
>
> Key: SPARK-24189
> URL: https://issues.apache.org/jira/browse/SPARK-24189
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: bharath kumar avusherla
>Priority: Major
>
> I was trying to read a Kafka transactional topic using Spark Structured 
> Streaming 2.3.0 with the Kafka option isolation-level = "read_committed", but 
> Spark reads the data immediately, without waiting for the data in the topic to 
> be committed. The Spark documentation says Structured Streaming supports Kafka 
> version 0.10 or higher. I am using the command below to test the scenario.
> val df = spark
>  .readStream
>  .format("kafka")
>  .option("kafka.bootstrap.servers", "localhost:9092")
>  .option("subscribe", "test-topic")
>  .option("isolation-level","read_committed")
>  .load()
> Can you please let me know whether transactional reads are supported in Spark 
> 2.3.0 Structured Streaming, or whether I am missing anything.
>  
> Thank you.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24189) Spark Structured Streaming not working with the Kafka Transactions

2018-05-05 Thread bharath kumar avusherla (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-24189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16464781#comment-16464781
 ] 

bharath kumar avusherla commented on SPARK-24189:
-

I have tried both ways:

.option("kafka.isolation.level","read_committed") and

.option("kafka.isolation-level","read_committed"). But it gives me the 
following warning and then starts executing anyway:

WARN  ConsumerConfig:186 - The configuration isolation.level = 
read_committed was supplied but isn't a known config.

> Spark Structured Streaming not working with the Kafka Transactions
> --
>
> Key: SPARK-24189
> URL: https://issues.apache.org/jira/browse/SPARK-24189
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: bharath kumar avusherla
>Priority: Major
>
> I was trying to read a Kafka transactional topic using Spark Structured 
> Streaming 2.3.0 with the Kafka option isolation-level = "read_committed", but 
> Spark reads the data immediately, without waiting for the data in the topic to 
> be committed. The Spark documentation says Structured Streaming supports Kafka 
> version 0.10 or higher. I am using the command below to test the scenario.
> val df = spark
>  .readStream
>  .format("kafka")
>  .option("kafka.bootstrap.servers", "localhost:9092")
>  .option("subscribe", "test-topic")
>  .option("isolation-level","read_committed")
>  .load()
> Can you please let me know whether transactional reads are supported in Spark 
> 2.3.0 Structured Streaming, or whether I am missing anything.
>  
> Thank you.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24189) Spark Structured Streaming not working with the Kafka Transactions

2018-05-04 Thread bharath kumar avusherla (JIRA)
bharath kumar avusherla created SPARK-24189:
---

 Summary: Spark Structured Streaming not working with the Kafka 
Transactions
 Key: SPARK-24189
 URL: https://issues.apache.org/jira/browse/SPARK-24189
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.3.0
Reporter: bharath kumar avusherla


I was trying to read a Kafka transactional topic using Spark Structured 
Streaming 2.3.0 with the Kafka option isolation-level = "read_committed", but 
Spark reads the data immediately, without waiting for the data in the topic to 
be committed. The Spark documentation says Structured Streaming supports Kafka 
version 0.10 or higher. I am using the command below to test the scenario.

val df = spark
 .readStream
 .format("kafka")
 .option("kafka.bootstrap.servers", "localhost:9092")
 .option("subscribe", "test-topic")
 .option("isolation-level","read_committed")
 .load()
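
For context, a "committed" record in this test would come from a transactional 
producer along these lines. This is only a sketch using the standard 
kafka-clients producer API; the broker address, topic name, and 
transactional.id are placeholders, not values taken from this ticket.

{code}
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object TxProducerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("transactional.id", "test-tx-1")

    val producer = new KafkaProducer[String, String](props)
    producer.initTransactions()
    producer.beginTransaction()
    producer.send(new ProducerRecord("test-topic", "key", "visible only after commit"))
    // A read_committed consumer should not see the record until this commit.
    producer.commitTransaction()
    producer.close()
  }
}
{code}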

Can you please let me know whether transactional reads are supported in Spark 
2.3.0 Structured Streaming, or whether I am missing anything.

 

Thank you.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Closed] (SPARK-23869) Spark 2.3.0 left outer join not emitting null values instead waiting for the record in other stream

2018-04-10 Thread bharath kumar avusherla (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bharath kumar avusherla closed SPARK-23869.
---

> Spark 2.3.0 left outer join not emitting null values instead waiting for the 
> record in other stream
> ---
>
> Key: SPARK-23869
> URL: https://issues.apache.org/jira/browse/SPARK-23869
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: bharath kumar avusherla
>Priority: Major
>
> Left outer join on two streams not emitting the null outputs. It is just 
> waiting for the record to be added to other stream. Used socketstream to test 
> this. In our case we want to emit the records with null values which doesn't 
> match with id or/and not fall in time range condition
> Details of the watermarks and intervals are:
> val ds1Map = ds1
>  .selectExpr("Id AS ds1_Id", "ds1_timestamp")
>  .withWatermark("ds1_timestamp","10 seconds")
> val ds2Map = ds2
>  .selectExpr("Id AS ds2_Id", "ds2_timestamp")
>  .withWatermark("ds2_timestamp", "20 seconds")
> val output = ds1Map.join( ds2Map,
>  expr(
>  """ ds1_Id = ds2_Id AND ds2_timestamp >= ds1_timestamp AND  ds2_timestamp <= 
> ds1_timestamp + interval 1 minutes """),
>  "leftOuter")
> val query = output.select("*")
>  .writeStream
> .outputMode(OutputMode.Append)
>  .format("console")
>  .option("checkpointLocation", "./ewe-spark-checkpoints/")
>  .start()
> query.awaitTermination()
> Thank you. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-23869) Spark 2.3.0 left outer join not emitting null values instead waiting for the record in other stream

2018-04-04 Thread bharath kumar avusherla (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bharath kumar avusherla resolved SPARK-23869.
-
Resolution: Invalid

> Spark 2.3.0 left outer join not emitting null values instead waiting for the 
> record in other stream
> ---
>
> Key: SPARK-23869
> URL: https://issues.apache.org/jira/browse/SPARK-23869
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: bharath kumar avusherla
>Priority: Major
>
> Left outer join on two streams not emitting the null outputs. It is just 
> waiting for the record to be added to other stream. Used socketstream to test 
> this. In our case we want to emit the records with null values which doesn't 
> match with id or/and not fall in time range condition
> Details of the watermarks and intervals are:
> val ds1Map = ds1
>  .selectExpr("Id AS ds1_Id", "ds1_timestamp")
>  .withWatermark("ds1_timestamp","10 seconds")
> val ds2Map = ds2
>  .selectExpr("Id AS ds2_Id", "ds2_timestamp")
>  .withWatermark("ds2_timestamp", "20 seconds")
> val output = ds1Map.join( ds2Map,
>  expr(
>  """ ds1_Id = ds2_Id AND ds2_timestamp >= ds1_timestamp AND  ds2_timestamp <= 
> ds1_timestamp + interval 1 minutes """),
>  "leftOuter")
> val query = output.select("*")
>  .writeStream
> .outputMode(OutputMode.Append)
>  .format("console")
>  .option("checkpointLocation", "./ewe-spark-checkpoints/")
>  .start()
> query.awaitTermination()
> Thank you. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Issue Comment Deleted] (SPARK-23869) Spark 2.3.0 left outer join not emitting null values instead waiting for the record in other stream

2018-04-04 Thread bharath kumar avusherla (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bharath kumar avusherla updated SPARK-23869:

Comment: was deleted

(was: Hey, 

Any luck on this? Am I missing anything?)

> Spark 2.3.0 left outer join not emitting null values instead waiting for the 
> record in other stream
> ---
>
> Key: SPARK-23869
> URL: https://issues.apache.org/jira/browse/SPARK-23869
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: bharath kumar avusherla
>Priority: Major
>
> Left outer join on two streams not emitting the null outputs. It is just 
> waiting for the record to be added to other stream. Used socketstream to test 
> this. In our case we want to emit the records with null values which doesn't 
> match with id or/and not fall in time range condition
> Details of the watermarks and intervals are:
> val ds1Map = ds1
>  .selectExpr("Id AS ds1_Id", "ds1_timestamp")
>  .withWatermark("ds1_timestamp","10 seconds")
> val ds2Map = ds2
>  .selectExpr("Id AS ds2_Id", "ds2_timestamp")
>  .withWatermark("ds2_timestamp", "20 seconds")
> val output = ds1Map.join( ds2Map,
>  expr(
>  """ ds1_Id = ds2_Id AND ds2_timestamp >= ds1_timestamp AND  ds2_timestamp <= 
> ds1_timestamp + interval 1 minutes """),
>  "leftOuter")
> val query = output.select("*")
>  .writeStream
> .outputMode(OutputMode.Append)
>  .format("console")
>  .option("checkpointLocation", "./ewe-spark-checkpoints/")
>  .start()
> query.awaitTermination()
> Thank you. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23869) Spark 2.3.0 left outer join not emitting null values instead waiting for the record in other stream

2018-04-04 Thread bharath kumar avusherla (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bharath kumar avusherla updated SPARK-23869:

Issue Type: Bug  (was: Test)

> Spark 2.3.0 left outer join not emitting null values instead waiting for the 
> record in other stream
> ---
>
> Key: SPARK-23869
> URL: https://issues.apache.org/jira/browse/SPARK-23869
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: bharath kumar avusherla
>Priority: Major
>
> Left outer join on two streams not emitting the null outputs. It is just 
> waiting for the record to be added to other stream. Used socketstream to test 
> this. In our case we want to emit the records with null values which doesn't 
> match with id or/and not fall in time range condition
> Details of the watermarks and intervals are:
> val ds1Map = ds1
>  .selectExpr("Id AS ds1_Id", "ds1_timestamp")
>  .withWatermark("ds1_timestamp","10 seconds")
> val ds2Map = ds2
>  .selectExpr("Id AS ds2_Id", "ds2_timestamp")
>  .withWatermark("ds2_timestamp", "20 seconds")
> val output = ds1Map.join( ds2Map,
>  expr(
>  """ ds1_Id = ds2_Id AND ds2_timestamp >= ds1_timestamp AND  ds2_timestamp <= 
> ds1_timestamp + interval 1 minutes """),
>  "leftOuter")
> val query = output.select("*")
>  .writeStream
> .outputMode(OutputMode.Append)
>  .format("console")
>  .option("checkpointLocation", "./ewe-spark-checkpoints/")
>  .start()
> query.awaitTermination()
> Thank you. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23869) Spark 2.3.0 left outer join not emitting null values instead waiting for the record in other stream

2018-04-04 Thread bharath kumar avusherla (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bharath kumar avusherla updated SPARK-23869:

Description: 
Left outer join on two streams not emitting the null outputs. It is just 
waiting for the record to be added to other stream. Used socketstream to test 
this. In our case we want to emit the records with null values which doesn't 
match with id or/and not fall in time range condition

Details of the watermarks and intervals are:

val ds1Map = ds1
 .selectExpr("Id AS ds1_Id", "ds1_timestamp")
 .withWatermark("ds1_timestamp","10 seconds")

val ds2Map = ds2
 .selectExpr("Id AS ds2_Id", "ds2_timestamp")
 .withWatermark("ds2_timestamp", "20 seconds")

val output = ds1Map.join( ds2Map,
 expr(
 """ ds1_Id = ds2_Id AND ds2_timestamp >= ds1_timestamp AND  ds2_timestamp <= 
ds1_timestamp + interval 1 minutes """),
 "leftOuter")

val query = output.select("*")
 .writeStream

.outputMode(OutputMode.Append)
 .format("console")
 .option("checkpointLocation", "./ewe-spark-checkpoints/")
 .start()

query.awaitTermination()

Thank you. 

 

  was:
Left outer join on two streams not emitting the null outputs. It is just 
waiting for the record to be added to other stream. Using socketstream to test 
this. In our case we want to emit the records with null values which doesn't 
match with id or/and not fall in time range condition

Details of the watermarks and intervals are:

val ds1Map = ds1
 .selectExpr("Id AS ds1_Id", "ds1_timestamp")
 .withWatermark("ds1_timestamp","10 seconds")

val ds2Map = ds2
 .selectExpr("Id AS ds2_Id", "ds2_timestamp")
 .withWatermark("ds2_timestamp", "20 seconds")

val output = ds1Map.join( ds2Map,
 expr(
 """ ds1_Id = ds2_Id AND ds2_timestamp >= ds1_timestamp AND  ds2_timestamp <= 
ds1_timestamp + interval 1 minutes """),
 "leftOuter")

val query = output.select("*")
 .writeStream

.outputMode(OutputMode.Append)
 .format("console")
 .option("checkpointLocation", "./ewe-spark-checkpoints/")
 .start()

query.awaitTermination()

Thank you. 

 


> Spark 2.3.0 left outer join not emitting null values instead waiting for the 
> record in other stream
> ---
>
> Key: SPARK-23869
> URL: https://issues.apache.org/jira/browse/SPARK-23869
> Project: Spark
>  Issue Type: Test
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: bharath kumar avusherla
>Priority: Major
>
> Left outer join on two streams not emitting the null outputs. It is just 
> waiting for the record to be added to other stream. Used socketstream to test 
> this. In our case we want to emit the records with null values which doesn't 
> match with id or/and not fall in time range condition
> Details of the watermarks and intervals are:
> val ds1Map = ds1
>  .selectExpr("Id AS ds1_Id", "ds1_timestamp")
>  .withWatermark("ds1_timestamp","10 seconds")
> val ds2Map = ds2
>  .selectExpr("Id AS ds2_Id", "ds2_timestamp")
>  .withWatermark("ds2_timestamp", "20 seconds")
> val output = ds1Map.join( ds2Map,
>  expr(
>  """ ds1_Id = ds2_Id AND ds2_timestamp >= ds1_timestamp AND  ds2_timestamp <= 
> ds1_timestamp + interval 1 minutes """),
>  "leftOuter")
> val query = output.select("*")
>  .writeStream
> .outputMode(OutputMode.Append)
>  .format("console")
>  .option("checkpointLocation", "./ewe-spark-checkpoints/")
>  .start()
> query.awaitTermination()
> Thank you. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23869) Spark 2.3.0 left outer join not emitting null values instead waiting for the record in other stream

2018-04-04 Thread bharath kumar avusherla (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16426417#comment-16426417
 ] 

bharath kumar avusherla commented on SPARK-23869:
-

Hey, 

Any luck on this? Am I missing anything?

> Spark 2.3.0 left outer join not emitting null values instead waiting for the 
> record in other stream
> ---
>
> Key: SPARK-23869
> URL: https://issues.apache.org/jira/browse/SPARK-23869
> Project: Spark
>  Issue Type: Test
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: bharath kumar avusherla
>Priority: Major
>
> Left outer join on two streams not emitting the null outputs. It is just 
> waiting for the record to be added to other stream. Using socketstream to 
> test this. In our case we want to emit the records with null values which 
> doesn't match with id or/and not fall in time range condition
> Details of the watermarks and intervals are:
> val ds1Map = ds1
>  .selectExpr("Id AS ds1_Id", "ds1_timestamp")
>  .withWatermark("ds1_timestamp","10 seconds")
> val ds2Map = ds2
>  .selectExpr("Id AS ds2_Id", "ds2_timestamp")
>  .withWatermark("ds2_timestamp", "20 seconds")
> val output = ds1Map.join( ds2Map,
>  expr(
>  """ ds1_Id = ds2_Id AND ds2_timestamp >= ds1_timestamp AND  ds2_timestamp <= 
> ds1_timestamp + interval 1 minutes """),
>  "leftOuter")
> val query = output.select("*")
>  .writeStream
> .outputMode(OutputMode.Append)
>  .format("console")
>  .option("checkpointLocation", "./ewe-spark-checkpoints/")
>  .start()
> query.awaitTermination()
> Thank you. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23869) Spark 2.3.0 left outer join not emitting null values instead waiting for the record in other stream

2018-04-04 Thread bharath kumar avusherla (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bharath kumar avusherla updated SPARK-23869:

Description: 
Left outer join on two streams not emitting the null outputs. It is just 
waiting for the record to be added to other stream. Using socketstream to test 
this. In our case we want to emit the records with null values which doesn't 
match with id or/and not fall in time range condition

Details of the watermarks and intervals are:

val ds1Map = ds1
 .selectExpr("Id AS ds1_Id", "ds1_timestamp")
 .withWatermark("ds1_timestamp","10 seconds")

val ds2Map = ds2
 .selectExpr("Id AS ds2_Id", "ds2_timestamp")
 .withWatermark("ds2_timestamp", "20 seconds")

val output = ds1Map.join( ds2Map,
 expr(
 """ ds1_Id = ds2_Id AND ds2_timestamp >= ds1_timestamp AND  ds2_timestamp <= 
ds1_timestamp + interval 1 minutes """),
 "leftOuter")

val query = output.select("*")
 .writeStream

.outputMode(OutputMode.Append)
 .format("console")
 .option("checkpointLocation", "./ewe-spark-checkpoints/")
 .start()

query.awaitTermination()

Thank you. 

 

  was:
Left outer join on two streams not emitting the null outputs. It is just 
waiting for the record to be added to other stream. Using socketstream to test 
this. In our case we want to emit the records with null values which doesn't 
match with id and not fall in time range condition

Details of the watermarks and intervals are:

val ds1Map = ds1
 .selectExpr("Id AS ds1_Id", "ds1_timestamp")
 .withWatermark("ds1_timestamp","10 seconds")

val ds2Map = ds2
 .selectExpr("Id AS ds2_Id", "ds2_timestamp")
 .withWatermark("ds2_timestamp", "20 seconds")

val output = ds1Map.join( ds2Map,
 expr(
 """ ds1_Id = ds2_Id AND ds2_timestamp >= ds1_timestamp AND  ds2_timestamp <= 
ds1_timestamp + interval 1 minutes """),
 "leftOuter")

val query = output.select("*")
 .writeStream

.outputMode(OutputMode.Append)
 .format("console")
 .option("checkpointLocation", "./ewe-spark-checkpoints/")
 .start()

query.awaitTermination()

Thank you. 

 


> Spark 2.3.0 left outer join not emitting null values instead waiting for the 
> record in other stream
> ---
>
> Key: SPARK-23869
> URL: https://issues.apache.org/jira/browse/SPARK-23869
> Project: Spark
>  Issue Type: Test
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: bharath kumar avusherla
>Priority: Major
>
> Left outer join on two streams not emitting the null outputs. It is just 
> waiting for the record to be added to other stream. Using socketstream to 
> test this. In our case we want to emit the records with null values which 
> doesn't match with id or/and not fall in time range condition
> Details of the watermarks and intervals are:
> val ds1Map = ds1
>  .selectExpr("Id AS ds1_Id", "ds1_timestamp")
>  .withWatermark("ds1_timestamp","10 seconds")
> val ds2Map = ds2
>  .selectExpr("Id AS ds2_Id", "ds2_timestamp")
>  .withWatermark("ds2_timestamp", "20 seconds")
> val output = ds1Map.join( ds2Map,
>  expr(
>  """ ds1_Id = ds2_Id AND ds2_timestamp >= ds1_timestamp AND  ds2_timestamp <= 
> ds1_timestamp + interval 1 minutes """),
>  "leftOuter")
> val query = output.select("*")
>  .writeStream
> .outputMode(OutputMode.Append)
>  .format("console")
>  .option("checkpointLocation", "./ewe-spark-checkpoints/")
>  .start()
> query.awaitTermination()
> Thank you. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23869) Spark 2.3.0 left outer join not emitting null values instead waiting for the record in other stream

2018-04-04 Thread bharath kumar avusherla (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bharath kumar avusherla updated SPARK-23869:

Issue Type: Test  (was: Bug)

> Spark 2.3.0 left outer join not emitting null values instead waiting for the 
> record in other stream
> ---
>
> Key: SPARK-23869
> URL: https://issues.apache.org/jira/browse/SPARK-23869
> Project: Spark
>  Issue Type: Test
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: bharath kumar avusherla
>Priority: Major
>
> Left outer join on two streams not emitting the null outputs. It is just 
> waiting for the record to be added to other stream. Using socketstream to 
> test this. In our case we want to emit the records with null values which 
> doesn't match with id and not fall in time range condition
> Details of the watermarks and intervals are:
> val ds1Map = ds1
>  .selectExpr("Id AS ds1_Id", "ds1_timestamp")
>  .withWatermark("ds1_timestamp","10 seconds")
> val ds2Map = ds2
>  .selectExpr("Id AS ds2_Id", "ds2_timestamp")
>  .withWatermark("ds2_timestamp", "20 seconds")
> val output = ds1Map.join( ds2Map,
>  expr(
>  """ ds1_Id = ds2_Id AND ds2_timestamp >= ds1_timestamp AND  ds2_timestamp <= 
> ds1_timestamp + interval 1 minutes """),
>  "leftOuter")
> val query = output.select("*")
>  .writeStream
> .outputMode(OutputMode.Append)
>  .format("console")
>  .option("checkpointLocation", "./ewe-spark-checkpoints/")
>  .start()
> query.awaitTermination()
> Thank you. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23869) Spark 2.3.0 left outer join not emitting null values instead waiting for the record in other stream

2018-04-04 Thread bharath kumar avusherla (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bharath kumar avusherla updated SPARK-23869:

Priority: Major  (was: Blocker)

> Spark 2.3.0 left outer join not emitting null values instead waiting for the 
> record in other stream
> ---
>
> Key: SPARK-23869
> URL: https://issues.apache.org/jira/browse/SPARK-23869
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: bharath kumar avusherla
>Priority: Major
>
> Left outer join on two streams not emitting the null outputs. It is just 
> waiting for the record to be added to other stream. Using socketstream to 
> test this. In our case we want to emit the records with null values which 
> doesn't match with id and not fall in time range condition
> Details of the watermarks and intervals are:
> val ds1Map = ds1
>  .selectExpr("Id AS ds1_Id", "ds1_timestamp")
>  .withWatermark("ds1_timestamp","10 seconds")
> val ds2Map = ds2
>  .selectExpr("Id AS ds2_Id", "ds2_timestamp")
>  .withWatermark("ds2_timestamp", "20 seconds")
> val output = ds1Map.join( ds2Map,
>  expr(
>  """ ds1_Id = ds2_Id AND ds2_timestamp >= ds1_timestamp AND  ds2_timestamp <= 
> ds1_timestamp + interval 1 minutes """),
>  "leftOuter")
> val query = output.select("*")
>  .writeStream
> .outputMode(OutputMode.Append)
>  .format("console")
>  .option("checkpointLocation", "./ewe-spark-checkpoints/")
>  .start()
> query.awaitTermination()
> Thank you. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23869) Spark 2.3.0 left outer join not emitting null values instead waiting for the record in other stream

2018-04-04 Thread bharath kumar avusherla (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bharath kumar avusherla updated SPARK-23869:

Description: 
Left outer join on two streams not emitting the null outputs. It is just 
waiting for the record to be added to other stream. Using socketstream to test 
this. In our case we want to emit the records with null values which doesn't 
match with id and not fall in time range condition

Details of the watermarks and intervals are:

val ds1Map = ds1
 .selectExpr("Id AS ds1_Id", "ds1_timestamp")
 .withWatermark("ds1_timestamp","10 seconds")

val ds2Map = ds2
 .selectExpr("Id AS ds2_Id", "ds2_timestamp")
 .withWatermark("ds2_timestamp", "20 seconds")

val output = ds1Map.join( ds2Map,
 expr(
 """ ds1_Id = ds2_Id AND ds2_timestamp >= ds1_timestamp AND  ds2_timestamp <= 
ds1_timestamp + interval 1 minutes """),
 "leftOuter")

val query = output.select("*")
 .writeStream

.outputMode(OutputMode.Append)
 .format("console")
 .option("checkpointLocation", "./ewe-spark-checkpoints/")
 .start()

query.awaitTermination()

Thank you. 

 

  was:
Left outer join on two streams not emitting the null outputs. It is just 
waiting for the record to be added to other stream. Using socketstream to test 
this.

Details of the watermarks and intervals are:

val ds1Map = ds1
 .selectExpr("Id AS ds1_Id", "ds1_timestamp")
 .withWatermark("ds1_timestamp","10 seconds")

val ds2Map = ds2
 .selectExpr("Id AS ds2_Id", "ds2_timestamp")
 .withWatermark("ds2_timestamp", "20 seconds")

val output = ds1Map.join( ds2Map,
 expr(
 """ ds1_Id = ds2_Id AND ds2_timestamp >= ds1_timestamp AND  ds2_timestamp <= 
ds1_timestamp + interval 1 minutes """),
 "leftOuter")

val query = output.select("*")
.writeStream

.outputMode(OutputMode.Append)
.format("console")
.option("checkpointLocation", "./ewe-spark-checkpoints/")
.start()

 

query.awaitTermination()

 

 


> Spark 2.3.0 left outer join not emitting null values instead waiting for the 
> record in other stream
> ---
>
> Key: SPARK-23869
> URL: https://issues.apache.org/jira/browse/SPARK-23869
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: bharath kumar avusherla
>Priority: Blocker
>
> Left outer join on two streams not emitting the null outputs. It is just 
> waiting for the record to be added to other stream. Using socketstream to 
> test this. In our case we want to emit the records with null values which 
> doesn't match with id and not fall in time range condition
> Details of the watermarks and intervals are:
> val ds1Map = ds1
>  .selectExpr("Id AS ds1_Id", "ds1_timestamp")
>  .withWatermark("ds1_timestamp","10 seconds")
> val ds2Map = ds2
>  .selectExpr("Id AS ds2_Id", "ds2_timestamp")
>  .withWatermark("ds2_timestamp", "20 seconds")
> val output = ds1Map.join( ds2Map,
>  expr(
>  """ ds1_Id = ds2_Id AND ds2_timestamp >= ds1_timestamp AND  ds2_timestamp <= 
> ds1_timestamp + interval 1 minutes """),
>  "leftOuter")
> val query = output.select("*")
>  .writeStream
> .outputMode(OutputMode.Append)
>  .format("console")
>  .option("checkpointLocation", "./ewe-spark-checkpoints/")
>  .start()
> query.awaitTermination()
> Thank you. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-23869) Spark 2.3.0 left outer join not emitting null values instead waiting for the record in other stream

2018-04-04 Thread bharath kumar avusherla (JIRA)
bharath kumar avusherla created SPARK-23869:
---

 Summary: Spark 2.3.0 left outer join not emitting null values 
instead waiting for the record in other stream
 Key: SPARK-23869
 URL: https://issues.apache.org/jira/browse/SPARK-23869
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.3.0
Reporter: bharath kumar avusherla


A left outer join on two streams is not emitting the null outputs; it just 
waits for the matching record to be added to the other stream. I am using 
socket streams to test this.

Details of the watermarks and intervals are:

val ds1Map = ds1
 .selectExpr("Id AS ds1_Id", "ds1_timestamp")
 .withWatermark("ds1_timestamp","10 seconds")

val ds2Map = ds2
 .selectExpr("Id AS ds2_Id", "ds2_timestamp")
 .withWatermark("ds2_timestamp", "20 seconds")

val output = ds1Map.join( ds2Map,
 expr(
 """ ds1_Id = ds2_Id AND ds2_timestamp >= ds1_timestamp AND  ds2_timestamp <= 
ds1_timestamp + interval 1 minutes """),
 "leftOuter")

val query = output.select("*")
.writeStream

.outputMode(OutputMode.Append)
.format("console")
.option("checkpointLocation", "./ewe-spark-checkpoints/")
.start()

 

query.awaitTermination()
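
For completeness, here is a self-contained sketch of the same setup (socket 
sources, watermarks, left outer join with a time-range condition). The ticket 
does not show how ds1/ds2 are built, so the socket host/port values and the 
"<id>,<timestamp>" line format parsed below are assumptions for illustration 
only.

{code}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.streaming.OutputMode

object LeftOuterJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("left-outer-join-sketch").getOrCreate()

    // Each socket line is assumed to look like "id1,2018-04-04 12:00:00".
    def socketStream(port: Int, idCol: String, tsCol: String) =
      spark.readStream.format("socket")
        .option("host", "localhost").option("port", port.toString).load()
        .selectExpr(s"split(value, ',')[0] AS $idCol",
                    s"cast(split(value, ',')[1] AS timestamp) AS $tsCol")

    val ds1Map = socketStream(9998, "ds1_Id", "ds1_timestamp")
      .withWatermark("ds1_timestamp", "10 seconds")

    val ds2Map = socketStream(9999, "ds2_Id", "ds2_timestamp")
      .withWatermark("ds2_timestamp", "20 seconds")

    val output = ds1Map.join(ds2Map,
      expr("""ds1_Id = ds2_Id AND
              ds2_timestamp >= ds1_timestamp AND
              ds2_timestamp <= ds1_timestamp + interval 1 minutes"""),
      "leftOuter")

    output.select("*").writeStream
      .outputMode(OutputMode.Append)
      .format("console")
      .option("checkpointLocation", "./ewe-spark-checkpoints/")
      .start()
      .awaitTermination()
  }
}
{code}

Note that with these watermarks the unmatched (null-padded) rows are only 
expected to appear after the watermark has passed the end of the join window, 
which may be part of what is being observed here.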

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23814) Couldn't read file with colon in name and new line character in one of the fields.

2018-03-28 Thread bharath kumar avusherla (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bharath kumar avusherla updated SPARK-23814:

Description: 
When the file name has colon and new line character in data, while reading 
using spark.read.option("multiLine","true").csv("s3n://Directory/") function. 
It is throwing *"**java.lang.IllegalArgumentException: 
java.net.URISyntaxException: Relative path in absolute URI: 
2017-08-01T00:00:00Z.csv.gz"* error. If we remove the 
option("multiLine","true"), it is working just fine though the file name has 
colon in it. It is working fine, If i apply this option 
*option("multiLine","true")* on any other file which doesn't have colon in it. 
But when both are present (colon in file name and new line in the data), it's 
not working.
{quote}java.lang.IllegalArgumentException: java.net.URISyntaxException: 
Relative path in absolute URI: 2017-08-01T00:00:00Z.csv.gz

  at org.apache.hadoop.fs.Path.initialize(Path.java:205)

  at org.apache.hadoop.fs.Path.(Path.java:171)

  at org.apache.hadoop.fs.Path.(Path.java:93)

  at org.apache.hadoop.fs.Globber.glob(Globber.java:253)

  at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1676)

  at 
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:294)

  at 
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:265)

  at 
org.apache.spark.input.StreamFileInputFormat.setMinPartitions(PortableDataStream.scala:51)

  at org.apache.spark.rdd.BinaryFileRDD.getPartitions(BinaryFileRDD.scala:46)

  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)

  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)

  at scala.Option.getOrElse(Option.scala:121)

  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)

  at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)

  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)

  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)

  at scala.Option.getOrElse(Option.scala:121)

  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)

  at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)

  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)

  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)

  at scala.Option.getOrElse(Option.scala:121)

  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)

  at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1333)

  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)

  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)

  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)

  at org.apache.spark.rdd.RDD.take(RDD.scala:1327)

  at 
org.apache.spark.sql.execution.datasources.csv.MultiLineCSVDataSource$.infer(CSVDataSource.scala:224)

  at 
org.apache.spark.sql.execution.datasources.csv.CSVDataSource.inferSchema(CSVDataSource.scala:62)

  at 
org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.inferSchema(CSVFileFormat.scala:57)

  at 
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:177)

  at 
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:177)

  at scala.Option.orElse(Option.scala:289)

  at 
org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:176)

  at 
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:366)

  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)

  at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:533)

  at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:412)

  ... 48 elided

Caused by: java.net.URISyntaxException: Relative path in absolute URI: 
2017-08-01T00:00:00Z.csv.gz

  at java.net.URI.checkPath(URI.java:1823)

  at java.net.URI.(URI.java:745)

  at org.apache.hadoop.fs.Path.initialize(Path.java:202)

  ... 86 more
{quote}

  was:
When the file name has colon and new line character in data, while reading 
using spark.read.option("multiLine","true").csv("sn://Directory/") function. It 
is throwing *"**java.lang.IllegalArgumentException: 
java.net.URISyntaxException: Relative path in absolute URI: 
2017-08-01T00:00:00Z.csv.gz"* error. If we remove the 
option("multiLine","true"), it is working just fine though the file name has 
colon in it. It is working fine, If i apply this option 
*option("multiLine","true")* on any other file which doesn't have colon in it. 
But when both are present (colon in file name and new line in the data), it's 
not working.
{quote}java.lang.IllegalArgumentException: java.net.URISyntaxException: 
Relative path in absolute URI: 2017-08-01T00:00:00Z.csv.gz

  at 

[jira] [Updated] (SPARK-23814) Couldn't read file with colon in name and new line character in one of the fields.

2018-03-28 Thread bharath kumar avusherla (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bharath kumar avusherla updated SPARK-23814:

Description: 
When the file name has colon and new line character in data, while reading 
using spark.read.option("multiLine","true").csv("s3n://DirectoryPath/") 
function. It is throwing *"**java.lang.IllegalArgumentException: 
java.net.URISyntaxException: Relative path in absolute URI: 
2017-08-01T00:00:00Z.csv.gz"* error. If we remove the 
option("multiLine","true"), it is working just fine though the file name has 
colon in it. It is working fine, If i apply this option 
*option("multiLine","true")* on any other file which doesn't have colon in it. 
But when both are present (colon in file name and new line in the data), it's 
not working.
{quote}java.lang.IllegalArgumentException: java.net.URISyntaxException: 
Relative path in absolute URI: 2017-08-01T00:00:00Z.csv.gz

  at org.apache.hadoop.fs.Path.initialize(Path.java:205)

  at org.apache.hadoop.fs.Path.(Path.java:171)

  at org.apache.hadoop.fs.Path.(Path.java:93)

  at org.apache.hadoop.fs.Globber.glob(Globber.java:253)

  at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1676)

  at 
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:294)

  at 
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:265)

  at 
org.apache.spark.input.StreamFileInputFormat.setMinPartitions(PortableDataStream.scala:51)

  at org.apache.spark.rdd.BinaryFileRDD.getPartitions(BinaryFileRDD.scala:46)

  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)

  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)

  at scala.Option.getOrElse(Option.scala:121)

  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)

  at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)

  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)

  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)

  at scala.Option.getOrElse(Option.scala:121)

  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)

  at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)

  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)

  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)

  at scala.Option.getOrElse(Option.scala:121)

  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)

  at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1333)

  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)

  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)

  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)

  at org.apache.spark.rdd.RDD.take(RDD.scala:1327)

  at 
org.apache.spark.sql.execution.datasources.csv.MultiLineCSVDataSource$.infer(CSVDataSource.scala:224)

  at 
org.apache.spark.sql.execution.datasources.csv.CSVDataSource.inferSchema(CSVDataSource.scala:62)

  at 
org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.inferSchema(CSVFileFormat.scala:57)

  at 
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:177)

  at 
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:177)

  at scala.Option.orElse(Option.scala:289)

  at 
org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:176)

  at 
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:366)

  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)

  at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:533)

  at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:412)

  ... 48 elided

Caused by: java.net.URISyntaxException: Relative path in absolute URI: 
2017-08-01T00:00:00Z.csv.gz

  at java.net.URI.checkPath(URI.java:1823)

  at java.net.URI.(URI.java:745)

  at org.apache.hadoop.fs.Path.initialize(Path.java:202)

  ... 86 more
{quote}

  was:
When the file name has colon and new line character in data, while reading 
using spark.read.option("multiLine","true").csv("s3n://Directory/") function. 
It is throwing *"**java.lang.IllegalArgumentException: 
java.net.URISyntaxException: Relative path in absolute URI: 
2017-08-01T00:00:00Z.csv.gz"* error. If we remove the 
option("multiLine","true"), it is working just fine though the file name has 
colon in it. It is working fine, If i apply this option 
*option("multiLine","true")* on any other file which doesn't have colon in it. 
But when both are present (colon in file name and new line in the data), it's 
not working.
{quote}java.lang.IllegalArgumentException: java.net.URISyntaxException: 
Relative path in absolute URI: 2017-08-01T00:00:00Z.csv.gz

 

[jira] [Updated] (SPARK-23814) Couldn't read file with colon in name and new line character in one of the fields.

2018-03-28 Thread bharath kumar avusherla (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bharath kumar avusherla updated SPARK-23814:

Description: 
When the file name has colon and new line character in data, while reading 
using spark.read.option("multiLine","true").csv("sn://Directory/") function. It 
is throwing *"**java.lang.IllegalArgumentException: 
java.net.URISyntaxException: Relative path in absolute URI: 
2017-08-01T00:00:00Z.csv.gz"* error. If we remove the 
option("multiLine","true"), it is working just fine though the file name has 
colon in it. It is working fine, If i apply this option 
*option("multiLine","true")* on any other file which doesn't have colon in it. 
But when both are present (colon in file name and new line in the data), it's 
not working.
{quote}java.lang.IllegalArgumentException: java.net.URISyntaxException: 
Relative path in absolute URI: 2017-08-01T00:00:00Z.csv.gz

  at org.apache.hadoop.fs.Path.initialize(Path.java:205)

  at org.apache.hadoop.fs.Path.(Path.java:171)

  at org.apache.hadoop.fs.Path.(Path.java:93)

  at org.apache.hadoop.fs.Globber.glob(Globber.java:253)

  at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1676)

  at 
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:294)

  at 
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:265)

  at 
org.apache.spark.input.StreamFileInputFormat.setMinPartitions(PortableDataStream.scala:51)

  at org.apache.spark.rdd.BinaryFileRDD.getPartitions(BinaryFileRDD.scala:46)

  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)

  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)

  at scala.Option.getOrElse(Option.scala:121)

  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)

  at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)

  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)

  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)

  at scala.Option.getOrElse(Option.scala:121)

  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)

  at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)

  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)

  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)

  at scala.Option.getOrElse(Option.scala:121)

  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)

  at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1333)

  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)

  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)

  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)

  at org.apache.spark.rdd.RDD.take(RDD.scala:1327)

  at 
org.apache.spark.sql.execution.datasources.csv.MultiLineCSVDataSource$.infer(CSVDataSource.scala:224)

  at 
org.apache.spark.sql.execution.datasources.csv.CSVDataSource.inferSchema(CSVDataSource.scala:62)

  at 
org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.inferSchema(CSVFileFormat.scala:57)

  at 
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:177)

  at 
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:177)

  at scala.Option.orElse(Option.scala:289)

  at 
org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:176)

  at 
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:366)

  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)

  at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:533)

  at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:412)

  ... 48 elided

Caused by: java.net.URISyntaxException: Relative path in absolute URI: 
2017-08-01T00:00:00Z.csv.gz

  at java.net.URI.checkPath(URI.java:1823)

  at java.net.URI.(URI.java:745)

  at org.apache.hadoop.fs.Path.initialize(Path.java:202)

  ... 86 more
{quote}

  was:
When the file name has colon and new line character in data, while reading 
using spark.read.option("multiLine","true").csv("sn://Directory/") function. It 
is throwing *"**java.lang.IllegalArgumentException: 
java.net.URISyntaxException: Relative path in absolute URI: 
2017-08-01T00:00:00Z.csv.gz"* error. If we remove the 
option("multiLine","true"), it is working just fine though the file name has 
colon in it. It is working fine, If i apply this option 
*option("multiLine","true")* on any other file which doesn't contain colon in 
it. But when both are present (colon in file name and new line in the data), it 
snot working.
{quote}java.lang.IllegalArgumentException: java.net.URISyntaxException: 
Relative path in absolute URI: 2017-08-01T00:00:00Z.csv.gz

  at 

[jira] [Created] (SPARK-23814) Couldn't read file with colon in name and new line character in one of the fields.

2018-03-28 Thread bharath kumar avusherla (JIRA)
bharath kumar avusherla created SPARK-23814:
---

 Summary: Couldn't read file with colon in name and new line 
character in one of the fields.
 Key: SPARK-23814
 URL: https://issues.apache.org/jira/browse/SPARK-23814
 Project: Spark
  Issue Type: Bug
  Components: Spark Core, Spark Shell
Affects Versions: 2.2.0
Reporter: bharath kumar avusherla


When the file name contains a colon and the data contains a new line character, 
reading with spark.read.option("multiLine","true").csv("sn://Directory/") 
throws a "java.lang.IllegalArgumentException: java.net.URISyntaxException: 
Relative path in absolute URI: 2017-08-01T00:00:00Z.csv.gz" error. If we remove 
option("multiLine","true"), it works just fine even though the file name has a 
colon in it. It also works fine if I apply option("multiLine","true") to any 
other file whose name doesn't contain a colon. But when both are present (a 
colon in the file name and a new line in the data), it's not working.
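
A minimal sketch of the reported reproduction is shown below. The local paths 
and file name are placeholders (the ticket reads from S3), and the comment on 
the failing call is based only on the stack trace quoted in this issue 
(Globber/BinaryFileRDD building a Path from the bare file name).

{code}
import org.apache.spark.sql.SparkSession

object ColonFileNameRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("colon-filename-repro").getOrCreate()

    // Directory containing a gzipped CSV whose *name* has a colon, e.g.
    // /tmp/data/2017-08-01T00:00:00Z.csv.gz, and whose rows contain embedded newlines.
    val dir = "/tmp/data/"

    // Reported to work: default (multiLine=false) read, even with the colon in the name.
    spark.read.csv(dir).show()

    // Reported to fail: multiLine=true takes the whole-file code path, which builds a
    // relative Path from the file name and java.net.URI rejects the colon.
    spark.read.option("multiLine", "true").csv(dir).show()
  }
}
{code}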
{quote}java.lang.IllegalArgumentException: java.net.URISyntaxException: 
Relative path in absolute URI: 2017-08-01T00:00:00Z.csv.gz

  at org.apache.hadoop.fs.Path.initialize(Path.java:205)

  at org.apache.hadoop.fs.Path.(Path.java:171)

  at org.apache.hadoop.fs.Path.(Path.java:93)

  at org.apache.hadoop.fs.Globber.glob(Globber.java:253)

  at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1676)

  at 
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:294)

  at 
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:265)

  at 
org.apache.spark.input.StreamFileInputFormat.setMinPartitions(PortableDataStream.scala:51)

  at org.apache.spark.rdd.BinaryFileRDD.getPartitions(BinaryFileRDD.scala:46)

  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)

  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)

  at scala.Option.getOrElse(Option.scala:121)

  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)

  at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)

  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)

  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)

  at scala.Option.getOrElse(Option.scala:121)

  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)

  at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)

  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)

  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)

  at scala.Option.getOrElse(Option.scala:121)

  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)

  at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1333)

  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)

  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)

  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)

  at org.apache.spark.rdd.RDD.take(RDD.scala:1327)

  at 
org.apache.spark.sql.execution.datasources.csv.MultiLineCSVDataSource$.infer(CSVDataSource.scala:224)

  at 
org.apache.spark.sql.execution.datasources.csv.CSVDataSource.inferSchema(CSVDataSource.scala:62)

  at 
org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.inferSchema(CSVFileFormat.scala:57)

  at 
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:177)

  at 
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:177)

  at scala.Option.orElse(Option.scala:289)

  at 
org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:176)

  at 
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:366)

  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)

  at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:533)

  at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:412)

  ... 48 elided

Caused by: java.net.URISyntaxException: Relative path in absolute URI: 
2017-08-01T00:00:00Z.csv.gz

  at java.net.URI.checkPath(URI.java:1823)

  at java.net.URI.(URI.java:745)

  at org.apache.hadoop.fs.Path.initialize(Path.java:202)

  ... 86 more
{quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org