Re: Remove subsets from FP Growth output

2020-12-02 Thread Sean Owen
-dev

Increase the threshold? Just filter the rules as desired after they are
generated?
It's not clear what your criteria are.
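For example, something along these lines is a rough PySpark sketch of post-filtering. It assumes the rules come from pyspark.ml.fpm's FPGrowthModel.associationRules (columns antecedent, consequent, confidence, ...), that 'model' is a fitted FPGrowthModel, and that the rule set is small enough to collect to the driver. It keeps a rule only if no other rule with the same consequent has a strictly smaller antecedent:

rules = model.associationRules.collect()   # 'model' is a fitted FPGrowthModel (assumption)

def is_redundant(rule, all_rules):
    # True if some other rule with the same consequent has a strictly smaller antecedent
    a, c = set(rule["antecedent"]), set(rule["consequent"])
    return any(
        set(other["consequent"]) == c and set(other["antecedent"]) < a
        for other in all_rules
    )

kept = [r for r in rules if not is_redundant(r, rules)]

With your example, [Attribute1 = A, Attribute2 = B] -> [Output=C] would be dropped and [Attribute1 = A] -> [Output=C] kept.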

On Wed, Dec 2, 2020 at 7:30 AM Aditya Addepalli  wrote:

> Hi,
>
> Is there a good way to remove all the subsets of patterns from the output
> given by FP Growth?
>
> For example if both the patterns pass the confidence and support
> thresholds:
>
> [Attribute1 = A, Attribute2 = B] -> [Output=C]
> [Attribute1 = A] -> [Output=C]
>
> I want to choose only [Attribute1 = A] -> [Output=C]
>
> Any ideas that come to mind would be appreciated.
>


Re: Structured Streaming Checkpoint Error

2020-12-02 Thread German Schiavon
Hello!

@Gabor Somogyi  I wonder whether, now that S3 is *strongly
consistent*, this would work fine.


Regards!
https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/

On Thu, 17 Sep 2020 at 11:55, German Schiavon 
wrote:

> Hi Gabor,
>
> Makes sense, thanks a lot!
>
> On Thu, 17 Sep 2020 at 11:51, Gabor Somogyi 
> wrote:
>
>> Hi,
>>
>> Structured Streaming is simply not working when checkpoint location is on
>> S3 due to its read-after-write consistency.
>> Please choose an HDFS compliant filesystem and it will work like a charm.
>>
>> BR,
>> G
>>
>>
>> On Wed, Sep 16, 2020 at 4:12 PM German Schiavon 
>> wrote:
>>
>>> Hi!
>>>
>>> I have a Structured Streaming application that reads from Kafka,
>>> performs some aggregations and writes to S3 in parquet format.
>>>
>>> Everything seems to work great, except that from time to time I get a
>>> checkpoint error. At the beginning I thought it was a random error, but it
>>> has happened more than 3 times already in a few days.
>>>
>>> Caused by: java.io.FileNotFoundException: No such file or directory:
>>> s3a://xxx/xxx/validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp
>>>
>>>
>>> Does this happen to anyone else?
>>>
>>> Thanks in advance.
>>>
>>> *This is the full error :*
>>>
>>> ERROR streaming.MicroBatchExecution: Query segmentValidation [id =
>>> 14edaddf-25bb-4259-b7a2-6107907f962f, runId =
>>> 0a757476-94ec-4a53-960a-91f54ce47110] terminated with error
>>>
>>> java.io.FileNotFoundException: No such file or directory:
>>> s3a://xxx/xxx//validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp
>>>
>>> at
>>> org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2310)
>>>
>>> at
>>> org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2204)
>>>
>>> at
>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2143)
>>>
>>> at
>>> org.apache.hadoop.fs.FileSystem.getFileLinkStatus(FileSystem.java:2664)
>>>
>>> at
>>> org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:131)
>>>
>>> at
>>> org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:726)
>>>
>>> at
>>> org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699)
>>>
>>> at org.apache.hadoop.fs.FileContext.rename(FileContext.java:1032)
>>>
>>> at
>>> org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:329)
>>>
>>> at
>>> org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147)
>>>
>>> at
>>> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.writeBatchToFile(HDFSMetadataLog.scala:134)
>>>
>>> at
>>> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$add$3(HDFSMetadataLog.scala:120)
>>>
>>> at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
>>> at scala.Option.getOrElse(Option.scala:189)
>>>
>>


Regexp_extract not giving correct output

2020-12-02 Thread Sachit Murarka
Hi All,

I am using PySpark to get the value from a column on the basis of a regex.

The following is the regex I am using:
(^\[OrderID:\s)?(?(1).*\]\s\[UniqueID:\s([a-z0-9A-Z]*)\].*|\[.*\]\s\[([a-z0-9A-Z]*)\].*)

df = spark.createDataFrame([("[1234] [] [] [66]",),
("abcd",)],["stringValue"])

result = df.withColumn('extracted value',
F.regexp_extract(F.col('stringValue'),
'(^\[OrderID:\s)?(?(1).*\]\s\[UniqueID:\s([a-z0-9A-Z]*)\].*|\[.*\]\s\[([a-z0-9A-Z]*)\].*)',
1))

I have tried with spark.sql as well; it gives an empty output.

I have tested this regex and it works fine on an online regex tester, but it
is not working in Spark. I know Spark needs a Java-based regex, so I also
tried escaping, which gave this exception:
: java.util.regex.PatternSyntaxException: Unknown inline modifier near
index 21
(^\[OrderID:\s)?(?(1).*\]\s\[UniqueID:\s([a-z0-9A-Z]*)\].*|\[.*\]\s\[([a-z0-9A-Z]*)\].*)


Can you please help here?

Kind Regards,
Sachit Murarka


Re: Regexp_extract not giving correct output

2020-12-02 Thread Sean Owen
As in Java/Scala, in Python you'll need to escape the backslashes with \\.
"\[" means just "[" in a string. I think you could also prefix the string
literal with 'r' to disable Python's handling of escapes.
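For illustration only, here is a minimal sketch with a much simpler pattern than yours, just to show the two escaping options (the pattern itself is a placeholder):

from pyspark.sql import functions as F

df = spark.createDataFrame([("[1234] [] [] [66]",), ("abcd",)], ["stringValue"])

# Option 1: double every backslash in a normal string literal
df.select(F.regexp_extract("stringValue", "\\[(\\d+)\\]", 1)).show()

# Option 2: use a raw string so Python does not touch the backslashes
df.select(F.regexp_extract("stringValue", r"\[(\d+)\]", 1)).show()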

On Wed, Dec 2, 2020 at 9:34 AM Sachit Murarka 
wrote:

> Hi All,
>
> I am using Pyspark to get the value from a column on basis of regex.
>
> Following is the regex which I am using:
>
> (^\[OrderID:\s)?(?(1).*\]\s\[UniqueID:\s([a-z0-9A-Z]*)\].*|\[.*\]\s\[([a-z0-9A-Z]*)\].*)
>
> df = spark.createDataFrame([("[1234] [] [] [66]",),
> ("abcd",)],["stringValue"])
>
> result = df.withColumn('extracted value',
> F.regexp_extract(F.col('stringValue'),
> '(^\[OrderID:\s)?(?(1).*\]\s\[UniqueID:\s([a-z0-9A-Z]*)\].*|\[.*\]\s\[([a-z0-9A-Z]*)\].*)',
> 1))
>
> I have tried with spark.sql as well. It is giving empty output.
>
> I have tested this regex , it is working fine on an online regextester .
> But it is not working in spark . I know spark needs Java based regex ,
> hence I tried escaping also , that gave exception:
> : java.util.regex.PatternSyntaxException: Unknown inline modifier near
> index 21
>
> (^\[OrderID:\s)?(?(1).*\]\s\[UniqueID:\s([a-z0-9A-Z]*)\].*|\[.*\]\s\[([a-z0-9A-Z]*)\].*)
>
>
> Can you please help here?
>
> Kind Regards,
> Sachit Murarka
>


Re: Regexp_extract not giving correct output

2020-12-02 Thread Sachit Murarka
Hi Sean,

Thanks for the quick response!

I have tried prefixing the string literal with 'r'; that also gave an empty
result:
spark.sql(r"select regexp_extract('[11] [22]
[33]','(^\[OrderID:\s)?(?(1).*\]\s\[UniqueID:\s([a-z0-9A-Z]*)\].*|\[.*\]\s\[([a-z0-9A-Z]*)\].*)',1)
as anyid").show()

And, as I mentioned, when I use two backslashes it gives an exception
as follows:
  : java.util.regex.PatternSyntaxException: Unknown inline modifier near
index 21
(^\[OrderID:\s)?(?(1).*\]\s\[UniqueID:\s([a-z0-9A-Z]*)\].*|\[.*\]\s\[([a-z0-9A-Z]*)\].*)

Kind Regards,
Sachit Murarka


On Wed, Dec 2, 2020 at 9:07 PM Sean Owen  wrote:

> As in Java/Scala, in Python you'll need to escape the backslashes with \\.
> "\[" means just "[" in a string. I think you could also prefix the string
> literal with 'r' to disable Python's handling of escapes.
>
> On Wed, Dec 2, 2020 at 9:34 AM Sachit Murarka 
> wrote:
>
>> Hi All,
>>
>> I am using Pyspark to get the value from a column on basis of regex.
>>
>> Following is the regex which I am using:
>>
>> (^\[OrderID:\s)?(?(1).*\]\s\[UniqueID:\s([a-z0-9A-Z]*)\].*|\[.*\]\s\[([a-z0-9A-Z]*)\].*)
>>
>> df = spark.createDataFrame([("[1234] [] [] [66]",),
>> ("abcd",)],["stringValue"])
>>
>> result = df.withColumn('extracted value',
>> F.regexp_extract(F.col('stringValue'),
>> '(^\[OrderID:\s)?(?(1).*\]\s\[UniqueID:\s([a-z0-9A-Z]*)\].*|\[.*\]\s\[([a-z0-9A-Z]*)\].*)',
>> 1))
>>
>> I have tried with spark.sql as well. It is giving empty output.
>>
>> I have tested this regex , it is working fine on an online regextester .
>> But it is not working in spark . I know spark needs Java based regex ,
>> hence I tried escaping also , that gave exception:
>> : java.util.regex.PatternSyntaxException: Unknown inline modifier near
>> index 21
>>
>> (^\[OrderID:\s)?(?(1).*\]\s\[UniqueID:\s([a-z0-9A-Z]*)\].*|\[.*\]\s\[([a-z0-9A-Z]*)\].*)
>>
>>
>> Can you please help here?
>>
>> Kind Regards,
>> Sachit Murarka
>>
>


Re: Regexp_extract not giving correct output

2020-12-02 Thread Sean Owen
This means there is something wrong with your regex vs what Java supports.
Do you mean "(?:" rather than "(?" around where the error is? This is not
related to Spark.
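To illustrate the "(?:" suggestion, here is a sketch with a simplified placeholder pattern (not your original one): Java's regex engine accepts non-capturing groups (?:...), but it does not appear to support the conditional construct (?(1)...) that sits at roughly the position the error points to.

from pyspark.sql import functions as F

df = spark.createDataFrame([("[1234] [] [] [66]",), ("abcd",)], ["stringValue"])

# (?: ... ) is a non-capturing group and is valid Java regex; this simplified
# pattern just grabs the last bracketed token as an illustration.
df.select(
    F.regexp_extract("stringValue", r"(?:\[.*\]\s)*\[([a-zA-Z0-9]*)\]$", 1).alias("anyid")
).show()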

On Wed, Dec 2, 2020 at 9:45 AM Sachit Murarka 
wrote:

> Hi Sean,
>
> Thanks for quick response!
>
> I have tried with string literal 'r' as a prefix that also gave an empty
> result..
> spark.sql(r"select regexp_extract('[11] [22]
> [33]','(^\[OrderID:\s)?(?(1).*\]\s\[UniqueID:\s([a-z0-9A-Z]*)\].*|\[.*\]\s\[([a-z0-9A-Z]*)\].*)',1)
> as anyid").show()
>
> and as I mentioned when I am using 2 backslashes it is giving an exception
> as follows:
>   : java.util.regex.PatternSyntaxException: Unknown inline modifier near
> index 21
>
> (^\[OrderID:\s)?(?(1).*\]\s\[UniqueID:\s([a-z0-9A-Z]*)\].*|\[.*\]\s\[([a-z0-9A-Z]*)\].*)
>
> Kind Regards,
> Sachit Murarka
>
>
> On Wed, Dec 2, 2020 at 9:07 PM Sean Owen  wrote:
>
>> As in Java/Scala, in Python you'll need to escape the backslashes with
>> \\. "\[" means just "[" in a string. I think you could also prefix the
>> string literal with 'r' to disable Python's handling of escapes.
>>
>> On Wed, Dec 2, 2020 at 9:34 AM Sachit Murarka 
>> wrote:
>>
>>> Hi All,
>>>
>>> I am using Pyspark to get the value from a column on basis of regex.
>>>
>>> Following is the regex which I am using:
>>>
>>> (^\[OrderID:\s)?(?(1).*\]\s\[UniqueID:\s([a-z0-9A-Z]*)\].*|\[.*\]\s\[([a-z0-9A-Z]*)\].*)
>>>
>>> df = spark.createDataFrame([("[1234] [] [] [66]",),
>>> ("abcd",)],["stringValue"])
>>>
>>> result = df.withColumn('extracted value',
>>> F.regexp_extract(F.col('stringValue'),
>>> '(^\[OrderID:\s)?(?(1).*\]\s\[UniqueID:\s([a-z0-9A-Z]*)\].*|\[.*\]\s\[([a-z0-9A-Z]*)\].*)',
>>> 1))
>>>
>>> I have tried with spark.sql as well. It is giving empty output.
>>>
>>> I have tested this regex , it is working fine on an online regextester .
>>> But it is not working in spark . I know spark needs Java based regex ,
>>> hence I tried escaping also , that gave exception:
>>> : java.util.regex.PatternSyntaxException: Unknown inline modifier near
>>> index 21
>>>
>>> (^\[OrderID:\s)?(?(1).*\]\s\[UniqueID:\s([a-z0-9A-Z]*)\].*|\[.*\]\s\[([a-z0-9A-Z]*)\].*)
>>>
>>>
>>> Can you please help here?
>>>
>>> Kind Regards,
>>> Sachit Murarka
>>>
>>


Spark UI Storage Memory

2020-12-02 Thread Amit Sharma
Hi, I have a Spark streaming job. When I check the Executors tab,
there is a Storage Memory column that displays used memory / total memory.
What is "used memory"? Is it memory currently in use, or memory used so far? How
would I know how much memory is unused at a given point in time?


Thanks
Amit


Spark ML / ALS question

2020-12-02 Thread Steve Pruitt
I am having a little difficulty finding information on the ALS train(…) method
in spark.ml.  It's unclear when to use it.  In the javadoc, the parameters are
undocumented.

What is the difference between train(..) and fit(..)?  When would you use one or
the other?


-S



Re: Spark ML / ALS question

2020-12-02 Thread Sean Owen
There is only a fit() method in spark.ml's ALS
http://spark.apache.org/docs/latest/api/scala/org/apache/spark/ml/recommendation/ALS.html

The older spark.mllib interface has a train() method. You'd generally use
the spark.ml version.
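A quick sketch of the spark.ml usage (the column names and the 'ratings' DataFrame are placeholders, not from the original question):

from pyspark.ml.recommendation import ALS

als = ALS(userCol="userId", itemCol="itemId", ratingCol="rating",
          rank=10, maxIter=10, regParam=0.1)
model = als.fit(ratings)          # estimator.fit(DataFrame) -> ALSModel
preds = model.transform(ratings)  # predictions as a DataFrame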

On Wed, Dec 2, 2020 at 2:13 PM Steve Pruitt 
wrote:

> I am having a little difficulty finding information on the ALS train(…)
> method in spark.ml.  It's unclear when to use it.  In the javadoc, the
> parameters are undocumented.
>
> What is the difference between train(..) and fit(..)?  When would you use
> one or the other?
>
>
> -S
>
>


In windows 10, accessing Hive from PySpark with PyCharm throws error

2020-12-02 Thread Mich Talebzadeh
Hi,

I have some simple code that tries to create a Hive Derby database as follows:

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import HiveContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StringType, ArrayType
from pyspark.sql.functions import udf, col, max as max, to_date, date_add, \
add_months
from datetime import datetime, timedelta
import os
from os.path import join, abspath
from typing import Optional
import logging
import random
import string
import math

warehouseLocation = 'c:\\Users\\admin\\PycharmProjects\\pythonProject\\spark-warehouse'
local_scrtatchdir = 'c:\\Users\\admin\\PycharmProjects\\pythonProject\\hive-localscratchdir'
scrtatchdir = 'c:\\Users\\admin\\PycharmProjects\\pythonProject\\hive-scratchdir'
tmp_dir = 'd:\\temp\\hive'
metastore_db = 'jdbc:derby:C:\\Users\\admin\\PycharmProjects\\pythonProject\\metastore_db;create=true'
ConnectionDriverName = 'org.apache.derby.EmbeddedDriver'
spark = SparkSession \
    .builder \
    .appName("App1") \
    .config("hive.exec.local.scratchdir", local_scrtatchdir) \
    .config("hive.exec.scratchdir", scrtatchdir) \
    .config("spark.sql.warehouse.dir", warehouseLocation) \
    .config("hadoop.tmp.dir", tmp_dir) \
    .config("javax.jdo.option.ConnectionURL", metastore_db) \
    .config("javax.jdo.option.ConnectionDriverName", ConnectionDriverName) \
    .enableHiveSupport() \
    .getOrCreate()
print(os.listdir(warehouseLocation))
print(os.listdir(local_scrtatchdir))
print(os.listdir(scrtatchdir))
print(os.listdir(tmp_dir))
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
HiveContext = HiveContext(sc)
spark.sql("CREATE DATABASE IF NOT EXISTS test")


Now this comes back with the following:


C:\Users\admin\PycharmProjects\pythonProject\venv\Scripts\python.exe
C:/Users/admin/PycharmProjects/pythonProject/main.py

Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties

Setting default log level to "WARN".

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
setLogLevel(newLevel).

[]

[]

[]

['hive-localscratchdir', 'hive-scratchdir', 'hive-warehouse']

Traceback (most recent call last):

  File "C:/Users/admin/PycharmProjects/pythonProject/main.py", line 76, in


spark.sql("CREATE DATABASE IF NOT EXISTS test")

  File "D:\temp\spark\python\pyspark\sql\session.py", line 649, in sql

return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)

  File "D:\temp\spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py",
line 1305, in __call__

  File "D:\temp\spark\python\pyspark\sql\utils.py", line 134, in deco

raise_from(converted)

  File "", line 3, in raise_from

*pyspark.sql.utils.AnalysisException: java.lang.UnsatisfiedLinkError:
org.apache.hadoop.io.nativeio.NativeIO$Windows.createDirectoryWithMode0(Ljava/lang/String;I)V;*


Process finished with exit code 1

Also, under %SPARK_HOME%/conf I have a hive-site.xml file. It is not
obvious to me why it is throwing this error.

Thanks


LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw





*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Re: In windows 10, accessing Hive from PySpark with PyCharm throws error

2020-12-02 Thread Artemis User
Apparently this is an OS dynamic library link error.  Make sure you have
LD_LIBRARY_PATH (on Linux) or PATH (on Windows) set up properly to point at the
right .so or .dll file...
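For this particular UnsatisfiedLinkError on NativeIO$Windows, the usual cause is that the Hadoop native library for Windows (hadoop.dll, alongside winutils.exe) is missing from the PATH or does not match the Hadoop version. A minimal sketch of what that setup might look like before building the SparkSession (the HADOOP_HOME path is an assumption about the local layout, not something from the original post):

import os

# assumed local layout: C:\hadoop\bin contains winutils.exe and hadoop.dll
os.environ["HADOOP_HOME"] = "C:\\hadoop"
os.environ["PATH"] = os.environ["HADOOP_HOME"] + "\\bin;" + os.environ["PATH"]

# ...then create the SparkSession exactly as in the original code.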


On 12/2/20 5:31 PM, Mich Talebzadeh wrote:

Hi,

I have a simple code that tries to create Hive derby database as follows:

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import HiveContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StringType, ArrayType
from pyspark.sql.functions import udf, col, max as max, to_date, date_add, \
add_months
from datetime import datetime, timedelta
import os
from os.path import join, abspath
from typing import Optional
import logging
import random
import string
import math

warehouseLocation = 'c:\\Users\\admin\\PycharmProjects\\pythonProject\\spark-warehouse'
local_scrtatchdir = 'c:\\Users\\admin\\PycharmProjects\\pythonProject\\hive-localscratchdir'
scrtatchdir = 'c:\\Users\\admin\\PycharmProjects\\pythonProject\\hive-scratchdir'
tmp_dir = 'd:\\temp\\hive'
metastore_db = 'jdbc:derby:C:\\Users\\admin\\PycharmProjects\\pythonProject\\metastore_db;create=true'
ConnectionDriverName = 'org.apache.derby.EmbeddedDriver'
spark = SparkSession \
    .builder \
    .appName("App1") \
    .config("hive.exec.local.scratchdir", local_scrtatchdir) \
    .config("hive.exec.scratchdir", scrtatchdir) \
    .config("spark.sql.warehouse.dir", warehouseLocation) \
    .config("hadoop.tmp.dir", tmp_dir) \
    .config("javax.jdo.option.ConnectionURL", metastore_db) \
    .config("javax.jdo.option.ConnectionDriverName", ConnectionDriverName) \
    .enableHiveSupport() \
    .getOrCreate()
print(os.listdir(warehouseLocation))
print(os.listdir(local_scrtatchdir))
print(os.listdir(scrtatchdir))
print(os.listdir(tmp_dir))
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)
HiveContext = HiveContext(sc)
spark.sql("CREATE DATABASE IF NOT EXISTS test")

Now this comes back with the following:


C:\Users\admin\PycharmProjects\pythonProject\venv\Scripts\python.exe 
C:/Users/admin/PycharmProjects/pythonProject/main.py


Using Spark's default log4j profile: 
org/apache/spark/log4j-defaults.properties


Setting default log level to "WARN".

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
setLogLevel(newLevel).


[]

[]

[]

['hive-localscratchdir', 'hive-scratchdir', 'hive-warehouse']

Traceback (most recent call last):

  File "C:/Users/admin/PycharmProjects/pythonProject/main.py", line 
76, in 


    spark.sql("CREATE DATABASE IF NOT EXISTS test")

  File "D:\temp\spark\python\pyspark\sql\session.py", line 649, in sql

    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)

  File 
"D:\temp\spark\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py", 
line 1305, in __call__


  File "D:\temp\spark\python\pyspark\sql\utils.py", line 134, in deco

raise_from(converted)

  File "", line 3, in raise_from

*pyspark.sql.utils.AnalysisException: java.lang.UnsatisfiedLinkError: 
org.apache.hadoop.io.nativeio.NativeIO$Windows.createDirectoryWithMode0(Ljava/lang/String;I)V;*



Process finished with exit code 1


Also, under %SPARK_HOME%/conf I have a hive-site.xml file. It is not
obvious to me why it is throwing this error.


Thanks


LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw




*Disclaimer:* Use it at your own risk. Any and all responsibility for
any loss, damage or destruction of data or any other property which 
may arise from relying on this email's technical content is explicitly 
disclaimed. The author will in no case be liable for any monetary 
damages arising from such loss, damage or destruction.




Re: Structured Streaming Checkpoint Error

2020-12-02 Thread Jungtaek Lim
In theory it would work, but checkpointing would be very inefficient. If I
understand correctly, it will write the content to a temp file on S3 and
then rename it, which actually reads the temp file back from S3 and writes
its content to the final path on S3. Compared to checkpointing on HDFS,
that is one unnecessary write and one unnecessary read. It probably
warrants a custom implementation of the checkpoint manager for S3.

Also, atomic rename still does not work on S3, and S3 doesn't support
writes with overwrite=false. That means there is no barrier if concurrent
streaming queries access the same checkpoint and mess it up. With a
checkpoint on HDFS, the rename is atomic and only one write succeeds even
in parallel, and the other query, which loses the race to write the
checkpoint file, simply fails. That's a caveat you may want to keep in mind.
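For reference, a minimal sketch of what that looks like on the writer side (the paths, format and source DataFrame are placeholders): the output data can still land on S3 while the checkpoint lives on an HDFS-compliant filesystem.

query = (df.writeStream
           .format("parquet")
           .option("path", "s3a://bucket/output/")                    # output data on S3
           .option("checkpointLocation", "hdfs:///checkpoints/app1")  # checkpoint on HDFS
           .start())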

On Wed, Dec 2, 2020 at 11:35 PM German Schiavon 
wrote:

> Hello!
>
> @Gabor Somogyi  I wonder whether, now that S3 is *strongly
> consistent*, this would work fine.
>
>
> Regards!
>
> https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/
>
> On Thu, 17 Sep 2020 at 11:55, German Schiavon 
> wrote:
>
>> Hi Gabor,
>>
>> Makes sense, thanks a lot!
>>
>> On Thu, 17 Sep 2020 at 11:51, Gabor Somogyi 
>> wrote:
>>
>>> Hi,
>>>
>>> Structured Streaming is simply not working when checkpoint location is
>>> on S3 due to its read-after-write consistency.
>>> Please choose an HDFS compliant filesystem and it will work like a charm.
>>>
>>> BR,
>>> G
>>>
>>>
>>> On Wed, Sep 16, 2020 at 4:12 PM German Schiavon <
>>> gschiavonsp...@gmail.com> wrote:
>>>
 Hi!

 I have a Structured Streaming application that reads from Kafka,
 performs some aggregations and writes to S3 in parquet format.

 Everything seems to work great, except that from time to time I get a
 checkpoint error. At the beginning I thought it was a random error, but it
 has happened more than 3 times already in a few days.

 Caused by: java.io.FileNotFoundException: No such file or directory:
 s3a://xxx/xxx/validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp


 Does this happen to anyone else?

 Thanks in advance.

 *This is the full error :*

 ERROR streaming.MicroBatchExecution: Query segmentValidation [id =
 14edaddf-25bb-4259-b7a2-6107907f962f, runId =
 0a757476-94ec-4a53-960a-91f54ce47110] terminated with error

 java.io.FileNotFoundException: No such file or directory:
 s3a://xxx/xxx//validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp

 at
 org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2310)

 at
 org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2204)

 at
 org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2143)

 at
 org.apache.hadoop.fs.FileSystem.getFileLinkStatus(FileSystem.java:2664)

 at
 org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:131)

 at
 org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:726)

 at
 org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699)

 at org.apache.hadoop.fs.FileContext.rename(FileContext.java:1032)

 at
 org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:329)

 at
 org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147)

 at
 org.apache.spark.sql.execution.streaming.HDFSMetadataLog.writeBatchToFile(HDFSMetadataLog.scala:134)

 at
 org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$add$3(HDFSMetadataLog.scala:120)

 at
 scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
 at scala.Option.getOrElse(Option.scala:189)

>>>


Why does an NPE happen with multi-threading in cluster mode but not client mode

2020-12-02 Thread lk_spark
Hi all,
   I'm using Spark 2.4 and I am trying to use the SparkContext from multiple
threads. I found an example here:
https://hadoopist.wordpress.com/2017/02/03/how-to-use-threads-in-spark-job-to-achieve-parallel-read-and-writes/
   Some of the code looks like this:
   for (a <- 0 until 4) {
  val thread = new Thread {
override def run() {
  sc.parallelize(Array("ddd", "eee", 
"fff")).toDF().write.format("parquet").mode("overWrite").save("/tmp/vgiri/file2")
   .
   
   When I run the code in local or client mode, it works. But in cluster mode I
hit this error: SPARK-29046
https://issues.apache.org/jira/browse/SPARK-29046
   Finally, I used a java.util.concurrent.Semaphore to wait for all the sub-threads
to finish before the main thread stops the SparkContext, and then the code worked
in cluster mode (see the sketch below).
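A rough Python sketch of the same pattern, for illustration only (the original code is Scala; the paths and data are placeholders, and Thread.join() plays the same role as the Semaphore here, i.e. making the main thread wait before tearing down the context):

import threading

def write_part(i):
    (spark.createDataFrame([("ddd",), ("eee",), ("fff",)], ["value"])
          .write.mode("overwrite").parquet("/tmp/vgiri/file%d" % i))

threads = [threading.Thread(target=write_part, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()      # wait for all workers before stopping the SparkContext
spark.stop()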


   But I don't understand why, in local or client mode, the code also works even
though I didn't use a Semaphore?