Re: use java in Grouped Map pandas udf to avoid serDe

2020-10-06 Thread Lian Jiang
Hi,

I used these settings but did not see an obvious improvement (the runtime only
went from 190 minutes down to 170 minutes):

spark.sql.execution.arrow.pyspark.enabled: True
spark.sql.execution.arrow.pyspark.fallback.enabled: True

This job heavily uses pandas UDFs and runs on an EMR cluster of 30 xlarge nodes.
Any idea why the perf improvement is so small after enabling Arrow?
Could anything else be missing? Thanks.
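For reference, this is how I apply the settings when building the session (a
minimal sketch; these are the Spark 3.x key names, and on Spark 2.4 the
equivalent legacy key is spark.sql.execution.arrow.enabled). My understanding is
that pandas UDFs exchange data through Arrow regardless of these flags, which
mainly affect toPandas()/createDataFrame() conversions, so that may explain the
small gain:

from pyspark.sql import SparkSession

# Sketch only; the app name is a placeholder.
spark = (
    SparkSession.builder
    .appName("arrow-enabled-job")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.sql.execution.arrow.pyspark.fallback.enabled", "true")
    .getOrCreate()
)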


On Sun, Oct 4, 2020 at 10:36 AM Lian Jiang  wrote:

> Please ignore this question.
> https://kontext.tech/column/spark/370/improve-pyspark-performance-using-pandas-udf-with-apache-arrow
> shows pandas udf should have avoided jvm<->Python SerDe by maintaining one
> data copy in memory. spark.sql.execution.arrow.enabled is false by default.
> I think I missed enabling spark.sql.execution.arrow.enabled. Thanks.
> Regards.
>
> On Sun, Oct 4, 2020 at 10:22 AM Lian Jiang  wrote:
>
>> Hi,
>>
>> I am using pyspark Grouped Map pandas UDF (
>> https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html).
>> Functionality wise it works great. However, serDe causes a lot of perf
>> hits. To optimize this UDF, can I do either below:
>>
>> 1. use a java UDF to completely replace the python Grouped Map pandas
>> UDF.
>> 2. The Python Grouped Map pandas UDF calls a java function internally.
>>
>> Which way is more promising and how? Thanks for any pointers.
>>
>> Thanks
>> Lian
>>
>>
>>
>>
>




Re: use java in Grouped Map pandas udf to avoid serDe

2020-10-04 Thread Lian Jiang
Please ignore this question.
https://kontext.tech/column/spark/370/improve-pyspark-performance-using-pandas-udf-with-apache-arrow
shows that pandas UDFs avoid JVM<->Python SerDe by keeping a single copy of the
data in memory, but spark.sql.execution.arrow.enabled is false by default. I
think I simply missed enabling spark.sql.execution.arrow.enabled. Thanks.
Regards.

On Sun, Oct 4, 2020 at 10:22 AM Lian Jiang  wrote:

> Hi,
>
> I am using pyspark Grouped Map pandas UDF (
> https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html).
> Functionality wise it works great. However, serDe causes a lot of perf
> hits. To optimize this UDF, can I do either below:
>
> 1. use a java UDF to completely replace the python Grouped Map pandas UDF.
> 2. The Python Grouped Map pandas UDF calls a java function internally.
>
> Which way is more promising and how? Thanks for any pointers.
>
> Thanks
> Lian
>
>
>
>



use java in Grouped Map pandas udf to avoid serDe

2020-10-04 Thread Lian Jiang
Hi,

I am using the PySpark grouped map pandas UDF (
https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html).
Functionality-wise it works great. However, SerDe causes a big perf hit.
To optimize this UDF, can I do either of the following:

1. Use a Java UDF to completely replace the Python grouped map pandas UDF.
2. Have the Python grouped map pandas UDF call a Java function internally.

Which way is more promising, and how? Thanks for any pointers.
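For option 1, the registration path I have in mind is something like the sketch
below. The class name is a placeholder, and as far as I know this only covers
scalar Java UDFs; a full grouped-map replacement would need JVM-side Dataset
code (e.g. flatMapGroups) rather than a registered function:

from pyspark.sql.types import IntegerType

# Sketch: com.example.MyJavaUDF is a hypothetical class implementing
# org.apache.spark.sql.api.java.UDF1. The UDF then runs entirely in the JVM,
# with no per-row Python round trip.
spark.udf.registerJavaFunction("my_java_udf", "com.example.MyJavaUDF", IntegerType())
df.selectExpr("id", "my_java_udf(txt) AS result").show()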

Thanks
Lian


Spark interrupts S3 request backoff

2020-04-12 Thread Lian Jiang
Hi,

My Spark job failed when reading parquet files from S3 due to 503 Slow Down
errors. According to
https://docs.aws.amazon.com/AmazonS3/latest/dev/optimizing-performance.html,
I can use backoff to mitigate this issue. However, Spark seems to interrupt
the backoff sleep (see "sleep interrupted"). Is there a way (e.g. some
settings) to make Spark not interrupt the backoff? Appreciate any hints.


20/04/12 20:15:37 WARN TaskSetManager: Lost task 3347.0 in stage 155.0
(TID 128138, ip-100-101-44-35.us-west-2.compute.internal, executor
34): org.apache.spark.sql.execution.datasources.FileDownloadException:
Failed to download file path:
s3://mybucket/myprefix/part-00178-d0a0d51f-f98e-4b9d-8d00-bb3b9acd9a47-c000.snappy.parquet,
range: 0-19231, partition values: [empty row], isDataPresent: false
at 
org.apache.spark.sql.execution.datasources.AsyncFileDownloader.next(AsyncFileDownloader.scala:142)
at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.getNextFile(FileScanRDD.scala:248)
at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:172)
at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:130)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown
Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown
Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Suppressed: 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
Slow Down (Service: Amazon S3; Status Code: 503; Error Code: 503 Slow
Down; Request ID: CECE220993AE7F89; S3 Extended Request ID:
UlQe4dEuBR1YWJUthSlrbV9phyqxUNHQEw7tsJ5zu+oNIH+nGlGHfAv7EKkQRUVP8tw8x918A4Y=),
S3 Extended Request ID:
UlQe4dEuBR1YWJUthSlrbV9phyqxUNHQEw7tsJ5zu+oNIH+nGlGHfAv7EKkQRUVP8tw8x918A4Y=
at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1712)
at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1367)
at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4926)
at 
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4872)
at 

Re: pandas_udf is very slow

2020-04-05 Thread Lian Jiang
Thanks Silvio. I need a grouped map pandas UDF that takes a Spark DataFrame as
input and outputs a Spark DataFrame with a different shape from the input.
Grouped map is fairly unique to pandas UDFs, and I have trouble finding a
similar non-pandas UDF for an apples-to-apples comparison. Let me know if you
have a better idea for investigating the grouped map pandas UDF slowness.

One potential workaround could be grouping the 250M records by id and, for each
group, doing groupby('id').apply(pd_udf). I am not sure which way is more
promising compared with repartition + mapPartitions, reduceByKey, or combineByKey.

Appreciate any clue.

Sent from my iPhone

> On Apr 5, 2020, at 6:18 AM, Silvio Fiorito  
> wrote:
> 
> 
> Your 2 examples are doing different things.
>  
> The Pandas UDF is doing a grouped map, whereas your Python UDF is doing an 
> aggregate.
>  
> I think you want your Pandas UDF to be PandasUDFType.GROUPED_AGG? Is your 
> result the same?
>  
> From: Lian Jiang 
> Date: Sunday, April 5, 2020 at 3:28 AM
> To: user 
> Subject: pandas_udf is very slow
>  
> Hi,
>  
> I am using pandas udf in pyspark 2.4.3 on EMR 5.21.0. pandas udf is favored 
> over non pandas udf per 
> https://www.twosigma.com/wp-content/uploads/Jin_-_Improving_Python__Spark_Performance_-_Spark_Summit_West.pdf.
>  
> My data has about 250M records and the pandas udf code is like:
>  
> def pd_udf_func(data):
> return pd.DataFrame(["id"])
> 
> pd_udf = pandas_udf(
> pd_udf_func,
> returnType=("id int"),
> functionType=PandasUDFType.GROUPED_MAP
> )
> df3 = df.groupBy("id").apply(pd_udf)
> df3.explain()
> """
> == Physical Plan ==
> FlatMapGroupsInPandas [id#9L], pd_udf_func(id#9L, txt#10), [id#30]
> +- *(2) Sort [id#9L ASC NULLS FIRST], false, 0
>+- Exchange hashpartitioning(id#9L, 200)
>   +- *(1) Project [id#9L, id#9L, txt#10]
>  +- Scan ExistingRDD[id#9L,txt#10]
> """
>  
> As you can see, this pandas udf does nothing but returning a row having a 
> pandas dataframe having None values. In reality, this pandas udf has 
> complicated logic (e.g. iterate through the pandas dataframe rows and do some 
> calculation). This simplification is to reduce noise caused by application 
> specific logic. This pandas udf takes hours to run using 10 executors (14 
> cores and 64G mem each). On the other hand, below non-pandas udf can finish 
> in minutes:
>  
> def udf_func(data_list):
> return "hello"
> 
> udf = udf(udf_func, StringType())
> df2 = 
> df.groupBy("id").agg(F.collect_list('txt').alias('txt1')).withColumn('udfadd',
>  udf('txt1'))
> df2.explain()
> """
> == Physical Plan ==
> *(1) Project [id#9L, txt1#16, pythonUDF0#24 AS udfadd#20]
> +- BatchEvalPython [udf_func(txt1#16)], [id#9L, txt1#16, pythonUDF0#24]
>+- ObjectHashAggregate(keys=[id#9L], functions=[collect_list(txt#10, 0, 
> 0)])
>   +- Exchange hashpartitioning(id#9L, 200)
>  +- ObjectHashAggregate(keys=[id#9L], 
> functions=[partial_collect_list(txt#10, 0, 0)])
> +- Scan ExistingRDD[id#9L,txt#10]
> """
>  
> The physical plans show pandas udf uses sortAggregate (slower) while 
> non-pandas udf uses objectHashAggregate (faster).
>  
> Below is what I have tried to improve the performance of pandas udf but none 
> of them worked:
> 1. repartition before groupby. For example, df.repartition(140, 
> "id").groupBy("id").apply(pd_udf). 140 is the same as 
> spark.sql.shuffle.partitions. I hope groupby can benefit from the repartition 
> but according to the execution plan the repartition seems to be ignored since 
> groupby will do partitioning itself.
> 
> 
> 2. although this slowness is more likely caused by pandas udf instead of 
> groupby, I still played with shuffle settings such as 
> spark.shuffle.compress=True, spark.shuffle.spill.compress=True.
> 
> 
> 3. I played with serDe settings such as 
> spark.serializer=org.apache.spark.serializer.KryoSerializer. Also I tried 
> pyarrow settings such as spark.sql.execution.arrow.enabled=True and 
> spark.sql.execution.arrow.maxRecordsPerBatch=10
> 
> 
> 4. I tried to replace the solution of "groupby + pandas udf " with 
> combineByKey, reduceByKey, repartition + mapPartition. But it is not easy 
> since the pandas udf has complicated logic.
> 
>  
> My questions:
>  
> 1. why pandas udf is so slow?
> 2. is there a way to improve the performance of pandas_udf?
> 3. in case it is a known issue of pandas udf, what other remedy I can use? I 
> guess I need to think harder on combineByKey, reduceByKey, repartition + 
> mapPartition. But want to know if I missed anything obvious.
>  
> Any clue is highly appreciated.
>  
> Thanks
> Leon
>  
>  
>  
>  


pandas_udf is very slow

2020-04-05 Thread Lian Jiang
Hi,

I am using pandas UDFs in PySpark 2.4.3 on EMR 5.21.0. Pandas UDFs are favored
over non-pandas UDFs per
https://www.twosigma.com/wp-content/uploads/Jin_-_Improving_Python__Spark_Performance_-_Spark_Summit_West.pdf.


My data has about 250M records and the pandas udf code is like:

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

def pd_udf_func(data):
    return pd.DataFrame(["id"])

pd_udf = pandas_udf(
    pd_udf_func,
    returnType="id int",
    functionType=PandasUDFType.GROUPED_MAP
)
df3 = df.groupBy("id").apply(pd_udf)
df3.explain()
"""
== Physical Plan ==
FlatMapGroupsInPandas [id#9L], pd_udf_func(id#9L, txt#10), [id#30]
+- *(2) Sort [id#9L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#9L, 200)
  +- *(1) Project [id#9L, id#9L, txt#10]
 +- Scan ExistingRDD[id#9L,txt#10]
"""

As you can see, this pandas UDF does nothing but return a pandas DataFrame of
None values. In reality, the pandas UDF has complicated logic (e.g. iterating
through the pandas DataFrame rows and doing some calculations); this
simplification is just to reduce noise from application-specific logic. The
pandas UDF takes hours to run using 10 executors (14 cores and 64G mem each).
On the other hand, the non-pandas UDF below finishes in minutes:

from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def udf_func(data_list):
    return "hello"

udf = udf(udf_func, StringType())
df2 = (df.groupBy("id")
         .agg(F.collect_list('txt').alias('txt1'))
         .withColumn('udfadd', udf('txt1')))
df2.explain()
"""
== Physical Plan ==
*(1) Project [id#9L, txt1#16, pythonUDF0#24 AS udfadd#20]
+- BatchEvalPython [udf_func(txt1#16)], [id#9L, txt1#16, pythonUDF0#24]
   +- ObjectHashAggregate(keys=[id#9L], functions=[collect_list(txt#10, 0,
0)])
  +- Exchange hashpartitioning(id#9L, 200)
 +- ObjectHashAggregate(keys=[id#9L],
functions=[partial_collect_list(txt#10, 0, 0)])
+- Scan ExistingRDD[id#9L,txt#10]
"""

The physical plans show pandas udf uses sortAggregate (slower) while
non-pandas udf uses objectHashAggregate (faster).

Below is what I have tried to improve the performance of pandas udf but
none of them worked:
1. repartition before groupby. For example, df.repartition(140,
"id").groupBy("id").apply(pd_udf). 140 is the same as
spark.sql.shuffle.partitions.
I hope groupby can benefit from the repartition but according to the
execution plan the repartition seems to be ignored since groupby will do
partitioning itself.

2. although this slowness is more likely caused by pandas udf instead of
groupby, I still played with shuffle settings such as
spark.shuffle.compress=True,
spark.shuffle.spill.compress=True.

3. I played with serDe settings such as
spark.serializer=org.apache.spark.serializer.KryoSerializer.
Also I tried pyarrow settings such as spark.sql.execution.arrow.enabled=True
and spark.sql.execution.arrow.maxRecordsPerBatch=10

4. I tried to replace the solution of "groupby + pandas udf " with
combineByKey, reduceByKey, repartition + mapPartition. But it is not easy
since the pandas udf has complicated logic.

My questions:

1. Why is the pandas UDF so slow?
2. Is there a way to improve the performance of pandas_udf?
3. In case this is a known issue with pandas UDFs, what other remedy can I use?
I guess I need to think harder about combineByKey, reduceByKey, or repartition +
mapPartitions, but I want to know if I missed anything obvious.

Any clue is highly appreciated.
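For a closer apples-to-apples comparison with the collect_list + Python UDF
baseline, I may also try a GROUPED_AGG pandas UDF, assuming the real logic can
be expressed as a per-group aggregation. A rough sketch against the same
(id, txt) DataFrame:

from pyspark.sql.functions import pandas_udf, PandasUDFType

# Sketch only: the aggregation itself (mean text length) is illustrative.
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_txt_len(txt):
    # receives the group's 'txt' values as a pandas Series, returns a scalar
    return txt.str.len().mean()

df4 = df.groupBy("id").agg(mean_txt_len(df["txt"]).alias("mean_len"))
df4.explain()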

Thanks
Leon


Re: pandas_udf throws "Unsupported class file major version 56"

2019-10-07 Thread Lian Jiang
I figured it out. Thanks.

On Mon, Oct 7, 2019 at 9:55 AM Lian Jiang  wrote:

> Hi,
>
> from pyspark.sql.functions import pandas_udf, PandasUDFType
> import pyspark
> from pyspark.sql import SparkSession
> spark = SparkSession.builder.getOrCreate()
>
> df = spark.createDataFrame(
> [(1, True, 1.0, 'aa'), (1, False, 2.0, 'aa'), (2, True, 3.0, 'aa'),
> (2, True, 5.0, 'aa'), (2, True, 10.0, 'aa')],
> ("id", "is_deleted", "v", "a"))
>
>
> @pandas_udf("id long, is_deleted boolean, v double, a string",
> PandasUDFType.GROUPED_MAP)
> def subtract_mean(pdf):
> v = pdf.v
> return pdf.assign(v=v - v.mean())
>
> df.groupby("id").apply(subtract_mean).show()  > works fine.
>
>
>
> @pandas_udf('double', PandasUDFType.SCALAR)
> def pandas_plus_one(v):
> return v + 1
>
> df.withColumn('v2', pandas_plus_one(df.v)).show()  --->* throw error*
>
>
> Error:
> Traceback (most recent call last):
>   File "", line 1, in 
>   File
> "/Users/lianj/repo/historical-data-storage-clean/lib/python2.7/site-packages/pyspark/sql/dataframe.py",
> line 380, in show
> print(self._jdf.showString(n, 20, vertical))
>   File
> "/Users/lianj/repo/historical-data-storage-clean/lib/python2.7/site-packages/py4j/java_gateway.py",
> line 1257, in __call__
> answer, self.gateway_client, self.target_id, self.name)
>   File
> "/Users/lianj/repo/historical-data-storage-clean/lib/python2.7/site-packages/pyspark/sql/utils.py",
> line 79, in deco
> raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
> IllegalArgumentException: u'Unsupported class file major version 56'
>
>
> Any idea? Appreciate any help!
>
>
>
>
> https://stackoverflow.com/questions/53583199/pyspark-error-unsupported-class-file-major-version-55
> does not help since I have already been using java 1.8.
>
> I am using:
> pyspark 2.4.4
> pandas 0.23.4
> numpy 1.14.5
> pyarrow 0.10.0
> Note that I got numpy, pyarrow, pandas versions from
> https://stackoverflow.com/questions/51713705/python-pandas-udf-spark-error
> which help to make UDAF subtract_mean work.
>
>
>
>
>
>
>
>
>


pandas_udf throws "Unsupported class file major version 56"

2019-10-07 Thread Lian Jiang
Hi,

from pyspark.sql.functions import pandas_udf, PandasUDFType
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(1, True, 1.0, 'aa'), (1, False, 2.0, 'aa'), (2, True, 3.0, 'aa'),
     (2, True, 5.0, 'aa'), (2, True, 10.0, 'aa')],
    ("id", "is_deleted", "v", "a"))


@pandas_udf("id long, is_deleted boolean, v double, a string",
            PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").apply(subtract_mean).show()  # ---> works fine



@pandas_udf('double', PandasUDFType.SCALAR)
def pandas_plus_one(v):
    return v + 1

df.withColumn('v2', pandas_plus_one(df.v)).show()  # ---> throws an error


Error:
Traceback (most recent call last):
  File "", line 1, in 
  File
"/Users/lianj/repo/historical-data-storage-clean/lib/python2.7/site-packages/pyspark/sql/dataframe.py",
line 380, in show
print(self._jdf.showString(n, 20, vertical))
  File
"/Users/lianj/repo/historical-data-storage-clean/lib/python2.7/site-packages/py4j/java_gateway.py",
line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File
"/Users/lianj/repo/historical-data-storage-clean/lib/python2.7/site-packages/pyspark/sql/utils.py",
line 79, in deco
raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
IllegalArgumentException: u'Unsupported class file major version 56'


Any idea? Appreciate any help!



https://stackoverflow.com/questions/53583199/pyspark-error-unsupported-class-file-major-version-55
does not help since I have already been using java 1.8.

I am using:
pyspark 2.4.4
pandas 0.23.4
numpy 1.14.5
pyarrow 0.10.0
Note that I got the numpy, pyarrow, and pandas versions from
https://stackoverflow.com/questions/51713705/python-pandas-udf-spark-error,
which helped make the UDAF subtract_mean work.
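For what it's worth, here is the check I plan to run to see which JVM the py4j
gateway actually launched: class file major version 56 corresponds to Java 12
(55 is Java 11, 52 is Java 8), so some JVM on the path appears to be newer
than 1.8.

# Diagnostic sketch: print the JVM version/home used by this SparkSession.
jvm = spark.sparkContext._gateway.jvm
print(jvm.java.lang.System.getProperty("java.version"))
print(jvm.java.lang.System.getProperty("java.home"))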


Re: writing into oracle database is very slow

2019-04-19 Thread Lian Jiang
Thanks for the interesting ideas! It looks like having Spark write directly to a
relational database is not as straightforward as I expected.

Sent from my iPhone

> On Apr 19, 2019, at 06:58, Khare, Ankit  wrote:
> 
> Hi Jiang
> 
> We faced similar issue so we write the file and then use sqoop to export data 
> to mssql. 
> 
> We achieved a great time benefit with this strategy.
> 
> Sent from my iPhone
> 
> On 19. Apr 2019, at 10:47, spark receiver  wrote:
> 
>> hi Jiang,
>> 
>> i was facing the very same issue ,the solution is write to file and using 
>> oracle external table to do the insert.
>> 
>> hope this could help.
>> 
>> Dalin
>> 
>>> On Thu, Apr 18, 2019 at 11:43 AM Jörn Franke  wrote:
>>> What is the size of the data? How much time does it need on HDFS and how 
>>> much on Oracle? How many partitions do you have on Oracle side?
>>> 
>>> Am 06.04.2019 um 16:59 schrieb Lian Jiang :
>>> 
>>>> Hi,
>>>> 
>>>> My spark job writes into oracle db using:
>>>> df.coalesce(10).write.format("jdbc").option("url", url)
>>>>   .option("driver", driver).option("user", user)
>>>>   .option("batchsize", 2000)
>>>>   .option("password", password).option("dbtable", 
>>>> tableName).mode("append").save()
>>>> It is much slow than writting into HDFS. The data to write is small.
>>>> Is this expected? Thanks for any clue.
>>>> 


writing into oracle database is very slow

2019-04-06 Thread Lian Jiang
Hi,

My Spark job writes into an Oracle DB using:

(df.coalesce(10).write.format("jdbc")
   .option("url", url)
   .option("driver", driver)
   .option("user", user)
   .option("batchsize", 2000)
   .option("password", password)
   .option("dbtable", tableName)
   .mode("append").save())

It is much slower than writing into HDFS. The data to write is small.

Is this expected? Thanks for any clue.
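In case it matters, this is the kind of tuning I am considering; the numbers
are illustrative only, not recommendations:

# Hedged tuning sketch: more parallel JDBC connections and larger batches.
(df.repartition(10)
   .write.format("jdbc")
   .option("url", url)
   .option("driver", driver)
   .option("user", user)
   .option("password", password)
   .option("dbtable", tableName)
   .option("batchsize", 10000)      # default is 1000
   .option("numPartitions", 10)     # caps the number of concurrent connections
   .mode("append")
   .save())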


spark generates corrupted parquet files

2019-03-29 Thread Lian Jiang
Hi,

Occasionally, Spark generates parquet files that are only 4 bytes; the content
is just "PAR1". ETL Spark jobs cannot handle such corrupted files and skip the
whole partition containing these poison-pill files, causing big data loss.

Spark also generates 0-byte parquet files, but those can be handled by Spark.

What could cause Spark to generate such 4-byte files? Any clue is
appreciated!
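As a read-side mitigation (it masks the symptom rather than explaining the
cause), I am considering skipping unreadable files instead of failing the
whole read; the path below is a placeholder:

spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
df = spark.read.parquet("/warehouse/my_dataset")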


Re: writing a small csv to HDFS is super slow

2019-03-26 Thread Lian Jiang
Hi Gezim,

The execution plan of the DataFrame I write to HDFS is a union of 140 child
DataFrames, and none of them is materialized before the write. It is not saving
the file that takes time; it is materializing the DataFrames. My solution was to
materialize each child DataFrame and save it to HDFS first. Unioning the
pre-materialized child DataFrames and saving the result to HDFS is then very
fast.
Hope this helps!
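Roughly, the divide-and-conquer looks like this; child_dfs and the HDFS paths
are placeholders for the 140 child DataFrames:

paths = []
for i, child_df in enumerate(child_dfs):
    path = "hdfs:///tmp/children/part_{}".format(i)
    child_df.write.mode("overwrite").parquet(path)   # materialize each child
    paths.append(path)

final_df = spark.read.parquet(*paths)                # union of materialized data
(final_df.coalesce(1)
    .write.mode("overwrite")
    .option("header", "true")
    .csv("hdfs:///output/final_csv"))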

On Tue, Mar 26, 2019 at 1:50 PM Gezim Sejdiu  wrote:

> Hi Lian,
>
> I was following the thread since one of my students had the same issue.
> The problem was when trying to save a larger XML dataset into HDFS and due
> to the connectivity timeout between Spark and HDFS, the output wasn't able
> to be displayed.
> I also suggested him to do the same as @Apostolos said in the previous
> mail, using saveAsTextFile instead (haven't got any result/reply after my
> suggestion).
>
> Seeing the last commit date "*Jan 10, 2017*" made
> on databricks/spark-csv [1] project, not sure how much inline with Spark
> 2.x is. Even though there is a *note* about it on the README file.
>
> Would it be possible that you share your solution (in case the project is
> open-sourced already) with us and then we can have a look at it?
>
> Many thanks in advance.
>
> Best regards,
> [1]. https://github.com/databricks/spark-csv
>
> On Tue, Mar 26, 2019 at 1:09 AM Lian Jiang  wrote:
>
>> Thanks guys for reply.
>>
>> The execution plan shows a giant query. After divide and conquer, saving
>> is quick.
>>
>> On Fri, Mar 22, 2019 at 4:01 PM kathy Harayama 
>> wrote:
>>
>>> Hi Lian,
>>> Since you using repartition(1), do you want to decrease the number of
>>> partitions? If so, have you tried to use coalesce instead?
>>>
>>> Kathleen
>>>
>>> On Fri, Mar 22, 2019 at 2:43 PM Lian Jiang 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Writing a csv to HDFS takes about 1 hour:
>>>>
>>>>
>>>> df.repartition(1).write.format('com.databricks.spark.csv').mode('overwrite').options(header='true').save(csv)
>>>>
>>>> The generated csv file is only about 150kb. The job uses 3 containers
>>>> (13 cores, 23g mem).
>>>>
>>>> Other people have similar issues but I don't see a good explanation and
>>>> solution.
>>>>
>>>> Any clue is highly appreciated! Thanks.
>>>>
>>>>
>>>>
>
> --
>
> _
>
> *Gëzim Sejdiu*
>
>
>
> *PhD Student & Research Associate*
>
> *SDA, University of Bonn*
>
> *Endenicher Allee 19a, 53115 Bonn, Germany*
>
> *https://gezimsejdiu.github.io/ <https://gezimsejdiu.github.io/>*
>
> GitHub <https://github.com/GezimSejdiu> | Twitter
> <https://twitter.com/Gezim_Sejdiu> | LinkedIn
> <https://www.linkedin.com/in/g%C3%ABzim-sejdiu-08b1761b> | Google Scholar
> <https://scholar.google.de/citations?user=Lpbwr9oJ>
>
>


Re: writing a small csv to HDFS is super slow

2019-03-25 Thread Lian Jiang
Thanks guys for the reply.

The execution plan shows a giant query. After divide and conquer, saving is
quick.

On Fri, Mar 22, 2019 at 4:01 PM kathy Harayama 
wrote:

> Hi Lian,
> Since you using repartition(1), do you want to decrease the number of
> partitions? If so, have you tried to use coalesce instead?
>
> Kathleen
>
> On Fri, Mar 22, 2019 at 2:43 PM Lian Jiang  wrote:
>
>> Hi,
>>
>> Writing a csv to HDFS takes about 1 hour:
>>
>>
>> df.repartition(1).write.format('com.databricks.spark.csv').mode('overwrite').options(header='true').save(csv)
>>
>> The generated csv file is only about 150kb. The job uses 3 containers (13
>> cores, 23g mem).
>>
>> Other people have similar issues but I don't see a good explanation and
>> solution.
>>
>> Any clue is highly appreciated! Thanks.
>>
>>
>>


writing a small csv to HDFS is super slow

2019-03-22 Thread Lian Jiang
Hi,

Writing a csv to HDFS takes about 1 hour:

df.repartition(1).write.format('com.databricks.spark.csv').mode('overwrite').options(header='true').save(csv)

The generated csv file is only about 150kb. The job uses 3 containers (13
cores, 23g mem).

Other people have similar issues, but I don't see a good explanation or
solution.

Any clue is highly appreciated! Thanks.


read json and write into parquet in executors

2019-03-11 Thread Lian Jiang
Hi,

In my Spark batch job:

step 1: the driver assigns a partition of the JSON file path list to each
executor.
step 2: each executor fetches its assigned JSON files from S3 and saves them
into HDFS.
step 3: the driver reads these JSON files into a DataFrame and saves it as
parquet.

To improve performance by avoiding writing the JSONs to HDFS, I want to change
the workflow to:

step 1: the driver assigns a partition of the JSON file path list to each
executor.
step 2: each executor fetches its assigned JSON files from S3, merges the JSON
content in memory and writes directly to parquet. No need to write the JSONs
to HDFS.

However, I cannot create DataFrames in executors. Is this improvement feasible?
Appreciate any help!
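One alternative I am considering is to let Spark read the S3 paths directly as
a single distributed job and write parquet, skipping the HDFS staging step
entirely. This assumes the executors can reach S3 through a configured
connector such as s3a; the paths are placeholders:

json_paths = [
    "s3a://my-bucket/my-prefix/file1.json",
    "s3a://my-bucket/my-prefix/file2.json",
]
df = spark.read.json(json_paths)      # executors fetch and parse in parallel
df.write.mode("append").parquet("hdfs:///warehouse/events_parquet")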


Re: use rocksdb for spark structured streaming (SSS)

2019-03-10 Thread Lian Jiang
Thanks guys!

I am using SSS to backfill the past 3 months of data. I thought I could use SSS
for both historical data and new data. I just realized that SSS is not
appropriate for backfilling, since the watermark relies on receivedAt, which
could be 3 months old. I will use a batch job for the backfill and use SSS (with
a watermark and spark-states) for the real-time processing.
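For the real-time part, the watermark change would look roughly like this
(PySpark shown for brevity; the original pipeline is Scala). It assumes
receivedAt is a timestamp column, and the 2-hour bound and partition column
name are placeholders:

query = (df
    .withWatermark("receivedAt", "2 hours")        # lets Spark purge old dedup state
    .dropDuplicates(["Id", "receivedAt"])
    .writeStream
    .format("parquet")
    .partitionBy("availabilityDomain", time_partition_col)   # placeholder column
    .trigger(processingTime="5 minutes")
    .option("path", "/data")
    .option("checkpointLocation", "/data_checkpoint")
    .start())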

On Sun, Mar 10, 2019 at 2:40 PM Jungtaek Lim  wrote:

> The query makes state growing infinitely. Could you consider watermark
> apply to "receivedAt" to let unnecessary part of state cleared out? Other
> than watermark you could implement TTL based eviction via
> flatMapGroupsWithState, though you'll need to implement your custom
> "dropDuplicate".
>
> 2019년 3월 11일 (월) 오전 5:59, Georg Heiler 님이 작성:
>
>> Use https://github.com/chermenin/spark-states instead
>>
>> Am So., 10. März 2019 um 20:51 Uhr schrieb Arun Mahadevan <
>> ar...@apache.org>:
>>
>>>
>>> Read the link carefully,
>>>
>>> This solution is available (*only*) in Databricks Runtime.
>>>
>>> You can enable RockDB-based state management by setting the following
>>> configuration in the SparkSession before starting the streaming query.
>>>
>>> spark.conf.set(
>>>   "spark.sql.streaming.stateStore.providerClass",
>>>   "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
>>>
>>>
>>> On Sun, 10 Mar 2019 at 11:54, Lian Jiang  wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a very simple SSS pipeline which does:
>>>>
>>>> val query = df
>>>>   .dropDuplicates(Array("Id", "receivedAt"))
>>>>   .withColumn(timePartitionCol, timestamp_udfnc(col("receivedAt")))
>>>>   .writeStream
>>>>   .format("parquet")
>>>>   .partitionBy("availabilityDomain", timePartitionCol)
>>>>   .trigger(Trigger.ProcessingTime(5, TimeUnit.MINUTES))
>>>>   .option("path", "/data")
>>>>   .option("checkpointLocation", "/data_checkpoint")
>>>>   .start()
>>>>
>>>> After ingesting 2T records, the state under checkpoint folder on HDFS 
>>>> (replicator factor 2) grows to 2T bytes.
>>>> My cluster has only 2T bytes which means the cluster can barely handle 
>>>> further data growth.
>>>>
>>>> Online spark documents 
>>>> (https://docs.databricks.com/spark/latest/structured-streaming/production.html)
>>>> says using rocksdb help SSS job reduce JVM memory overhead. But I cannot 
>>>> find any document how
>>>>
>>>> to setup rocksdb for SSS. Spark class CheckpointReader seems to only 
>>>> handle HDFS.
>>>>
>>>> Any suggestions? Thanks!
>>>>
>>>>
>>>>
>>>>


use rocksdb for spark structured streaming (SSS)

2019-03-10 Thread Lian Jiang
Hi,

I have a very simple SSS pipeline which does:

val query = df
  .dropDuplicates(Array("Id", "receivedAt"))
  .withColumn(timePartitionCol, timestamp_udfnc(col("receivedAt")))
  .writeStream
  .format("parquet")
  .partitionBy("availabilityDomain", timePartitionCol)
  .trigger(Trigger.ProcessingTime(5, TimeUnit.MINUTES))
  .option("path", "/data")
  .option("checkpointLocation", "/data_checkpoint")
  .start()

After ingesting 2T records, the state under the checkpoint folder on HDFS
(replication factor 2) grows to 2 TB. My cluster has only 2 TB, which means it
can barely handle further data growth.

The online Spark documents
(https://docs.databricks.com/spark/latest/structured-streaming/production.html)
say that using RocksDB helps an SSS job reduce JVM memory overhead, but I
cannot find any documentation on how to set up RocksDB for SSS. The Spark class
CheckpointReader seems to only handle HDFS.

Any suggestions? Thanks!


Re: spark structured streaming crash due to decompressing gzip file failure

2019-03-07 Thread Lian Jiang
Thanks, it worked.

On Thu, Mar 7, 2019 at 5:05 AM Akshay Bhardwaj <
akshay.bhardwaj1...@gmail.com> wrote:

> Hi,
>
> In your spark-submit command, try using the below config property and see
> if this solves the problem.
>
> --conf spark.sql.files.ignoreCorruptFiles=true
>
> For me this worked to ignore reading empty/partially uploaded gzip files
> in s3 bucket.
>
> Akshay Bhardwaj
> +91-97111-33849
>
>
> On Thu, Mar 7, 2019 at 11:28 AM Lian Jiang  wrote:
>
>> Hi,
>>
>> I have a structured streaming job which listens to a hdfs folder
>> containing jsonl.gz files. The job crashed due to error:
>>
>> java.io.IOException: incorrect header check
>> at
>> org.apache.hadoop.io.compress.zlib.ZlibDecompressor.inflateBytesDirect(Native
>> Method)
>> at
>> org.apache.hadoop.io.compress.zlib.ZlibDecompressor.decompress(ZlibDecompressor.java:225)
>> at
>> org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:111)
>> at
>> org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
>> at java.io.InputStream.read(InputStream.java:101)
>> at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:182)
>> at
>> org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:218)
>> at org.apache.hadoop.util.LineReader.readLine(LineReader.java:176)
>> at
>> org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:152)
>> at
>> org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:192)
>> at
>> org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
>> at
>> org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:50)
>> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>> at
>> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
>> at
>> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:186)
>> at
>> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
>> at
>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
>> Source)
>> at
>> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>> at
>> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>> at
>> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:148)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>> at org.apache.spark.scheduler.Task.run(Task.scala:109)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>> Is there a way to skip the gz files that cannot be decompressed?
>> Exception handling seems not help. The only workaround I can think of is to
>> decompress the gz files into another folder first and make the spark
>> streaming job listen to this new folder. But this workaround may not be
>> better compared with the solution using a unstructured streaming job to
>> directly decompress the gz file, read jsonl file, validate the records and
>> write the validated records into parquet.
>>
>> Any idea is highly appreciated!
>>
>>
>>
>>
>>


spark structured streaming crash due to decompressing gzip file failure

2019-03-06 Thread Lian Jiang
Hi,

I have a structured streaming job which listens to an HDFS folder containing
jsonl.gz files. The job crashed due to this error:

java.io.IOException: incorrect header check
at
org.apache.hadoop.io.compress.zlib.ZlibDecompressor.inflateBytesDirect(Native
Method)
at
org.apache.hadoop.io.compress.zlib.ZlibDecompressor.decompress(ZlibDecompressor.java:225)
at
org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:111)
at
org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
at java.io.InputStream.read(InputStream.java:101)
at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:182)
at
org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:218)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:176)
at
org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:152)
at
org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:192)
at
org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
at
org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:50)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
at
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:186)
at
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
Source)
at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:148)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


Is there a way to skip the gz files that cannot be decompressed? Exception
handling does not seem to help. The only workaround I can think of is to
decompress the gz files into another folder first and make the Spark streaming
job listen to this new folder. But this workaround may not be better than using
a non-structured streaming job to directly decompress the gz files, read the
jsonl files, validate the records and write the validated records into parquet.

Any idea is highly appreciated!


spark structured streaming handles pre-existing files

2019-02-14 Thread Lian Jiang
Hi,

We have a Spark structured streaming job monitoring a folder and converting
jsonl files into parquet. However, if some jsonl files already exist before the
first run of the streaming job (no checkpoint yet), those files are not
processed by the job when it runs. We need to do something like
https://stackoverflow.com/questions/44618783/spark-streaming-only-streams-files-created-after-the-stream-initialization-time
.

Is there a way for the streaming job to pick up the pre-existing files? For
example, is there a setting for this? Appreciate any clue.


structured streaming handling validation and json flattening

2019-02-09 Thread Lian Jiang
Hi,

We have a structured streaming job that converts JSON into parquet files. We
want to validate the JSON records: if a record is not valid, we want to log a
message and refuse to write it into the parquet. Also, the JSON contains nested
JSON objects, and we want to flatten those nested objects into other parquet
files with the same streaming job. My questions are:

1. How do we validate the JSON records in a structured streaming job?
2. How do we flatten the nested JSON in a structured streaming job?
3. Is it possible to use one structured streaming job to validate the JSON,
convert it into a parquet, and convert the nested JSON into other parquets?

I think non-structured streaming can achieve these goals, but structured
streaming is recommended by the Spark community.

Appreciate your feedback!
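Here is a rough sketch of what I have in mind, in case it clarifies the
question. It assumes Spark 2.4+ (foreachBatch); the schema, column names and
paths are placeholders:

from pyspark.sql import functions as F

schema = "id LONG, payload STRUCT<a: STRING, b: INT>, _corrupt_record STRING"

raw = (spark.readStream
    .schema(schema)
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .json("/landing/jsonl"))

def handle_batch(batch_df, batch_id):
    batch_df.persist()   # reused three times below
    bad = batch_df.filter(F.col("_corrupt_record").isNotNull())
    bad.select("_corrupt_record").write.mode("append").json("/quarantine/bad_json")
    good = batch_df.filter(F.col("_corrupt_record").isNull()).drop("_corrupt_record")
    good.write.mode("append").parquet("/warehouse/main")                    # top level
    good.select("id", "payload.a", "payload.b") \
        .write.mode("append").parquet("/warehouse/flattened")               # flattened
    batch_df.unpersist()

query = (raw.writeStream
    .foreachBatch(handle_batch)
    .option("checkpointLocation", "/checkpoints/validate_flatten")
    .start())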


spark in jupyter cannot find a class in a jar

2018-11-15 Thread Lian Jiang
I am using spark in Jupyter as below:

import findspark
findspark.init()

from pyspark import SQLContext, SparkContext
sc = SparkContext()            # create the SparkContext that SQLContext needs
sqlCtx = SQLContext(sc)
df = sqlCtx.read.parquet("oci://mybucket@mytenant/myfile.parquet")

The error is:

Py4JJavaError: An error occurred while calling o198.parquet.
: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem
for scheme "oci"

I have put oci-hdfs-full-2.7.2.0.jar, which defines the oci filesystem, on all
namenodes and datanodes of the Hadoop cluster.

export PYSPARK_SUBMIT_ARGS="--master yarn --deploy-mode client
pyspark-shell --driver-cores 8 --driver-memory 20g --num-executors 2
--executor-cores 6  --executor-memory 30g --jars
/mnt/data/hdfs/oci-hdfs-full-2.7.2.0.jar --conf
spark.executor.extraClassPath=/mnt/data/hdfs/oci-hdfs-full-2.7.2.0.jar
--conf spark.driver.extraClassPath=/mnt/data/hdfs/oci-hdfs-full-2.7.2.0.jar"

Any idea why this still happens? Thanks for any clue.
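One thing I am going to double-check: as far as I understand, spark-submit
stops reading its own options at the primary resource, so the --jars/--conf
flags placed after "pyspark-shell" in PYSPARK_SUBMIT_ARGS may never reach the
JVM. A sketch with "pyspark-shell" moved to the end:

import os

os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--master yarn --deploy-mode client "
    "--driver-cores 8 --driver-memory 20g "
    "--num-executors 2 --executor-cores 6 --executor-memory 30g "
    "--jars /mnt/data/hdfs/oci-hdfs-full-2.7.2.0.jar "
    "--conf spark.executor.extraClassPath=/mnt/data/hdfs/oci-hdfs-full-2.7.2.0.jar "
    "--conf spark.driver.extraClassPath=/mnt/data/hdfs/oci-hdfs-full-2.7.2.0.jar "
    "pyspark-shell"
)

import findspark
findspark.init()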


Re: Spark Structured Streaming handles compressed files

2018-11-02 Thread Lian Jiang
Any clue? Thanks.

On Wed, Oct 31, 2018 at 8:29 PM Lian Jiang  wrote:

> We have jsonl files each of which is compressed as gz file. Is it possible
> to make SSS to handle such files? Appreciate any help!
>


Spark Structured Streaming handles compressed files

2018-10-31 Thread Lian Jiang
We have jsonl files, each of which is compressed as a gz file. Is it possible
to make SSS handle such files? Appreciate any help!


Re: spark structured streaming jobs working in HDP2.6 fail in HDP3.0

2018-08-30 Thread Lian Jiang
I solved this issue by using spark 2.3.1 jars copied from the HDP3.0
cluster. Thanks.

On Thu, Aug 30, 2018 at 10:18 AM Lian Jiang  wrote:

> Per https://spark.apache.org/docs/latest/building-spark.html, spark 2.3.1
> is built with hadoop 2.6.X by default. This is why I see my fat jar
> includes hadoop 2.6.5 (instead of 3.1.0) jars. HftpFileSystem has been
> removed in hadoop 3.
>
> On https://spark.apache.org/downloads.html, I only see spark 2.3.1 built
> with hadoop 2.7. Where can I get spark 2.3.1 built with hadoop 3? Does
> spark 2.3.1 support hadoop 3?
>
> Appreciate your help.
>
> On Thu, Aug 30, 2018 at 8:59 AM Lian Jiang  wrote:
>
>> Hi,
>>
>> I am using HDP3.0 which uses HADOOP3.1.0 and Spark 2.3.1. My spark
>> streaming jobs running fine in HDP2.6.4 (HADOOP2.7.3, spark 2.2.0) fails in
>> HDP3:
>>
>> java.lang.IllegalAccessError: class
>> org.apache.hadoop.hdfs.web.HftpFileSystem cannot access its superinterface
>> org.apache.hadoop.hdfs.web.TokenAspect$TokenManagementDelegator
>>
>> at java.lang.ClassLoader.defineClass1(Native Method)
>>
>> at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
>>
>> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>>
>> at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
>>
>> at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
>>
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
>>
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
>>
>> at java.security.AccessController.doPrivileged(Native Method)
>>
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
>>
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>
>> at java.lang.Class.forName0(Native Method)
>>
>> at java.lang.Class.forName(Class.java:348)
>>
>> at
>> java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
>>
>> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
>>
>> at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
>>
>> at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:3268)
>>
>> at
>> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3313)
>>
>> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352)
>>
>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
>>
>> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403)
>>
>> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371)
>>
>> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:477)
>>
>> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
>>
>> at
>> org.apache.hadoop.mapreduce.lib.input.LineRecordReader.initialize(LineRecordReader.java:85)
>>
>> at
>> org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.(HadoopFileLinesReader.scala:46)
>>
>> at
>> org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.readFile(JsonDataSource.scala:125)
>>
>> at
>> org.apache.spark.sql.execution.datasources.json.JsonFileFormat$$anonfun$buildReader$2.apply(JsonFileFormat.scala:132)
>>
>> at
>> org.apache.spark.sql.execution.datasources.json.JsonFileFormat$$anonfun$buildReader$2.apply(JsonFileFormat.scala:130)
>>
>> at
>> org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:148)
>>
>> at
>> org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:132)
>>
>> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org
>> $apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:128)
>>
>> at
>> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:182)
>>
>> at
>> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
>>
>> at
>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
>> Source)
>>
>> at
>> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>>
>> at
>> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
>>
>> at
>> org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)
>>

Re: spark structured streaming jobs working in HDP2.6 fail in HDP3.0

2018-08-30 Thread Lian Jiang
Per https://spark.apache.org/docs/latest/building-spark.html, spark 2.3.1
is built with hadoop 2.6.X by default. This is why I see my fat jar
includes hadoop 2.6.5 (instead of 3.1.0) jars. HftpFileSystem has been
removed in hadoop 3.

On https://spark.apache.org/downloads.html, I only see spark 2.3.1 built
with hadoop 2.7. Where can I get spark 2.3.1 built with hadoop 3? Does
spark 2.3.1 support hadoop 3?

Appreciate your help.

On Thu, Aug 30, 2018 at 8:59 AM Lian Jiang  wrote:

> Hi,
>
> I am using HDP3.0 which uses HADOOP3.1.0 and Spark 2.3.1. My spark
> streaming jobs running fine in HDP2.6.4 (HADOOP2.7.3, spark 2.2.0) fails in
> HDP3:
>
> java.lang.IllegalAccessError: class
> org.apache.hadoop.hdfs.web.HftpFileSystem cannot access its superinterface
> org.apache.hadoop.hdfs.web.TokenAspect$TokenManagementDelegator
>
> at java.lang.ClassLoader.defineClass1(Native Method)
>
> at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
>
> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
>
> at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
>
> at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
>
> at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>
> at java.lang.Class.forName0(Native Method)
>
> at java.lang.Class.forName(Class.java:348)
>
> at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
>
> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
>
> at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
>
> at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:3268)
>
> at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3313)
>
> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352)
>
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
>
> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403)
>
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371)
>
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:477)
>
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
>
> at
> org.apache.hadoop.mapreduce.lib.input.LineRecordReader.initialize(LineRecordReader.java:85)
>
> at
> org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.(HadoopFileLinesReader.scala:46)
>
> at
> org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.readFile(JsonDataSource.scala:125)
>
> at
> org.apache.spark.sql.execution.datasources.json.JsonFileFormat$$anonfun$buildReader$2.apply(JsonFileFormat.scala:132)
>
> at
> org.apache.spark.sql.execution.datasources.json.JsonFileFormat$$anonfun$buildReader$2.apply(JsonFileFormat.scala:130)
>
> at
> org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:148)
>
> at
> org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:132)
>
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org
> $apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:128)
>
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:182)
>
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
>
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
> Source)
>
> at
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
>
> at
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)
>
> at
> org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
>
> at
> org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
>
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
>
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>

spark structured streaming jobs working in HDP2.6 fail in HDP3.0

2018-08-30 Thread Lian Jiang
Hi,

I am using HDP 3.0, which uses Hadoop 3.1.0 and Spark 2.3.1. My Spark streaming
jobs, which run fine in HDP 2.6.4 (Hadoop 2.7.3, Spark 2.2.0), fail in HDP 3:

java.lang.IllegalAccessError: class
org.apache.hadoop.hdfs.web.HftpFileSystem cannot access its superinterface
org.apache.hadoop.hdfs.web.TokenAspect$TokenManagementDelegator

at java.lang.ClassLoader.defineClass1(Native Method)

at java.lang.ClassLoader.defineClass(ClassLoader.java:763)

at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)

at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)

at java.net.URLClassLoader.access$100(URLClassLoader.java:73)

at java.net.URLClassLoader$1.run(URLClassLoader.java:368)

at java.net.URLClassLoader$1.run(URLClassLoader.java:362)

at java.security.AccessController.doPrivileged(Native Method)

at java.net.URLClassLoader.findClass(URLClassLoader.java:361)

at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

at java.lang.Class.forName0(Native Method)

at java.lang.Class.forName(Class.java:348)

at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)

at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)

at java.util.ServiceLoader$1.next(ServiceLoader.java:480)

at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:3268)

at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3313)

at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352)

at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)

at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403)

at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371)

at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:477)

at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)

at
org.apache.hadoop.mapreduce.lib.input.LineRecordReader.initialize(LineRecordReader.java:85)

at
org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.(HadoopFileLinesReader.scala:46)

at
org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.readFile(JsonDataSource.scala:125)

at
org.apache.spark.sql.execution.datasources.json.JsonFileFormat$$anonfun$buildReader$2.apply(JsonFileFormat.scala:132)

at
org.apache.spark.sql.execution.datasources.json.JsonFileFormat$$anonfun$buildReader$2.apply(JsonFileFormat.scala:130)

at
org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:148)

at
org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:132)

at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org
$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:128)

at
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:182)

at
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)

at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
Source)

at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)

at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)

at
org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)

at
org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)

at
org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)

at
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)

at
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)

at org.apache.spark.scheduler.Task.run(Task.scala:109)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)



Any idea? Thanks.


load hbase data using spark

2018-06-18 Thread Lian Jiang
Hi,

I am considering tools to load hbase data using spark. One choice is
https://github.com/Huawei-Spark/Spark-SQL-on-HBase. However, this seems to
be out-of-date (e.g. "This version of 1.0.0 requires Spark 1.4.0."). Which
tool should I use for this purpose? Thanks for any hint.


Re: schema change for structured spark streaming using jsonl files

2018-04-24 Thread Lian Jiang
Thanks for any help!

On Mon, Apr 23, 2018 at 11:46 AM, Lian Jiang <jiangok2...@gmail.com> wrote:

> Hi,
>
> I am using structured spark streaming which reads jsonl files and writes
> into parquet files. I am wondering what's the process if jsonl files schema
> change.
>
> Suppose jsonl files are generated in \jsonl folder and the old schema is {
> "field1": String}. My proposal is:
>
> 1. write the jsonl files with new schema (e.g. {"field1":String,
> "field2":Int}) into another folder \jsonl2
> 2. let spark job complete handling all data in \jsonl, then stop the spark
> streaming job.
> 3. use a spark script to convert the parquet files from old schema to new
> schema (e.g. add a new column with some default value for "field2").
> 4. upgrade and start the spark streaming job for handling the new schema
> jsonl files and parquet files.
>
> Is this process correct (best)? Thanks for any clue.
>


schema change for structured spark streaming using jsonl files

2018-04-23 Thread Lian Jiang
Hi,

I am using structured spark streaming, which reads jsonl files and writes
them into parquet files. I am wondering what the process should be if the
jsonl files' schema changes.

Suppose jsonl files are generated in the \jsonl folder and the old schema is {
"field1": String}. My proposal is:

1. write the jsonl files with the new schema (e.g. {"field1":String,
"field2":Int}) into another folder \jsonl2
2. let the spark job finish handling all data in \jsonl, then stop the spark
streaming job.
3. use a spark script to convert the parquet files from the old schema to the
new schema (e.g. add a new column with some default value for "field2").
4. upgrade and restart the spark streaming job so it handles the new-schema
jsonl files and parquet files.

Is this process correct (best)? Thanks for any clue.
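
A minimal sketch of step 3, assuming the existing parquet lives under /data/out,
0 is an acceptable default for "field2", and the migrated copy goes to a new
folder so the job never overwrites the path it is reading from:

import org.apache.spark.sql.functions.lit

val oldData = spark.read.parquet("/data/out")         // old schema: field1 only
oldData
  .withColumn("field2", lit(0))                       // default for the new column
  .write.mode("overwrite").parquet("/data/out_v2")    // new-schema copy for the restarted job

Alternatively, since parquet supports schema merging, reading with
.option("mergeSchema", "true") can reconcile old and new files without rewriting
them, at the cost of slower schema inference.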


spark hbase connector

2018-04-17 Thread Lian Jiang
Hi,

My spark jobs need to talk to hbase and I am not sure which spark hbase
connector is recommended:

https://hortonworks.com/blog/spark-hbase-dataframe-based-hbase-connector/

https://phoenix.apache.org/phoenix_spark.html

Or is there a better solution? Appreciate any guidance.
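
For what it's worth, a rough usage sketch of the first (SHC-style) connector,
adapted from its README; the catalog JSON, table and column names here are made
up, and exact class/format names can differ between versions:

import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

val catalog = """{
  |"table":{"namespace":"default", "name":"ticks"},
  |"rowkey":"key",
  |"columns":{
    |"symbol":{"cf":"rowkey", "col":"key", "type":"string"},
    |"close":{"cf":"d", "col":"close", "type":"double"}
  |}
|}""".stripMargin

val df = spark.read
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .load()
df.show()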


avoid duplicate records when appending new data to a parquet

2018-04-13 Thread Lian Jiang
I have a parquet file with an id field that is the hash of the composite key
fields. Is it possible to maintain the uniqueness of the id field when
appending new data that may duplicate existing records in the parquet? Thanks!
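
Not from this thread, but one common approach is to drop in-batch duplicates and
then anti-join the new batch against the ids already stored before appending; a
sketch, with the path and the newBatchDf dataframe name assumed:

val existingIds = spark.read.parquet("/data/records").select("id")

val toAppend = newBatchDf
  .dropDuplicates("id")                         // dedupe within the incoming batch
  .join(existingIds, Seq("id"), "left_anti")    // keep only ids not already stored

toAppend.write.mode("append").parquet("/data/records")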


Re: retention policy for spark structured streaming dataset

2018-03-14 Thread Lian Jiang
It is already partitioned by timestamp. But is the right retention process
really to stop the streaming job, trim the parquet files, and restart the
streaming job? Thanks.

On Wed, Mar 14, 2018 at 12:51 PM, Sunil Parmar <sunilosu...@gmail.com>
wrote:

> Can you use partitioning ( by day ) ? That will  make it easier to drop
> data older than x days outside streaming job.
>
> Sunil Parmar
>
> On Wed, Mar 14, 2018 at 11:36 AM, Lian Jiang <jiangok2...@gmail.com>
> wrote:
>
>> I have a spark structured streaming job which dump data into a parquet
>> file. To avoid the parquet file grows infinitely, I want to discard 3 month
>> old data. Does spark streaming supports this? Or I need to stop the
>> streaming job, trim the parquet file and restart the streaming job? Thanks
>> for any hints.
>>
>
>
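
A sketch of the partition-by-day idea above: the streaming sink writes
date-partitioned parquet, and dropping data older than roughly 3 months becomes
a partition delete performed outside the streaming job. The streamDf name,
column names, paths and checkpoint location are all assumptions:

import org.apache.spark.sql.functions.{col, to_date}

val query = streamDf
  .withColumn("dt", to_date(col("timestamp")))   // daily partition column
  .writeStream
  .format("parquet")
  .option("path", "/data/events")
  .option("checkpointLocation", "/checkpoints/events")
  .partitionBy("dt")
  .start()

// outside the streaming job (cron, oozie, ...), e.g. once a day:
//   hdfs dfs -rm -r -skipTrash /data/events/dt=2017-12-01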


retention policy for spark structured streaming dataset

2018-03-14 Thread Lian Jiang
I have a spark structured streaming job which dumps data into a parquet
file. To keep the parquet file from growing infinitely, I want to discard data
older than 3 months. Does spark streaming support this? Or do I need to stop
the streaming job, trim the parquet file and restart the streaming job? Thanks
for any hints.


Re: dependencies conflict in oozie spark action for spark 2

2018-03-07 Thread Lian Jiang
I found the following inconsistencies between the oozie and spark2 jars:

jackson-core-2.4.4.jar oozie
jackson-core-2.6.5.jar spark2

jackson-databind-2.4.4.jar oozie
jackson-databind-2.6.5.jar spark2

jackson-annotations-2.4.0.jar oozie
jackson-annotations-2.6.5.jar spark2

I removed the lower-version jars from oozie. After that, the spark job cannot
communicate with Yarn due to this error:

18/03/07 18:24:30 INFO Utils: Using initial executors = 0, max of
spark.dynamicAllocation.initialExecutors,
spark.dynamicAllocation.minExecutors and spark.executor.instances
Exception in thread "main" java.lang.NoSuchMethodError:
org.apache.hadoop.yarn.proto.YarnProtos$ResourceProto$Builder.setMemory(J)Lorg/apache/hadoop/yarn/proto/YarnProtos$ResourceProto$Builder;
at
org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl.setMemorySize(ResourcePBImpl.java:78)
at
org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl.setMemory(ResourcePBImpl.java:72)
at
org.apache.hadoop.yarn.api.records.Resource.newInstance(Resource.java:58)
at
org.apache.spark.deploy.yarn.YarnAllocator.<init>(YarnAllocator.scala:140)
at
org.apache.spark.deploy.yarn.YarnRMClient.register(YarnRMClient.scala:77)
at
org.apache.spark.deploy.yarn.ApplicationMaster.registerAM(ApplicationMaster.scala:387)
at
org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:430)
at
org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:282)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:768)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:67)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:66)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
at
org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:66)
at
org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:766)
at
org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)


Any idea?
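
One workaround, not something settled in this thread but a standard technique
for this kind of clash, is to shade jackson inside the application's assembly
jar so the versions on the oozie/spark2 classpath no longer matter. With
sbt-assembly it looks roughly like this (the "shaded" prefix is arbitrary):

// build.sbt, sbt-assembly 0.14.x style
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.fasterxml.jackson.**" -> "shaded.jackson.@1").inAll
)

Setting spark.driver.userClassPathFirst / spark.executor.userClassPathFirst is
another option people try, though those flags are marked experimental.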


On Tue, Mar 6, 2018 at 4:17 PM, Lian Jiang <jiangok2...@gmail.com> wrote:

> I am using HDP 2.6.4 and have followed https://docs.hortonworks.com/
> HDPDocuments/HDP2/HDP-2.6.1/bk_spark-component-guide/
> content/ch_oozie-spark-action.html
> to make oozie use spark2.
>
> After this, I found there are still a bunch of issues:
>
> 1. oozie and spark tries to add the same jars multiple time into cache.
> This is resolved by removing the duplicate jars from 
> /user/oozie/share/lib/lib_20180303065325/spark2/
> folder.
>
> 2. jar conflict which is not resolved. The exception is below:
>
> 18/03/06 23:51:18 ERROR ApplicationMaster: User class threw exception:
> java.lang.NoSuchFieldError: USE_DEFAULTS
> java.lang.NoSuchFieldError: USE_DEFAULTS
> at com.fasterxml.jackson.databind.introspect.
> JacksonAnnotationIntrospector.findSerializationInclusion(
> JacksonAnnotationIntrospector.java:498)
> at com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair.
> findSerializationInclusion(AnnotationIntrospectorPair.java:332)
> at com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair.
> findSerializationInclusion(AnnotationIntrospectorPair.java:332)
> at com.fasterxml.jackson.databind.introspect.BasicBeanDescription.
> findSerializationInclusion(BasicBeanDescription.java:381)
> at com.fasterxml.jackson.databind.ser.PropertyBuilder.<
> init>(PropertyBuilder.java:41)
> at com.fasterxml.jackson.databind.ser.BeanSerializerFactory.
> constructPropertyBuilder(BeanSerializerFactory.java:507)
> at com.fasterxml.jackson.databind.ser.BeanSerializerFactory.
> findBeanProperties(BeanSerializerFactory.java:558)
> at com.fasterxml.jackson.databind.ser.BeanSerializerFactory.
> constructBeanSerializer(BeanSerializerFactory.java:361)
> at com.fasterxml.jackson.databind.ser.BeanSerializerFactory.
> findBeanSerializer(BeanSerializerFactory.java:272)
> at com.fasterxml.jackson.databind.ser.BeanSerializerFactory._
> createSerializer2(BeanSerializerFactory.java:225)
> at com.fasterxml.jackson.databind.ser.BeanSerializerFactory.
> createSerializer(BeanSerializerFactory.java:153)
> at com.fasterxml.jackson.databind.SerializerProvider._
> createUntypedSerializer(SerializerProvider.java:1203)
> at com.fasterxml.jackson.databind.SerializerProvider._
> createAndCacheUntypedSerializer(SerializerProvider.java:1157)
> at com.fasterxml.jackson.databind.SerializerProvider.findValueSerializer(
> SerializerProvider.java:481)
> at com.fasterxml.jackson.databind.SerializerProvider.
> findTypedValueSerializer(SerializerProvider.java:679)
> at com.fasterxml.jackson.databind.ser.Defaul

dependencies conflict in oozie spark action for spark 2

2018-03-06 Thread Lian Jiang
I am using HDP 2.6.4 and have followed
https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.1/bk_spark-component-guide/content/ch_oozie-spark-action.html
to make oozie use spark2.

After this, I found there are still a bunch of issues:

1. oozie and spark try to add the same jars into the cache multiple times.
This is resolved by removing the duplicate jars
from the /user/oozie/share/lib/lib_20180303065325/spark2/ folder.

2. jar conflict which is not resolved. The exception is below:

18/03/06 23:51:18 ERROR ApplicationMaster: User class threw exception:
java.lang.NoSuchFieldError: USE_DEFAULTS
java.lang.NoSuchFieldError: USE_DEFAULTS
at
com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector.findSerializationInclusion(JacksonAnnotationIntrospector.java:498)
at
com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair.findSerializationInclusion(AnnotationIntrospectorPair.java:332)
at
com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair.findSerializationInclusion(AnnotationIntrospectorPair.java:332)
at
com.fasterxml.jackson.databind.introspect.BasicBeanDescription.findSerializationInclusion(BasicBeanDescription.java:381)
at
com.fasterxml.jackson.databind.ser.PropertyBuilder.<init>(PropertyBuilder.java:41)
at
com.fasterxml.jackson.databind.ser.BeanSerializerFactory.constructPropertyBuilder(BeanSerializerFactory.java:507)
at
com.fasterxml.jackson.databind.ser.BeanSerializerFactory.findBeanProperties(BeanSerializerFactory.java:558)
at
com.fasterxml.jackson.databind.ser.BeanSerializerFactory.constructBeanSerializer(BeanSerializerFactory.java:361)
at
com.fasterxml.jackson.databind.ser.BeanSerializerFactory.findBeanSerializer(BeanSerializerFactory.java:272)
at
com.fasterxml.jackson.databind.ser.BeanSerializerFactory._createSerializer2(BeanSerializerFactory.java:225)
at
com.fasterxml.jackson.databind.ser.BeanSerializerFactory.createSerializer(BeanSerializerFactory.java:153)
at
com.fasterxml.jackson.databind.SerializerProvider._createUntypedSerializer(SerializerProvider.java:1203)
at
com.fasterxml.jackson.databind.SerializerProvider._createAndCacheUntypedSerializer(SerializerProvider.java:1157)
at
com.fasterxml.jackson.databind.SerializerProvider.findValueSerializer(SerializerProvider.java:481)
at
com.fasterxml.jackson.databind.SerializerProvider.findTypedValueSerializer(SerializerProvider.java:679)
at
com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:107)
at
com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3559)
at
com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2927)
at org.apache.spark.rdd.RDDOperationScope.toJson(RDDOperationScope.scala:52)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:145)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)



My dependencies are:

libraryDependencies += "com.typesafe.scala-logging" %%
"scala-logging-api" % "2.1.2"
libraryDependencies += "com.typesafe.scala-logging" %%
"scala-logging-slf4j" % "2.1.2"
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.2.3"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.2.0"
libraryDependencies += "com.typesafe" % "config" % "1.3.2"
libraryDependencies += "org.scalactic" %% "scalactic" % "3.0.4"
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.4" % "test"
libraryDependencies += "org.scalamock" %% "scalamock" % "4.1.0" % "test"
libraryDependencies += "com.jsuereth" %% "scala-arm" % "2.0"
libraryDependencies += "com.github.scopt" %% "scopt" % "3.7.0"
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.3.8"
libraryDependencies += "io.dropwizard.metrics" % "metrics-core" % "4.0.2"
libraryDependencies += "com.typesafe.slick" %% "slick" % "3.2.1"
libraryDependencies += "com.typesafe.slick" %% "slick-hikaricp" % "3.2.1"
libraryDependencies += "com.typesafe.slick" %% "slick-extensions" % "3.0.0"
libraryDependencies += "org.scalaz" %% "scalaz-core" % "7.2.19"
libraryDependencies += "org.json4s" %% "json4s-native" % "3.5.3"
libraryDependencies += "com.softwaremill.retry" %% "retry" % "0.3.0"
libraryDependencies += "org.apache.httpcomponents" % "httpclient" % "4.5.5"
libraryDependencies += "org.apache.httpcomponents" % "httpcore" % "4.4.9"


The sbt dependency tree shows that jackson 2.6.5, coming from spark-core, is in
use. But per
https://stackoverflow.com/questions/36982173/java-lang-nosuchfielderror-use-defaults-thrown-while-validating-json-schema-thr,
a jackson version older than 2.6 is being picked up at runtime, causing
"NoSuchFieldError: USE_DEFAULTS".

What I have done so far:

1. succeeded in running the same application through spark-submit.

2. made sure the spark dependencies are 2.2.0 to be consistent with that in

Re: Can spark handle this scenario?

2018-02-26 Thread Lian Jiang
Thanks Vijay. After changing the programming model (creating a context class
for the workers), it finally worked for me. Cheers.
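
For anyone hitting the same thing, a rough reconstruction (not the poster's
actual code) of the "context class for the workers" idea: everything
non-serializable is bundled into a class that is only instantiated inside
foreachPartition, i.e. on the executor, so it never has to be shipped from the
driver. HTTPConnection is a stand-in for whatever client the job really uses:

import org.apache.spark.rdd.RDD

case class Symbol(symbol: String, sector: String)

class HTTPConnection {                       // stand-in for a non-serializable client
  def get(path: String): String = ""
}

class WorkerContext {                        // built per partition, on the executor
  val connection = new HTTPConnection
  def pullSymbol(symbol: String, sector: String): String =
    connection.get(s"/quote/$symbol?sector=$sector")
}

def process(symbolRdd: RDD[Symbol]): Unit =
  symbolRdd.foreachPartition { partition =>
    val ctx = new WorkerContext              // never leaves this executor
    partition.foreach(k => ctx.pullSymbol(k.symbol, k.sector))
  }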

On Fri, Feb 23, 2018 at 5:42 PM, vijay.bvp  wrote:

> when HTTP connection is opened you are opening a connection between
> specific
> machine (with IP and NIC card) to another specific machine, so this can't
> be
> serialized and used on other machine right!!
>
> This isn't spark limitation.
>
> I made a simple diagram if it helps. The Objects created at driver and
> passed to worker need to be serialized. The objects created at workers need
> not.
>
> In the diagram you have to create HTTPConnection on the executors
> independently of the driver.
> The HTTPConnection created at Executor-1 can be used for partitions P1-P3
> of
> RDD available on that executor.
>
> Spark is tolerant and does allow passing objects from driver to workers,
> but
> in case if it reports "Task not serializable"  it does indicate some object
> is having issue. mark the class as Serializable if you think if the object
> of it can be serialized. As I said in the beginning not everything could
> serializable particularly http connections, JDBC connections etc..
>
> <http://apache-spark-user-list.1001560.n3.nabble.com/file/t8878/Picture1.png>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Can spark handle this scenario?

2018-02-22 Thread Lian Jiang
Hi Vijay,

Should HTTPConnection() (or any other object created per partition) be
serializable for your code to work? If so, the usage seems limited.

Sometimes the error caused by a non-serializable object can be very
misleading (e.g. "Return statements aren't allowed in Spark closures")
instead of "Task not serializable".

The post shared by Anastasios helps but does not completely resolve the
"need serialization" problem. For example, if I need to create a per-partition
class object that relies on other objects which may not be serializable, then
wrapping the object creation in an object (making it a static function) does
not help, not to mention that the programming model becomes unintuitive.

I have been playing with this scenario for some time and am still frustrated.
Thanks






On Tue, Feb 20, 2018 at 12:47 AM, vijay.bvp  wrote:

> I am assuming pullSymbolFromYahoo functions opens a connection to yahoo API
> with some token passed, in the code provided so far if you have 2000
> symbols, it will make 2000 new connections!! and 2000 API calls
> connection objects can't/shouldn't be serialized and send to executors,
> they
> should rather be created at executors.
>
> the philosophy given below is nicely documented on Spark Streaming, look at
> Design Patterns for using foreachRDD
> https://spark.apache.org/docs/latest/streaming-programming-
> guide.html#output-operations-on-dstreams
>
>
> case class Symbol(symbol: String, sector: String)
> case class Tick(symbol: String, sector: String, open: Double, close:
> Double)
> //assume symbolDs is rdd of symbol and tick dataset/dataframe can be
> converted to RDD
> symbolRdd.foreachPartition(partition => {
>//this code runs at executor
>   //open a connection here -
>   val connectionToYahoo = new HTTPConnection()
>
>   partition.foreach(k => {
>     pullSymbolFromYahoo(k.symbol, k.sector, connectionToYahoo)
>   })
> })
>
> with the above code if the dataset has 10 partitions (2000 symbols), only
> 10
> connections will be opened though it makes 2000 API calls.
> you should also be looking at sending and receiving results for large
> number
> of symbols, because of the amount of parallelism that spark provides you
> might run into rate limit on the APIs. if you are bulk sending symbols
> above
> pattern also very much useful
>
> thanks
> Vijay
>
>
>
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Return statements aren't allowed in Spark closures

2018-02-21 Thread Lian Jiang
Sorry Bryan. Unfortunately, this is not the root cause.

Any other ideas? This is blocking my scenario. Thanks.

On Wed, Feb 21, 2018 at 4:26 PM, Bryan Jeffrey <bryan.jeff...@gmail.com>
wrote:

> Lian,
>
> You're writing Scala. Just remove the 'return'. No need for it in Scala.
>
> Get Outlook for Android <https://aka.ms/ghei36>
>
> --
> *From:* Lian Jiang <jiangok2...@gmail.com>
> *Sent:* Wednesday, February 21, 2018 4:16:08 PM
> *To:* user
> *Subject:* Return statements aren't allowed in Spark closures
>
> I can run below code in spark-shell using yarn client mode.
>
> val csv = spark.read.option("header", "true").csv("my.csv")
>
> def queryYahoo(row: Row) : Int = { return 10; }
>
> csv.repartition(5).rdd.foreachPartition{ p => p.foreach(r => {
> queryYahoo(r) })}
>
> However, the same code failed when run using spark-submit in yarn client
> or cluster mode due to error:
>
> 18/02/21 21:00:12 ERROR ApplicationMaster: User class threw exception:
> org.apache.spark.util.ReturnStatementInClosureException: Return
> statements aren't allowed in Spark closures
>
> org.apache.spark.util.ReturnStatementInClosureException: Return
> statements aren't allowed in Spark closures
>
> at org.apache.spark.util.ReturnStatementFinder$$anon$1.
> visitTypeInsn(ClosureCleaner.scala:371)
>
> at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
>
> at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
>
> at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
>
> at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
>
> at org.apache.spark.util.ClosureCleaner$.org$apache$
> spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:243)
>
> at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$
> ClosureCleaner$$clean$22.apply(ClosureCleaner.scala:306)
>
> at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$
> ClosureCleaner$$clean$22.apply(ClosureCleaner.scala:292)
>
> at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(
> TraversableLike.scala:733)
>
> at scala.collection.immutable.List.foreach(List.scala:381)
>
> at scala.collection.TraversableLike$WithFilter.
> foreach(TraversableLike.scala:732)
>
> at org.apache.spark.util.ClosureCleaner$.org$apache$
> spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:292)
>
> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
>
> at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
>
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.
> apply(RDD.scala:925)
>
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.
> apply(RDD.scala:924)
>
> at org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:151)
>
> at org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:112)
>
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>
> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
>
>
> Any idea? Thanks.
>


Return statements aren't allowed in Spark closures

2018-02-21 Thread Lian Jiang
I can run below code in spark-shell using yarn client mode.

val csv = spark.read.option("header", "true").csv("my.csv")

def queryYahoo(row: Row) : Int = { return 10; }

csv.repartition(5).rdd.foreachPartition{ p => p.foreach(r => {
queryYahoo(r) })}

However, the same code failed when run using spark-submit in yarn client or
cluster mode due to error:

18/02/21 21:00:12 ERROR ApplicationMaster: User class threw exception:
org.apache.spark.util.ReturnStatementInClosureException: Return statements
aren't allowed in Spark closures

org.apache.spark.util.ReturnStatementInClosureException: Return statements
aren't allowed in Spark closures

at
org.apache.spark.util.ReturnStatementFinder$$anon$1.visitTypeInsn(ClosureCleaner.scala:371)

at org.apache.xbean.asm5.ClassReader.a(Unknown Source)

at org.apache.xbean.asm5.ClassReader.b(Unknown Source)

at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)

at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)

at
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:243)

at
org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$22.apply(ClosureCleaner.scala:306)

at
org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$22.apply(ClosureCleaner.scala:292)

at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)

at scala.collection.immutable.List.foreach(List.scala:381)

at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)

at
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:292)

at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)

at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)

at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:925)

at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)

at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)

at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)

at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)

at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)


Any idea? Thanks.
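
For completeness, the return-free form suggested in the reply above looks like
this; it removes this particular ReturnStatementInClosureException, although the
poster later reports that a separate root cause remained:

import org.apache.spark.sql.Row

// the last expression of a Scala method is its result, so no `return` is needed
def queryYahoo(row: Row): Int = 10

val csv = spark.read.option("header", "true").csv("my.csv")
csv.repartition(5).rdd.foreachPartition { p => p.foreach(r => queryYahoo(r)) }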


Re: Can spark handle this scenario?

2018-02-20 Thread Lian Jiang
Thanks Vijay! This is very clear.

On Tue, Feb 20, 2018 at 12:47 AM, vijay.bvp  wrote:

> I am assuming pullSymbolFromYahoo functions opens a connection to yahoo API
> with some token passed, in the code provided so far if you have 2000
> symbols, it will make 2000 new connections!! and 2000 API calls
> connection objects can't/shouldn't be serialized and send to executors,
> they
> should rather be created at executors.
>
> the philosophy given below is nicely documented on Spark Streaming, look at
> Design Patterns for using foreachRDD
> https://spark.apache.org/docs/latest/streaming-programming-
> guide.html#output-operations-on-dstreams
>
>
> case class Symbol(symbol: String, sector: String)
> case class Tick(symbol: String, sector: String, open: Double, close:
> Double)
> //assume symbolDs is rdd of symbol and tick dataset/dataframe can be
> converted to RDD
> symbolRdd.foreachPartition(partition => {
>//this code runs at executor
>   //open a connection here -
>   val connectionToYahoo = new HTTPConnection()
>
>   partition.foreach(k => {
>     pullSymbolFromYahoo(k.symbol, k.sector, connectionToYahoo)
>   })
> })
>
> with the above code if the dataset has 10 partitions (2000 symbols), only
> 10
> connections will be opened though it makes 2000 API calls.
> you should also be looking at sending and receiving results for large
> number
> of symbols, because of the amount of parallelism that spark provides you
> might run into rate limit on the APIs. if you are bulk sending symbols
> above
> pattern also very much useful
>
> thanks
> Vijay
>
>
>
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Can spark handle this scenario?

2018-02-17 Thread Lian Jiang
Thanks Anastasios. This link is helpful!

On Sat, Feb 17, 2018 at 11:05 AM, Anastasios Zouzias <zouz...@gmail.com>
wrote:

> Hi Lian,
>
> The remaining problem is:
>
>
> Spark need all classes used in the fn() serializable for t.rdd.map{ k=>
> fn(k) } to work. This could be hard since some classes in third party
> libraries are not serializable. This restricts the power of using spark to
> parallel an operation on multiple machines. Hope this is clear.
>
>
> This is not entirely true. You can bypass the serialisation issue in most
> cases, see the link below for an example.
>
> https://www.nicolaferraro.me/2016/02/22/using-non-serializable-objects-in-
> apache-spark/
>
> In a nutshell, the non-serialisable code is available to all executors, so
> there is no need for Spark to serialise from the driver to the executors.
>
> Best regards,
> Anastasios
>
>
>
>
> On Sat, Feb 17, 2018 at 6:13 PM, Lian Jiang <jiangok2...@gmail.com> wrote:
>
>> *Snehasish,*
>>
>> I got this in spark-shell 2.11.8:
>>
>> case class My(name:String, age:Int)
>>
>> import spark.implicits._
>>
>> val t = List(new My("lian", 20), new My("sh", 3)).toDS
>>
>> t.map{ k=> print(My) }(org.apache.spark.sql.Encoders.kryo[My.getClass])
>>
>>
>> :31: error: type getClass is not a member of object My
>>
>>t.map{ k=> print(My) }(org.apache.spark.sql.Encoder
>> s.kryo[My.getClass])
>>
>>
>>
>> Using RDD can workaround this issue as mentioned in previous emails:
>>
>>
>>  t.rdd.map{ k=> print(k) }
>>
>>
>> *Holden,*
>>
>>
>> The remaining problem is:
>>
>>
>> Spark need all classes used in the fn() serializable for t.rdd.map{ k=>
>> fn(k) } to work. This could be hard since some classes in third party
>> libraries are not serializable. This restricts the power of using spark to
>> parallel an operation on multiple machines. Hope this is clear.
>>
>>
>> On Sat, Feb 17, 2018 at 12:04 AM, SNEHASISH DUTTA <
>> info.snehas...@gmail.com> wrote:
>>
>>> Hi  Lian,
>>>
>>> This could be the solution
>>>
>>>
>>> case class Symbol(symbol: String, sector: String)
>>>
>>> case class Tick(symbol: String, sector: String, open: Double, close:
>>> Double)
>>>
>>>
>>> // symbolDS is Dataset[Symbol], pullSymbolFromYahoo returns
>>> Dataset[Tick]
>>>
>>>
>>> symbolDs.map { k =>
>>>
>>>   pullSymbolFromYahoo(k.symbol, k.sector)
>>>
>>> }(org.apache.spark.sql.Encoders.kryo[Tick.getClass])
>>>
>>>
>>> Thanks,
>>>
>>> Snehasish
>>>
>>>
>>> Regards,
>>> Snehasish
>>>
>>> On Sat, Feb 17, 2018 at 1:05 PM, Holden Karau <holden.ka...@gmail.com>
>>> wrote:
>>>
>>>> I'm not sure what you mean by it could be hard to serialize complex
>>>> operations?
>>>>
>>>> Regardless I think the question is do you want to parallelize this on
>>>> multiple machines or just one?
>>>>
>>>> On Feb 17, 2018 4:20 PM, "Lian Jiang" <jiangok2...@gmail.com> wrote:
>>>>
>>>>> Thanks Ayan. RDD may support map better than Dataset/DataFrame.
>>>>> However, it could be hard to serialize complex operation for Spark to
>>>>> execute in parallel. IMHO, spark does not fit this scenario. Hope this
>>>>> makes sense.
>>>>>
>>>>> On Fri, Feb 16, 2018 at 8:58 PM, ayan guha <guha.a...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> ** You do NOT need dataframes, I mean.
>>>>>>
>>>>>> On Sat, Feb 17, 2018 at 3:58 PM, ayan guha <guha.a...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi
>>>>>>>
>>>>>>> Couple of suggestions:
>>>>>>>
>>>>>>> 1. Do not use Dataset, use Dataframe in this scenario. There is no
>>>>>>> benefit of dataset features here. Using Dataframe, you can write an
>>>>>>> arbitrary UDF which can do what you want to do.
>>>>>>> 2. In fact you do need dataframes here. You would be better off with
>>>>>>> RDD here. just create a RDD of symbols and use map to do the processing.
>>>>>>>

Re: Can spark handle this scenario?

2018-02-17 Thread Lian Jiang
Agreed. Thanks.

On Sat, Feb 17, 2018 at 9:53 AM, Jörn Franke <jornfra...@gmail.com> wrote:

> You may want to think about separating the import step from the processing
> step. It is not very economical to download all the data again every time
> you want to calculate something. So download it first and store it on a
> distributed file system. Schedule to download newest information every day/
> hour etc. you can store it using a query optimized format such as ORC or
> Parquet. Then you can run queries over it.
>
> On 17. Feb 2018, at 01:10, Lian Jiang <jiangok2...@gmail.com> wrote:
>
> Hi,
>
> I have a user case:
>
> I want to download S stock data from Yahoo API in parallel using
> Spark. I have got all stock symbols as a Dataset. Then I used below code to
> call Yahoo API for each symbol:
>
>
>
> case class Symbol(symbol: String, sector: String)
>
> case class Tick(symbol: String, sector: String, open: Double, close:
> Double)
>
>
> // symbolDS is Dataset[Symbol], pullSymbolFromYahoo returns Dataset[Tick]
>
>
> symbolDs.map { k =>
>
>   pullSymbolFromYahoo(k.symbol, k.sector)
>
> }
>
>
> This statement cannot compile:
>
>
> Unable to find encoder for type stored in a Dataset.  Primitive types
> (Int, String, etc) and Product types (case classes) are supported by
> importing spark.implicits._  Support for serializing other types will be
> added in future releases.
>
>
> My questions are:
>
>
> 1. As you can see, this scenario is not traditional dataset handling such
> as count, sql query... Instead, it is more like a UDF which apply random
> operation on each record. Is Spark good at handling such scenario?
>
>
> 2. Regarding the compilation error, any fix? I did not find a satisfactory
> solution online.
>
>
> Thanks for help!
>
>
>
>
>
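
A sketch of the import/processing split suggested above: fetch the data on a
schedule, land it as date-partitioned parquet, and run the analysis against that
copy instead of against the Yahoo API every time. The ticksDf name, path and
date column are assumptions:

// scheduled import step
ticksDf.write
  .mode("append")
  .partitionBy("trade_date")
  .parquet("/data/ticks")

// processing step, run as often as needed without touching the API
val ticks = spark.read.parquet("/data/ticks")
ticks.createOrReplaceTempView("ticks")
spark.sql("SELECT symbol, avg(close) AS avg_close FROM ticks GROUP BY symbol").show()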


Re: Can spark handle this scenario?

2018-02-17 Thread Lian Jiang
*Snehasish,*

I got this in spark-shell 2.11.8:

case class My(name:String, age:Int)

import spark.implicits._

val t = List(new My("lian", 20), new My("sh", 3)).toDS

t.map{ k=> print(My) }(org.apache.spark.sql.Encoders.kryo[My.getClass])


<console>:31: error: type getClass is not a member of object My

   t.map{ k=> print(My)
}(org.apache.spark.sql.Encoders.kryo[My.getClass])



Using an RDD can work around this issue, as mentioned in previous emails:


 t.rdd.map{ k=> print(k) }
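
The compile error above is only about the type parameter: Encoders.kryo expects
the class itself, not .getClass (which is a value, not a type). A variant of the
same shell session that compiles, mapping to something the encoder can carry:

import org.apache.spark.sql.Encoders
import spark.implicits._

case class My(name: String, age: Int)

val t = List(My("lian", 20), My("sh", 3)).toDS()

// explicit kryo encoder for the result type
val bumped = t.map(k => k.copy(age = k.age + 1))(Encoders.kryo[My])
bumped.count()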


*Holden,*


The remaining problem is:


Spark needs all classes used in fn() to be serializable for t.rdd.map{ k=>
fn(k) } to work. This can be hard since some classes in third-party
libraries are not serializable. This restricts the power of using spark to
parallelize an operation on multiple machines. Hope this is clear.
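
The pattern in the nicolaferraro.me post linked elsewhere in this thread boils
down to keeping the non-serializable client behind a singleton object: each
executor JVM builds its own instance lazily on first use, so nothing is
serialized from the driver. A sketch, reusing the HTTPConnection and symbolRdd
names from this thread and stubbing the client out:

import org.apache.spark.rdd.RDD

case class Symbol(symbol: String, sector: String)

class HTTPConnection {                        // imagine this cannot be serialized
  def get(path: String): String = ""
}

object YahooClient {                          // hypothetical holder object
  lazy val connection = new HTTPConnection    // built once per executor JVM
}

def fetchQuotes(symbolRdd: RDD[Symbol]): RDD[(String, String)] =
  symbolRdd.map(k => (k.symbol, YahooClient.connection.get(s"/quote/${k.symbol}")))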


On Sat, Feb 17, 2018 at 12:04 AM, SNEHASISH DUTTA <info.snehas...@gmail.com>
wrote:

> Hi  Lian,
>
> This could be the solution
>
>
> case class Symbol(symbol: String, sector: String)
>
> case class Tick(symbol: String, sector: String, open: Double, close:
> Double)
>
>
> // symbolDS is Dataset[Symbol], pullSymbolFromYahoo returns Dataset[Tick]
>
>
> symbolDs.map { k =>
>
>   pullSymbolFromYahoo(k.symbol, k.sector)
>
> }(org.apache.spark.sql.Encoders.kryo[Tick.getClass])
>
>
> Thanks,
>
> Snehasish
>
>
> Regards,
> Snehasish
>
> On Sat, Feb 17, 2018 at 1:05 PM, Holden Karau <holden.ka...@gmail.com>
> wrote:
>
>> I'm not sure what you mean by it could be hard to serialize complex
>> operations?
>>
>> Regardless I think the question is do you want to parallelize this on
>> multiple machines or just one?
>>
>> On Feb 17, 2018 4:20 PM, "Lian Jiang" <jiangok2...@gmail.com> wrote:
>>
>>> Thanks Ayan. RDD may support map better than Dataset/DataFrame. However,
>>> it could be hard to serialize complex operation for Spark to execute in
>>> parallel. IMHO, spark does not fit this scenario. Hope this makes sense.
>>>
>>> On Fri, Feb 16, 2018 at 8:58 PM, ayan guha <guha.a...@gmail.com> wrote:
>>>
>>>> ** You do NOT need dataframes, I mean.
>>>>
>>>> On Sat, Feb 17, 2018 at 3:58 PM, ayan guha <guha.a...@gmail.com> wrote:
>>>>
>>>>> Hi
>>>>>
>>>>> Couple of suggestions:
>>>>>
>>>>> 1. Do not use Dataset, use Dataframe in this scenario. There is no
>>>>> benefit of dataset features here. Using Dataframe, you can write an
>>>>> arbitrary UDF which can do what you want to do.
>>>>> 2. In fact you do need dataframes here. You would be better off with
>>>>> RDD here. just create a RDD of symbols and use map to do the processing.
>>>>>
>>>>> On Sat, Feb 17, 2018 at 12:40 PM, Irving Duran <irving.du...@gmail.com
>>>>> > wrote:
>>>>>
>>>>>> Do you only want to use Scala? Because otherwise, I think with
>>>>>> pyspark and pandas read table you should be able to accomplish what you
>>>>>> want to accomplish.
>>>>>>
>>>>>> Thank you,
>>>>>>
>>>>>> Irving Duran
>>>>>>
>>>>>> On 02/16/2018 06:10 PM, Lian Jiang wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have a user case:
>>>>>>
>>>>>> I want to download S stock data from Yahoo API in parallel using
>>>>>> Spark. I have got all stock symbols as a Dataset. Then I used below code 
>>>>>> to
>>>>>> call Yahoo API for each symbol:
>>>>>>
>>>>>>
>>>>>>
>>>>>> case class Symbol(symbol: String, sector: String)
>>>>>>
>>>>>> case class Tick(symbol: String, sector: String, open: Double, close:
>>>>>> Double)
>>>>>>
>>>>>>
>>>>>> // symbolDS is Dataset[Symbol], pullSymbolFromYahoo returns
>>>>>> Dataset[Tick]
>>>>>>
>>>>>>
>>>>>> symbolDs.map { k =>
>>>>>>
>>>>>>   pullSymbolFromYahoo(k.symbol, k.sector)
>>>>>>
>>>>>> }
>>>>>>
>>>>>>
>>>>>> This statement cannot compile:
>>>>>>
>>>>>>
>>>>>> Unable to find encoder for type stored in a Dataset.  Primitive
>>>>>> types (Int, String, etc) and Product types (case classes) are supported 
>>>>>> by
>>>>>> importing spark.implicits._  Support for serializing other types
>>>>>> will be added in future releases.
>>>>>>
>>>>>>
>>>>>> My questions are:
>>>>>>
>>>>>>
>>>>>> 1. As you can see, this scenario is not traditional dataset handling
>>>>>> such as count, sql query... Instead, it is more like a UDF which apply
>>>>>> random operation on each record. Is Spark good at handling such scenario?
>>>>>>
>>>>>>
>>>>>> 2. Regarding the compilation error, any fix? I did not find a
>>>>>> satisfactory solution online.
>>>>>>
>>>>>>
>>>>>> Thanks for help!
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best Regards,
>>>>> Ayan Guha
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Best Regards,
>>>> Ayan Guha
>>>>
>>>
>>>
>


Re: Can spark handle this scenario?

2018-02-16 Thread Lian Jiang
Thanks Ayan. RDD may support map better than Dataset/DataFrame. However, it
could be hard to serialize complex operations for Spark to execute in
parallel. IMHO, spark does not fit this scenario. Hope this makes sense.

On Fri, Feb 16, 2018 at 8:58 PM, ayan guha <guha.a...@gmail.com> wrote:

> ** You do NOT need dataframes, I mean.
>
> On Sat, Feb 17, 2018 at 3:58 PM, ayan guha <guha.a...@gmail.com> wrote:
>
>> Hi
>>
>> Couple of suggestions:
>>
>> 1. Do not use Dataset, use Dataframe in this scenario. There is no
>> benefit of dataset features here. Using Dataframe, you can write an
>> arbitrary UDF which can do what you want to do.
>> 2. In fact you do need dataframes here. You would be better off with RDD
>> here. just create a RDD of symbols and use map to do the processing.
>>
>> On Sat, Feb 17, 2018 at 12:40 PM, Irving Duran <irving.du...@gmail.com>
>> wrote:
>>
>>> Do you only want to use Scala? Because otherwise, I think with pyspark
>>> and pandas read table you should be able to accomplish what you want to
>>> accomplish.
>>>
>>> Thank you,
>>>
>>> Irving Duran
>>>
>>> On 02/16/2018 06:10 PM, Lian Jiang wrote:
>>>
>>> Hi,
>>>
>>> I have a user case:
>>>
>>> I want to download S stock data from Yahoo API in parallel using
>>> Spark. I have got all stock symbols as a Dataset. Then I used below code to
>>> call Yahoo API for each symbol:
>>>
>>>
>>>
>>> case class Symbol(symbol: String, sector: String)
>>>
>>> case class Tick(symbol: String, sector: String, open: Double, close:
>>> Double)
>>>
>>>
>>> // symbolDS is Dataset[Symbol], pullSymbolFromYahoo returns
>>> Dataset[Tick]
>>>
>>>
>>> symbolDs.map { k =>
>>>
>>>   pullSymbolFromYahoo(k.symbol, k.sector)
>>>
>>> }
>>>
>>>
>>> This statement cannot compile:
>>>
>>>
>>> Unable to find encoder for type stored in a Dataset.  Primitive types
>>> (Int, String, etc) and Product types (case classes) are supported by
>>> importing spark.implicits._  Support for serializing other types will
>>> be added in future releases.
>>>
>>>
>>> My questions are:
>>>
>>>
>>> 1. As you can see, this scenario is not traditional dataset handling
>>> such as count, sql query... Instead, it is more like a UDF which apply
>>> random operation on each record. Is Spark good at handling such scenario?
>>>
>>>
>>> 2. Regarding the compilation error, any fix? I did not find a
>>> satisfactory solution online.
>>>
>>>
>>> Thanks for help!
>>>
>>>
>>>
>>>
>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>
>
> --
> Best Regards,
> Ayan Guha
>


Can spark handle this scenario?

2018-02-16 Thread Lian Jiang
Hi,

I have a user case:

I want to download S&P stock data from the Yahoo API in parallel using
Spark. I have all the stock symbols as a Dataset. Then I used the code below to
call the Yahoo API for each symbol:



case class Symbol(symbol: String, sector: String)

case class Tick(symbol: String, sector: String, open: Double, close: Double)


// symbolDS is Dataset[Symbol], pullSymbolFromYahoo returns Dataset[Tick]


symbolDs.map { k =>

  pullSymbolFromYahoo(k.symbol, k.sector)

}


This statement cannot compile:


Unable to find encoder for type stored in a Dataset.  Primitive types (Int,
String, etc) and Product types (case classes) are supported by importing
spark.implicits._  Support for serializing other types will be added in
future releases.


My questions are:


1. As you can see, this scenario is not traditional dataset handling such
as count, sql query... Instead, it is more like a UDF which applies an
arbitrary operation on each record. Is Spark good at handling such a scenario?


2. Regarding the compilation error, any fix? I did not find a satisfactory
solution online.


Thanks for help!
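
A sketch of one shape that does compile: have the per-symbol function return
plain case-class values (here Seq[Tick]) instead of another Dataset, since a
Dataset has no encoder and cannot be created inside executors anyway, and let
spark.implicits._ supply the product encoder. The Yahoo call is stubbed out:

import spark.implicits._

case class Symbol(symbol: String, sector: String)
case class Tick(symbol: String, sector: String, open: Double, close: Double)

// placeholder instead of the real API call
def pullSymbolFromYahoo(symbol: String, sector: String): Seq[Tick] =
  Seq(Tick(symbol, sector, 0.0, 0.0))

val symbolDs = Seq(Symbol("AAPL", "IT"), Symbol("GOOG", "IT")).toDS()
val tickDs   = symbolDs.flatMap(k => pullSymbolFromYahoo(k.symbol, k.sector))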


Re: saveAsTable does not respect spark.sql.warehouse.dir

2018-02-11 Thread Lian Jiang
Thanks guys. prashanth's idea worked for me. Much appreciated!
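
For anyone searching later, the fix boils down to making the target database
explicit instead of relying on hive's default database; a sketch with an
illustrative database name:

spark.sql("CREATE DATABASE IF NOT EXISTS spark_db LOCATION '/user/spark'")
spark.sql("USE spark_db")
siDS.write.saveAsTable("siDS")   // or equivalently: siDS.write.saveAsTable("spark_db.siDS")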

On Sun, Feb 11, 2018 at 10:20 AM, prashanth t <tprashanth...@gmail.com>
wrote:

> Hi Lian,
>
> Please add below command before creating table.
> "Use (database_name)"
> By default saveAsTable uses default database of hive. You might not have
> access to it that's causing problems.
>
>
>
> Thanks
> Prashanth Thipparthi
>
>
>
>
>
> On 11 Feb 2018 10:45 pm, "Lian Jiang" <jiangok2...@gmail.com> wrote:
>
> I started spark-shell with below command:
>
> spark-shell --master yarn --conf spark.sql.warehouse.dir="/user/spark"
>
> In spark-shell, below statement can create a managed table using
> /user/spark HDFS folder:
>
> spark.sql("CREATE TABLE t5 (i int) USING PARQUET")
>
> However, below statements still use spark-warehouse in local folder like
> {currentfolder}/spark-warehouse.
>
> case class SymbolInfo(name: String, sector: String)
>
> val siDS = Seq(
>   SymbolInfo("AAPL", "IT"),
>   SymbolInfo("GOOG", "IT")
> ).toDS()
>
> siDS.write.saveAsTable("siDS")
>
> How can I make saveAsTable respect spark.sql.warehouse.dir when creating
> a managed table? Appreciate any help!
>
>
>


Re: Spark cannot find tables in Oracle database

2018-02-11 Thread Lian Jiang
Thanks guys for the help!

Georg's proposal fixed the issue. Thanks a lot.
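
For the record, a sketch of what "uppercase the table names" looks like in
practice (my illustration, not the exact code from this thread): Oracle folds
unquoted identifiers to upper case, so storing the table under an all-caps name
lets a later unquoted lookup such as "HADOOP_DEV.SYMBOLINFO" resolve. Here
symbolDf and oracleUrl stand for the dataframe being stored and the thin-driver
URL from the question:

symbolDf.write
  .format("jdbc")
  .option("url", oracleUrl)
  .option("dbtable", "HADOOP_DEV.SYMBOLINFO")   // all caps matches Oracle's folding
  .option("user", "hadoop_dev")
  .option("password", "mypassword")
  .mode("append")
  .save()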

On Sun, Feb 11, 2018 at 7:59 AM, Georg Heiler <georg.kf.hei...@gmail.com>
wrote:

> I had the same problem. You need to uppercase all tables prior to storing
> them in oracle.
> Gourav Sengupta <gourav.sengu...@gmail.com> schrieb am So. 11. Feb. 2018
> um 10:44:
>
>> Hi,
>>
>> since you are using the same user as the schema, I do not think that
>> there is an access issue. Perhaps you might want to see whether there is
>> anything case sensitive about the the table names. I remember once that the
>> table names had to be in small letters, but that was in MYSQL.
>>
>>
>> Regards,
>> Gourav
>>
>> On Sun, Feb 11, 2018 at 2:26 AM, Lian Jiang <jiangok2...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I am following https://spark.apache.org/docs/latest/sql-
>>> programming-guide.html#jdbc-to-other-databases to query oracle database
>>> 12.1 from spark shell 2.11.8.
>>>
>>> val jdbcDF = spark.read
>>>   .format("jdbc")
>>>   .option("url", "jdbc:oracle:thin:@(DESCRIPTION = (ADDRESS = (PROTOCOL = 
>>> TCP)(HOST = 129.106.123.73)(PORT = 1521))(CONNECT_DATA =(SERVER = 
>>> DEDICATED)(SERVICE_NAME = pdb1.subnet1.hadoop.oraclevcn.com)))")
>>>   .option("dbtable", "HADOOP_DEV.SYMBOLINFO")
>>>   .option("user", "hadoop_dev")
>>>   .option("password", "mypassword")
>>>   .load()
>>>
>>> This statement failed due to "ORA-00942: table or view does not exist"
>>> even SymbolInfo table does exist in hadoop_dev schema.
>>>
>>> Any clue? Thanks!
>>>
>>
>>


Spark cannot find tables in Oracle database

2018-02-10 Thread Lian Jiang
Hi,

I am following
https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases
to query oracle database 12.1 from spark shell 2.11.8.

val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:oracle:thin:@(DESCRIPTION = (ADDRESS =
(PROTOCOL = TCP)(HOST = 129.106.123.73)(PORT = 1521))(CONNECT_DATA
=(SERVER = DEDICATED)(SERVICE_NAME =
pdb1.subnet1.hadoop.oraclevcn.com)))")
  .option("dbtable", "HADOOP_DEV.SYMBOLINFO")
  .option("user", "hadoop_dev")
  .option("password", "mypassword")
  .load()

This statement failed with "ORA-00942: table or view does not exist" even
though the SymbolInfo table does exist in the hadoop_dev schema.

Any clue? Thanks!