[jira] [Updated] (SPARK-27613) Caching an RDD composed of Row Objects produces some kind of key recombination

2019-04-30 Thread Andres Fernandez (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andres Fernandez updated SPARK-27613:
-
Description: 
(Code included at the bottom)

The function "+create_dataframes_from_azure_responses_rdd+" receives 
*table_names* (_list_ of _str_) and *responses_rdd* (rdd of tuples <_str_, 
_str_, _int_, _tuple_, _azure.loganalytics.models.QueryResponse_>). It will 
then go ahead and iterate over the table names to create dataframes filtering 
the RDDs by the first element and valid response.

So far so good.

A _QueryResponse_ object (from the azure.loganalytics package) essentially 
contains a list with one "_table_", which in turn has a "_columns_" and a 
"_rows_" field. Every response (fifth element of the tuple, [4]) for the same 
table name (first element of the tuple, [0]) has exactly the same columns in 
the same order (the order only matters for referencing the row data inside the 
same response anyway). The types are stored in *column_types*, taking the first 
response as the sample.

Now to the tricky part.

flatMap is called on *responses_rdd* with the function 
"+tabularize_response_rdd+", which basically creates a Row object for every row 
(a _list_ of _str_) in the _QueryResponse_. A schema is created from a 
*type_map* of Azure types to spark.sql.types so it can be passed to the 
subsequent createDataFrame call. If the result of this flatMap, 
*table_tabular_rdd*, is *not* cached before creating the DataFrame from the 
Rows RDD, everything works smoothly. Nevertheless, if the result of the 
flatMap, *table_tabular_rdd*, is cached before creating the DataFrame, a 
mismatch shows up between the keys and values of the Row objects.

It is worth pointing out that when a Row object is created from an unpacked 
dict, the code 
[here|https://github.com/apache/spark/blob/480357cc6d71c682fe703611c71c1e6a36e6ce9a/python/pyspark/sql/types.py#L1374-L1375]
 sorts the keys; is this behaviour somehow overridden by caching?
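
For reference, here is a minimal standalone sketch of that sorting behaviour 
(made-up field names, Spark 2.4; not part of the job below):
{code:python}
# Row(**kwargs) in Spark 2.4 sorts the field names before binding the values.
from pyspark.sql import Row

r = Row(**{"record_count": 42, "WorkspaceId": "abc", "TenantId": "n/a"})
print(r)
# Fields come back in sorted key order (uppercase sorts before lowercase):
# Row(TenantId='n/a', WorkspaceId='abc', record_count=42)
{code}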

Please let me know what I am doing wrong; is there a best practice or 
documented solution I am not following? I'm just a beginner when it comes to 
Spark and would happily accept any suggestion. I hope I was clear enough, and I 
am open to providing any additional details that might be helpful. Thank you! 
(Code and error attached as well.)

The error looks as if it were related to casting, but it can be seen that the 
contents do not correspond to the key. The *record_count* key is actually a 
Long, but in the Row it somehow got swapped with another key's value, in this 
case 'n/a'.
{code:python}
import pyspark
from pyspark.sql.types import StructType, StructField

# `spark` (the SparkSession) and `type_map` (Azure column type ->
# pyspark.sql.types constructor) are assumed to be defined elsewhere.

# Returns a dict keyed by table name, hence `dict` rather than `list`.
def create_dataframes_from_azure_responses_rdd(table_names: list, responses_rdd: pyspark.RDD, verbose: bool = False) -> dict:
  ws_column_name = "WorkspaceId"

  def tabularize_response_rdd(x: tuple):
    import pyspark
    tn, wsid, count, interval, response = x
    ret = []
    if response.tables[0].rows:
      # One Row per response row, keyed by column name, plus the workspace id.
      ret = [pyspark.sql.Row(**{ws_column_name: wsid,
                                **{fi.name: r[i] for i, fi in enumerate(response.tables[0].columns)}})
             for r in response.tables[0].rows]
    return ret

  data_frames = {}
  for tn in table_names:
    if verbose: print("Filtering RDD items for {}".format(tn))
    table_response_rdd = responses_rdd.filter(lambda x: x[0] == tn and x[4] is not None).cache()

    data_frames[tn] = None
    if verbose: print("Checking if RDD for {} has data".format(tn))
    if not table_response_rdd.isEmpty():
      if verbose: print("Getting column types for {} from azure response".format(tn))
      column_types = {f.name: f.type for f in table_response_rdd.take(1)[0][4].tables[0].columns}
      column_types[ws_column_name] = "string"
      if verbose: print("Generating pyspark.sql.Row RDD for {}".format(tn))
      table_tabular_rdd = table_response_rdd.flatMap(tabularize_response_rdd)  # .cache()  # Error with cache, no error without!
      if verbose: print("Getting sample row for {}".format(tn))
      row_fields = table_tabular_rdd.take(1)[0].asDict().keys()
      if verbose: print("Building schema for {} from sample row and column types".format(tn))
      current_schema = StructType([StructField(f, type_map[column_types[f]](), True) for f in row_fields])
      if verbose: print("Creating dataframe for {}".format(tn))
      table_df = spark.createDataFrame(table_tabular_rdd, schema=current_schema).cache()
      if verbose: print("Calculating expected count for {}".format(tn))
      expected_count = table_response_rdd.map(lambda x: (x[1], x[2])).distinct().map(lambda x: x[1]).sum()
      real_count = table_df.select("record_count").groupBy().sum().collect()[0][0]
      table_response_rdd.unpersist()
      # table_tabular_rdd.unpersist()
      if verbose: print("Expected count {} vs Real count {}".format(expected_count, real_count))
      data_frames[tn] = table_df
    else:
      if verbose: print("{} table was empty!".format(tn))
  return data_frames
{code}
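
For comparison, a minimal standalone sketch (made-up names, not the job above) 
of how a Row/schema field-order mismatch produces exactly this kind of 
key/value swap, since createDataFrame binds Row values to the schema 
positionally rather than by name:
{code:python}
# Hypothetical repro: the schema order differs from the Row's sorted field order.
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType

spark = SparkSession.builder.getOrCreate()

rows = [Row(**{"record_count": 42, "WorkspaceId": "abc"})]  # sorts to (WorkspaceId, record_count)
schema = StructType([StructField("record_count", LongType(), True),    # deliberately the
                     StructField("WorkspaceId", StringType(), True)])  # opposite order
# Values are bound by position, so "abc" lands under record_count and the
# failure surfaces as a casting-style error, much like the one reported here.
spark.createDataFrame(rows, schema=schema).show()
{code}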

[jira] [Commented] (SPARK-22796) Add multiple column support to PySpark QuantileDiscretizer

2019-04-30 Thread Dor Kedem (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-22796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830867#comment-16830867
 ] 

Dor Kedem commented on SPARK-22796:
---

Thanks for your work. Isn't there already a PR in place for this issue? 
[https://github.com/apache/spark/pull/19892]

Is there something blocking it? Can I help pick it up?
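
For context, a hedged sketch of the current single-column PySpark API that this 
issue would extend (`df` is an assumed input DataFrame; multi-column 
`inputCols`/`outputCols` parameters are what is being requested):
{code:python}
from pyspark.ml.feature import QuantileDiscretizer

# Today each column needs its own QuantileDiscretizer instance.
qd = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="hour_bin")
binned = qd.fit(df).transform(df)
{code}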

> Add multiple column support to PySpark QuantileDiscretizer
> --
>
> Key: SPARK-22796
> URL: https://issues.apache.org/jira/browse/SPARK-22796
> Project: Spark
>  Issue Type: New Feature
>  Components: ML, PySpark
>Affects Versions: 2.3.0
>Reporter: Nick Pentreath
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24530) Sphinx doesn't render autodoc_docstring_signature correctly (with Python 2?) and pyspark.ml docs are broken

2019-04-30 Thread Apache Spark (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830846#comment-16830846
 ] 

Apache Spark commented on SPARK-24530:
--

User 'gatorsmile' has created a pull request for this issue:
https://github.com/apache/spark/pull/24503

> Sphinx doesn't render autodoc_docstring_signature correctly (with Python 2?) 
> and pyspark.ml docs are broken
> ---
>
> Key: SPARK-24530
> URL: https://issues.apache.org/jira/browse/SPARK-24530
> Project: Spark
>  Issue Type: Bug
>  Components: ML, PySpark
>Affects Versions: 2.4.0
>Reporter: Xiangrui Meng
>Assignee: Hyukjin Kwon
>Priority: Critical
> Fix For: 2.3.2, 2.4.0
>
> Attachments: Screen Shot 2018-06-12 at 8.23.18 AM.png, Screen Shot 
> 2018-06-12 at 8.23.29 AM.png, image-2018-06-13-15-15-51-025.png, 
> pyspark-ml-doc-utuntu18.04-python2.7-sphinx-1.7.5.png
>
>
> I generated python docs from master locally using `make html`. However, the 
> generated html doc doesn't render class docs correctly. I attached the 
> screenshots from the Spark 2.3 docs and from master docs generated locally. 
> Not sure if this is because of my local setup.
> cc: [~dongjoon] Could you help verify?
>  
> The following is the status of our released docs. Some recent docs seem to be 
> broken.
> *2.1.x*
> (O) 
> [https://spark.apache.org/docs/2.1.0/api/python/pyspark.ml.html#pyspark.ml.classification.LogisticRegression]
> (O) 
> [https://spark.apache.org/docs/2.1.1/api/python/pyspark.ml.html#pyspark.ml.classification.LogisticRegression]
> (X) 
> [https://spark.apache.org/docs/2.1.2/api/python/pyspark.ml.html#pyspark.ml.classification.LogisticRegression]
> *2.2.x*
> (O) 
> [https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#pyspark.ml.classification.LogisticRegression]
> (X) 
> [https://spark.apache.org/docs/2.2.1/api/python/pyspark.ml.html#pyspark.ml.classification.LogisticRegression]
> *2.3.x*
> (O) 
> [https://spark.apache.org/docs/2.3.0/api/python/pyspark.ml.html#pyspark.ml.classification.LogisticRegression]
> (X) 
> [https://spark.apache.org/docs/2.3.1/api/python/pyspark.ml.html#pyspark.ml.classification.LogisticRegression]
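
For background, `autodoc_docstring_signature` is a Sphinx autodoc option (on by 
default) that takes the signature from the first line of a docstring, which 
PySpark's generated constructors rely on. A minimal sketch of the convention 
(made-up class, not PySpark's actual source):
{code:python}
class LogisticRegression(object):
    def __init__(self, featuresCol="features", labelCol="label"):
        """
        __init__(self, featuresCol="features", labelCol="label")

        With autodoc_docstring_signature = True, Sphinx strips this first line
        and renders it as the documented signature; when that fails, the raw
        line leaks into the generated HTML instead.
        """
{code}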






[jira] [Commented] (SPARK-27602) SparkSQL CBO can't get true size of partition table after partition pruning

2019-04-30 Thread angerszhu (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830836#comment-16830836
 ] 

angerszhu commented on SPARK-27602:
---

Doing this requires changing the cost-calculation model. I am trying to build a 
framework to estimate the cost of a SQL query, and I need to solve this. If 
changing the original model is acceptable, I will try to change it.
On 05/01/2019 00:43, Hyukjin Kwon (JIRA) wrote:
   [ 
https://issues.apache.org/jira/browse/SPARK-27602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830455#comment-16830455
 ]

Hyukjin Kwon commented on SPARK-27602:
--

So, what's the proposal to fix it?






> SparkSQL CBO can't get true size of partition table after partition pruning
> ---
>
> Key: SPARK-27602
> URL: https://issues.apache.org/jira/browse/SPARK-27602
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.0, 2.3.0, 2.4.0
>Reporter: angerszhu
>Priority: Major
>
> While extracting the cost of a SQL query for my own cost framework, I found 
> that the CBO can't get the true size of a partitioned table when partition 
> pruning applies: we only need the corresponding partitions' sizes, but it 
> just uses the whole table's statistics.
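
To illustrate the gap (a hedged sketch with a hypothetical table `t` 
partitioned by `dt`, run through a SparkSession `spark`):
{code:python}
# Table-level vs. partition-level statistics can be collected and compared.
spark.sql("ANALYZE TABLE t COMPUTE STATISTICS")                              # whole table
spark.sql("ANALYZE TABLE t PARTITION (dt='2019-04-30') COMPUTE STATISTICS")  # one partition
spark.sql("DESCRIBE EXTENDED t").show(truncate=False)
spark.sql("DESCRIBE EXTENDED t PARTITION (dt='2019-04-30')").show(truncate=False)
# Even with pruning on dt, the size the CBO reports for the scan still comes
# from the table-level statistics, which is the behaviour described here.
{code}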






[jira] [Updated] (SPARK-27613) Caching an RDD composed of Row Objects produces some kind of key recombination

2019-04-30 Thread Andres Fernandez (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andres Fernandez updated SPARK-27613:
-
Component/s: (was: Spark Core)
 PySpark

> Caching an RDD composed of Row Objects produces some kind of key recombination
> --
>
> Key: SPARK-27613
> URL: https://issues.apache.org/jira/browse/SPARK-27613
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.4.0
>Reporter: Andres Fernandez
>Priority: Major
>

[jira] [Created] (SPARK-27613) Caching an RDD composed of Row Objects produces some kind of key recombination

2019-04-30 Thread Andres Fernandez (JIRA)
Andres Fernandez created SPARK-27613:


 Summary: Caching an RDD composed of Row Objects produces some kind 
of key recombination
 Key: SPARK-27613
 URL: https://issues.apache.org/jira/browse/SPARK-27613
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.4.0
Reporter: Andres Fernandez


The function "+create_dataframes_from_azure_responses_rdd+" receives 
*table_names* (_list_ of _str_) and *responses_rdd* (rdd of tuples <_str_, 
_str_, _int_, _tuple_, _azure.loganalytics.models.QueryResponse_>). It will 
then go ahead and iterate over the table names to create dataframes filtering 
the RDDs by the first element and valid response.

So far so good.

QueryResponse object (from azure.loganalytics package) contains, essentialy, a 
list with 1 "_table_" which in turn has a "_columns_" and a "_rows_" field. 
Every single response (fifth element of the tuple [4]) for the same table name 
(first element of the tuple [0]) has exactly the same columns in the same order 
(order is not important other thant to reference the rows data inside the same 
response anyways). The types are stored in *column_types* taking the first 
response as the sample.

Now to the tricky part.

I call flatMap on the *responses_rdd* with the function 
+"tabularize_response_rdd+" which basically creates a Row object for every row 
(_list_ of _str_) in the _QueryResponse_. I also create the schema based on a 
*type_map* from azure types to spark.sql.types in order to specify it to the 
subsequent createDataFrame instruction. If the result of this flatMap, 
*table_tabular_rdd*, is not cached before creating the DataFrame from the Rows 
RDD everything works smoothly. Nevertheless if the result of the flatMap, 
*table_tabular_rdd*, is cached the before creating the DataFrame a mismatch is 
evidenced between the actual key:values for the Row objects.

It would be good to point that when a Row Object is created from an unpacked 
dict the code in 
[[https://github.com/apache/spark/blob/480357cc6d71c682fe703611c71c1e6a36e6ce9a/python/pyspark/sql/types.py#L1374-L1375]](here)
 sorts the keys; is this behaviour overriden somehow by caching?

Let me please know what I am doing wrong, is there any best practice / 
documented solution I am not following? Im just a beginner when it comes to 
Spark and would happily accept any suggestion. I hope I was clear enough, and I 
am open to give you any additional details that might be helpful. Thank you! 
(Code and error attached as well).

The error looks like if it was related to casting, but it can be seen that the 
contents do not correspond to the key. *record_count* key is actually a Long 
but in the Row it got somehow swapped for another key's value, in this case 
'n/a'.
{code:java}
def create_dataframes_from_azure_responses_rdd(table_names: list, 
responses_rdd: pyspark.rdd, verbose:bool=False) -> list:
  ws_column_name = "WorkspaceId"
  def tabularize_response_rdd(x: tuple):
import pyspark
tn, wsid, count, interval, response = x
ret = []
if response.tables[0].rows:
  ret = [pyspark.sql.Row(**{ws_column_name:wsid, **{fi.name:r[i] for i,fi 
in enumerate(response.tables[0].columns)}}) for r in response.tables[0].rows]
return ret
  data_frames = {}
  for tn in table_names:
if verbose: print("Filtering RDD items for {}".format(tn))
table_response_rdd = responses_rdd.filter(lambda x: x[0]==tn and 
x[4]!=None).cache()

data_frames[tn] = None
if verbose: print("Checking if RDD for {} has data".format(tn))
if not table_response_rdd.isEmpty():
  if verbose: print("Getting column types for {} from azure 
response".format(tn))
  column_types = {f.name:f.type for f in 
table_response_rdd.take(1)[0][4].tables[0].columns}
  column_types[ws_column_name] = "string"
  if verbose: print("Generating pyspark.sql.Row RDD for {}".format(tn))
  table_tabular_rdd = table_response_rdd.flatMap(tabularize_response_rdd) 
#.cache() #Error with cache, no error without!
  if verbose: print("Getting sample row for {}".format(tn))
  row_fields = table_tabular_rdd.take(1)[0].asDict().keys()
  if verbose: print("Building schema for {} from sample row and column 
types".format(tn))
  current_schema = StructType([StructField(f, type_map[column_types[f]](), 
True) for f in row_fields])
  if verbose: print("Creating dataframe for {}".format(tn))
  table_df = spark.createDataFrame(table_tabular_rdd, 
schema=current_schema).cache()
  if verbose: print("Calculating expected count for {}".format(tn))
  expected_count = table_response_rdd.map(lambda x: 
(x[1],x[2])).distinct().map(lambda x: x[1]).sum()
  real_count = 
table_df.select("record_count").groupBy().sum().collect()[0][0]
  table_response_rdd.unpersist()
  #table_tabular_rdd.unpersist()
  if verbose: print("Expected 

[jira] [Updated] (SPARK-24422) Add JDK11 in our Jenkins' build servers

2019-04-30 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24422?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-24422:
--
Fix Version/s: 3.0.0

> Add JDK11 in our Jenkins' build servers
> ---
>
> Key: SPARK-24422
> URL: https://issues.apache.org/jira/browse/SPARK-24422
> Project: Spark
>  Issue Type: Sub-task
>  Components: Project Infra
>Affects Versions: 2.3.0
>Reporter: DB Tsai
>Assignee: shane knapp
>Priority: Major
> Fix For: 3.0.0
>
>







[jira] [Commented] (SPARK-24422) Add JDK11 in our Jenkins' build servers

2019-04-30 Thread Dongjoon Hyun (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830823#comment-16830823
 ] 

Dongjoon Hyun commented on SPARK-24422:
---

Thank you, [~shaneknapp]! :D

> Add JDK11 in our Jenkins' build servers
> ---
>
> Key: SPARK-24422
> URL: https://issues.apache.org/jira/browse/SPARK-24422
> Project: Spark
>  Issue Type: Sub-task
>  Components: Project Infra
>Affects Versions: 2.3.0
>Reporter: DB Tsai
>Assignee: shane knapp
>Priority: Major
>







[jira] [Resolved] (SPARK-27608) Upgrade Surefire plugin to 3.0.0-M3

2019-04-30 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun resolved SPARK-27608.
---
   Resolution: Fixed
 Assignee: Dongjoon Hyun
Fix Version/s: 3.0.0

This is resolved via https://github.com/apache/spark/pull/24501

> Upgrade Surefire plugin to 3.0.0-M3
> ---
>
> Key: SPARK-27608
> URL: https://issues.apache.org/jira/browse/SPARK-27608
> Project: Spark
>  Issue Type: Sub-task
>  Components: Build
>Affects Versions: 3.0.0
>Reporter: Dongjoon Hyun
>Assignee: Dongjoon Hyun
>Priority: Minor
> Fix For: 3.0.0
>
>
> This issue aims to upgrade Surefire to bring SUREFIRE-1613 .






[jira] [Commented] (SPARK-24422) Add JDK11 in our Jenkins' build servers

2019-04-30 Thread shane knapp (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830772#comment-16830772
 ] 

shane knapp commented on SPARK-24422:
-

yeah, i think so!




-- 
Shane Knapp
UC Berkeley EECS Research / RISELab Staff Technical Lead
https://rise.cs.berkeley.edu


> Add JDK11 in our Jenkins' build servers
> ---
>
> Key: SPARK-24422
> URL: https://issues.apache.org/jira/browse/SPARK-24422
> Project: Spark
>  Issue Type: Sub-task
>  Components: Project Infra
>Affects Versions: 2.3.0
>Reporter: DB Tsai
>Assignee: shane knapp
>Priority: Major
>







[jira] [Resolved] (SPARK-24422) Add JDK11 in our Jenkins' build servers

2019-04-30 Thread shane knapp (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24422?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shane knapp resolved SPARK-24422.
-
Resolution: Fixed

> Add JDK11 in our Jenkins' build servers
> ---
>
> Key: SPARK-24422
> URL: https://issues.apache.org/jira/browse/SPARK-24422
> Project: Spark
>  Issue Type: Sub-task
>  Components: Project Infra
>Affects Versions: 2.3.0
>Reporter: DB Tsai
>Assignee: shane knapp
>Priority: Major
>







[jira] [Commented] (SPARK-18406) Race between end-of-task and completion iterator read lock release

2019-04-30 Thread Xingbo Jiang (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-18406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830770#comment-16830770
 ] 

Xingbo Jiang commented on SPARK-18406:
--

This problem still exists in PythonRunner: the Python side uses a prefetch 
model to consume the upstream data and opens another thread to serve output 
data to downstream operators. It is therefore possible for the task to finish 
first and trigger the task-cleanup logic, after which the CompletionIterator 
tries to release the write lock it holds on some blocks and finds that the 
lock has already been released. I'll submit a PR to bypass the issue later.
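
As an abstract illustration (plain Python threading, not Spark internals) of 
the race shape described above, where whichever thread releases second trips 
the same kind of assertion seen in the executor log below:
{code:python}
import threading

# A per-block lock count, loosely analogous to what BlockInfoManager tracks.
lock_held = {"block_1": 1}
mutex = threading.Lock()

def release(who):
    with mutex:
        assert lock_held["block_1"] > 0, "{}: lock already released!".format(who)
        lock_held["block_1"] -= 1

# End-of-task cleanup and the completion iterator both try to release the
# same lock; it was only taken once, so the second release fails.
t1 = threading.Thread(target=release, args=("task cleanup",))
t2 = threading.Thread(target=release, args=("completion iterator",))
t1.start(); t2.start(); t1.join(); t2.join()
{code}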

> Race between end-of-task and completion iterator read lock release
> --
>
> Key: SPARK-18406
> URL: https://issues.apache.org/jira/browse/SPARK-18406
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager, Spark Core
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Josh Rosen
>Assignee: Xingbo Jiang
>Priority: Major
> Fix For: 2.0.3, 2.1.2, 2.2.0
>
>
> The following log comes from a production streaming job where executors 
> periodically die due to uncaught exceptions during block release:
> {code}
> 16/11/07 17:11:06 INFO CoarseGrainedExecutorBackend: Got assigned task 7921
> 16/11/07 17:11:06 INFO Executor: Running task 0.0 in stage 2390.0 (TID 7921)
> 16/11/07 17:11:06 INFO CoarseGrainedExecutorBackend: Got assigned task 7922
> 16/11/07 17:11:06 INFO Executor: Running task 1.0 in stage 2390.0 (TID 7922)
> 16/11/07 17:11:06 INFO CoarseGrainedExecutorBackend: Got assigned task 7923
> 16/11/07 17:11:06 INFO Executor: Running task 2.0 in stage 2390.0 (TID 7923)
> 16/11/07 17:11:06 INFO TorrentBroadcast: Started reading broadcast variable 
> 2721
> 16/11/07 17:11:06 INFO CoarseGrainedExecutorBackend: Got assigned task 7924
> 16/11/07 17:11:06 INFO Executor: Running task 3.0 in stage 2390.0 (TID 7924)
> 16/11/07 17:11:06 INFO MemoryStore: Block broadcast_2721_piece0 stored as 
> bytes in memory (estimated size 5.0 KB, free 4.9 GB)
> 16/11/07 17:11:06 INFO TorrentBroadcast: Reading broadcast variable 2721 took 
> 3 ms
> 16/11/07 17:11:06 INFO MemoryStore: Block broadcast_2721 stored as values in 
> memory (estimated size 9.4 KB, free 4.9 GB)
> 16/11/07 17:11:06 INFO BlockManager: Found block rdd_2741_1 locally
> 16/11/07 17:11:06 INFO BlockManager: Found block rdd_2741_3 locally
> 16/11/07 17:11:06 INFO BlockManager: Found block rdd_2741_2 locally
> 16/11/07 17:11:06 INFO BlockManager: Found block rdd_2741_4 locally
> 16/11/07 17:11:06 INFO PythonRunner: Times: total = 2, boot = -566, init = 
> 567, finish = 1
> 16/11/07 17:11:06 INFO PythonRunner: Times: total = 7, boot = -540, init = 
> 541, finish = 6
> 16/11/07 17:11:06 INFO Executor: Finished task 2.0 in stage 2390.0 (TID 
> 7923). 1429 bytes result sent to driver
> 16/11/07 17:11:06 INFO PythonRunner: Times: total = 8, boot = -532, init = 
> 533, finish = 7
> 16/11/07 17:11:06 INFO Executor: Finished task 3.0 in stage 2390.0 (TID 
> 7924). 1429 bytes result sent to driver
> 16/11/07 17:11:06 ERROR Executor: Exception in task 0.0 in stage 2390.0 (TID 
> 7921)
> java.lang.AssertionError: assertion failed
>   at scala.Predef$.assert(Predef.scala:165)
>   at 
> org.apache.spark.storage.BlockInfo.checkInvariants(BlockInfoManager.scala:84)
>   at 
> org.apache.spark.storage.BlockInfo.readerCount_$eq(BlockInfoManager.scala:66)
>   at 
> org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:362)
>   at 
> org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:361)
>   at scala.Option.foreach(Option.scala:236)
>   at 
> org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:361)
>   at 
> org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:356)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at 
> org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:356)
>   at 
> org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:646)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:281)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> 16/11/07 17:11:06 INFO CoarseGrainedExecutorBackend: Got assigned task 7925
> 16/11/07 17:11:06 INFO Executor: Running task 0.1 in stage 2390.0 (TID 7925)

[jira] [Comment Edited] (SPARK-27598) DStreams checkpointing does not work with the Spark Shell

2019-04-30 Thread Stavros Kontopoulos (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830760#comment-16830760
 ] 

Stavros Kontopoulos edited comment on SPARK-27598 at 4/30/19 11:44 PM:
---

@[~dongjoon] I am wondering if structured streaming works in a similar scenario.


was (Author: skonto):
@[~dongjoon] I am wondering if structured streaming works.

> DStreams checkpointing does not work with the Spark Shell
> -
>
> Key: SPARK-27598
> URL: https://issues.apache.org/jira/browse/SPARK-27598
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.4.0, 2.4.1, 2.4.2, 3.0.0
>Reporter: Stavros Kontopoulos
>Priority: Major
>
> When I restarted a stream with checkpointing enabled I got this:
> {quote}19/04/29 22:45:06 WARN CheckpointReader: Error reading checkpoint from 
> file 
> [file:/tmp/checkpoint/checkpoint-155656695.bk|file:///tmp/checkpoint/checkpoint-155656695.bk]
>  java.io.IOException: java.lang.ClassCastException: cannot assign instance of 
> java.lang.invoke.SerializedLambda to field 
> org.apache.spark.streaming.dstream.FileInputDStream.filter of type 
> scala.Function1 in instance of 
> org.apache.spark.streaming.dstream.FileInputDStream
>  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1322)
>  at 
> org.apache.spark.streaming.dstream.FileInputDStream.readObject(FileInputDStream.scala:314)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> {quote}
> It seems that the closure is stored in serialized format and cannot be 
> assigned back to a Scala Function1.
> Details of how to reproduce it here: 
> [https://gist.github.com/skonto/87d5b2368b0bf7786d9dd992a710e4e6]
> Maybe this is spark-shell specific and is not expected to work anyway, as I 
> don't see this to be an issue with a normal jar. 
> Note that with Spark 2.3.3 this still does not work, but fails with a 
> different error.
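
As a side note for anyone affected: a minimal sketch (illustrative names and 
paths, not taken from the gist above) of how checkpoint recovery is usually 
structured in a compiled jar, where closures deserialize against stable class 
names instead of the shell's $line... wrappers:

{code:java}
// Sketch only: checkpoint recovery driven from a compiled jar submitted via
// spark-submit; all names and paths here are illustrative.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointedAppSketch {
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("checkpointed-app")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint("/tmp/checkpoint")             // same dir reused on restart
    ssc.textFileStream("/tmp/in").count().print() // any DStream graph
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Rebuilds the graph from /tmp/checkpoint if present, else creates it fresh.
    val ssc = StreamingContext.getOrCreate("/tmp/checkpoint", () => createContext())
    ssc.start()
    ssc.awaitTermination()
  }
}
{code}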



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27598) DStreams checkpointing does not work with the Spark Shell

2019-04-30 Thread Stavros Kontopoulos (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830760#comment-16830760
 ] 

Stavros Kontopoulos commented on SPARK-27598:
-

@[~dongjoon] I am wondering if structured streaming works.

> DStreams checkpointing does not work with the Spark Shell
> -
>
> Key: SPARK-27598
> URL: https://issues.apache.org/jira/browse/SPARK-27598
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.4.0, 2.4.1, 2.4.2, 3.0.0
>Reporter: Stavros Kontopoulos
>Priority: Major
>
> When I restarted a stream with checkpointing enabled I got this:
> {quote}19/04/29 22:45:06 WARN CheckpointReader: Error reading checkpoint from 
> file 
> [file:/tmp/checkpoint/checkpoint-155656695.bk|file:///tmp/checkpoint/checkpoint-155656695.bk]
>  java.io.IOException: java.lang.ClassCastException: cannot assign instance of 
> java.lang.invoke.SerializedLambda to field 
> org.apache.spark.streaming.dstream.FileInputDStream.filter of type 
> scala.Function1 in instance of 
> org.apache.spark.streaming.dstream.FileInputDStream
>  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1322)
>  at 
> org.apache.spark.streaming.dstream.FileInputDStream.readObject(FileInputDStream.scala:314)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> {quote}
> It seems that the closure is stored in serialized format and cannot be 
> assigned back to a Scala Function1.
> Details of how to reproduce it here: 
> [https://gist.github.com/skonto/87d5b2368b0bf7786d9dd992a710e4e6]
> Maybe this is spark-shell specific and is not expected to work anyway, as I 
> don't see this to be an issue with a normal jar. 
> Note that with Spark 2.3.3 this still does not work, but fails with a 
> different error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-27598) DStreams checkpointing does not work with the Spark Shell

2019-04-30 Thread Stavros Kontopoulos (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830758#comment-16830758
 ] 

Stavros Kontopoulos edited comment on SPARK-27598 at 4/30/19 11:39 PM:
---

Btw, if I do the trick and put the mappingFunction in an object like this, 
with Spark 2.3.3 on restart I get:
{quote}def createContext(checkpointDirectory: String, inputDirectory: String, 
outputDirectory: String)
 : StreamingContext = {

...

object T extends Serializable {
 // Update the cumulative count using mapWithState
 // This will give a DStream made of state (which is the cumulative count of 
the words)
 val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
 val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
 val output = (word, sum)
 state.update(sum)
 output
 }
}

val stateDstream = words.mapWithState(
 StateSpec.function(T.mappingFunc).initialState(initialRDD))

}
{quote}
{quote}2019-05-01 02:36:14 WARN BatchedWriteAheadLog:66 - BatchedWriteAheadLog 
Writer queue interrupted.
 org.apache.spark.SparkException: This RDD lacks a SparkContext. It could 
happen in the following cases:
 (1) RDD transformations and actions are NOT invoked by the driver, but inside 
of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) 
is invalid because the values transformation and count action cannot be 
performed inside of the rdd1.map transformation. For more information, see 
SPARK-5063.
 (2) When a Spark Streaming job recovers from checkpoint, this exception will 
be hit if a reference to an RDD not defined by the streaming job is used in 
DStream operations. For more information, See SPARK-13758.
 at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:90)
 at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
 at 
org.apache.spark.rdd.PairRDDFunctions.partitionBy(PairRDDFunctions.scala:529)
 at 
org.apache.spark.streaming.rdd.MapWithStateRDD$.createFromPairRDD(MapWithStateRDD.scala:193)
 at 
org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:146)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
{quote}


was (Author: skonto):
Btw, if I do the trick and put the mappingFunction in an object like this, 
with Spark 2.3.3 on restart I get:
{quote}
 def createContext(checkpointDirectory: String, inputDirectory: String, 
outputDirectory: String)
 : StreamingContext = {

...

object T extends Serializable {
 // Update the cumulative count using mapWithState
 // This will give a DStream made of state (which is the cumulative count of 
the words)
 val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
 val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
 val output = (word, sum)
 state.update(sum)
 output
 }
 }
{quote}
}
{quote}2019-05-01 02:36:14 WARN BatchedWriteAheadLog:66 - BatchedWriteAheadLog 
Writer queue interrupted.
org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen 
in the following cases:
(1) RDD transformations and actions are NOT invoked by the driver, but inside 
of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) 
is invalid because the values transformation and count action cannot be 
performed inside of the rdd1.map transformation. For more information, see 
SPARK-5063.
(2) When a Spark Streaming job recovers from checkpoint, this exception will be 
hit if a reference to an RDD not defined by the streaming job is used in 
DStream operations. For more information, See SPARK-13758.
 at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:90)
 at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
 at 
org.apache.spark.rdd.PairRDDFunctions.partitionBy(PairRDDFunctions.scala:529)
 at 
org.apache.spark.streaming.rdd.MapWithStateRDD$.createFromPairRDD(MapWithStateRDD.scala:193)
 at 
org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:146)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
{quote}

> DStreams checkpointing does not work with the Spark Shell
> -
>
> Key: SPARK-27598
> URL: https://issues.apache.org/jira/browse/SPARK-27598
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.4.0, 2.4.1, 2.4.2, 3.0.0
>Reporter: Stavros Kontopoulos
>Priority: Major
>
> When I restarted a stream with checkpointing enabled I got this:
> {quote}19/04/29 22:45:06 WARN CheckpointReader: Error reading checkpoint from 
> file 
> [file:/tmp/checkpoint/checkpoint-155656695.bk|file:///tmp/checkpoint/checkpoint-155656695.bk]
>  java.io.IOException: 

[jira] [Commented] (SPARK-27598) DStreams checkpointing does not work with the Spark Shell

2019-04-30 Thread Stavros Kontopoulos (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830758#comment-16830758
 ] 

Stavros Kontopoulos commented on SPARK-27598:
-

Btw, if I do the trick and put the mappingFunction in an object like this, 
with Spark 2.3.3 on restart I get:
{quote}
 def createContext(checkpointDirectory: String, inputDirectory: String, 
outputDirectory: String)
 : StreamingContext = {

...

object T extends Serializable {
 // Update the cumulative count using mapWithState
 // This will give a DStream made of state (which is the cumulative count of 
the words)
 val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
 val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
 val output = (word, sum)
 state.update(sum)
 output
 }
 }
{quote}
}
{quote}2019-05-01 02:36:14 WARN BatchedWriteAheadLog:66 - BatchedWriteAheadLog 
Writer queue interrupted.
org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen 
in the following cases:
(1) RDD transformations and actions are NOT invoked by the driver, but inside 
of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) 
is invalid because the values transformation and count action cannot be 
performed inside of the rdd1.map transformation. For more information, see 
SPARK-5063.
(2) When a Spark Streaming job recovers from checkpoint, this exception will be 
hit if a reference to an RDD not defined by the streaming job is used in 
DStream operations. For more information, See SPARK-13758.
 at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:90)
 at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
 at 
org.apache.spark.rdd.PairRDDFunctions.partitionBy(PairRDDFunctions.scala:529)
 at 
org.apache.spark.streaming.rdd.MapWithStateRDD$.createFromPairRDD(MapWithStateRDD.scala:193)
 at 
org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:146)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
{quote}

> DStreams checkpointing does not work with the Spark Shell
> -
>
> Key: SPARK-27598
> URL: https://issues.apache.org/jira/browse/SPARK-27598
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.4.0, 2.4.1, 2.4.2, 3.0.0
>Reporter: Stavros Kontopoulos
>Priority: Major
>
> When I restarted a stream with checkpointing enabled I got this:
> {quote}19/04/29 22:45:06 WARN CheckpointReader: Error reading checkpoint from 
> file 
> [file:/tmp/checkpoint/checkpoint-155656695.bk|file:///tmp/checkpoint/checkpoint-155656695.bk]
>  java.io.IOException: java.lang.ClassCastException: cannot assign instance of 
> java.lang.invoke.SerializedLambda to field 
> org.apache.spark.streaming.dstream.FileInputDStream.filter of type 
> scala.Function1 in instance of 
> org.apache.spark.streaming.dstream.FileInputDStream
>  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1322)
>  at 
> org.apache.spark.streaming.dstream.FileInputDStream.readObject(FileInputDStream.scala:314)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> {quote}
> It seems that the closure is stored in serialized format and cannot be 
> assigned back to a Scala Function1.
> Details of how to reproduce it here: 
> [https://gist.github.com/skonto/87d5b2368b0bf7786d9dd992a710e4e6]
> Maybe this is spark-shell specific and is not expected to work anyway, as I 
> don't see this to be an issue with a normal jar. 
> Note that with Spark 2.3.3 this still does not work, but fails with a 
> different error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-27591) A bug in UnivocityParser prevents using UDT

2019-04-30 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-27591.
--
   Resolution: Fixed
Fix Version/s: 3.0.0

Issue resolved by pull request 24496
[https://github.com/apache/spark/pull/24496]

> A bug in UnivocityParser prevents using UDT
> ---
>
> Key: SPARK-27591
> URL: https://issues.apache.org/jira/browse/SPARK-27591
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.2
>Reporter: Artem Kalchenko
>Assignee: Artem Kalchenko
>Priority: Minor
> Fix For: 3.0.0
>
>
> I am trying to define a UserDefinedType based on String but different from 
> StringType in Spark 2.4.1, but it looks like there is either a bug in Spark 
> or I am doing something incorrectly.
> I define my type as follows:
> {code:java}
> class MyType extends UserDefinedType[MyValue] {
>   override def sqlType: DataType = StringType
>   ...
> }
> @SQLUserDefinedType(udt = classOf[MyType])
> case class MyValue
> {code}
> I expect it to be read and stored as String with just a custom SQL type. In 
> fact Spark can't read the string at all:
> {code:java}
> java.lang.ClassCastException: 
> org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$11
>  cannot be cast to org.apache.spark.unsafe.types.UTF8String
> at 
> org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getUTF8String(rows.scala:46)
> at 
> org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:195)
> at 
> org.apache.spark.sql.catalyst.expressions.JoinedRow.getUTF8String(JoinedRow.scala:102)
> {code}
> The problem is with UnivocityParser.makeConverter, which doesn't return a 
> (String => Any) function but a (String => (String => Any)) one in the case 
> of a UDT; see UnivocityParser:184:
> {code:java}
> case udt: UserDefinedType[_] => (datum: String) =>
>   makeConverter(name, udt.sqlType, nullable, options)
> {code}
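
As a standalone illustration of the bug's shape (sketch only -- this mimics 
the structure of the snippet above, it is not the real parser, and the fix 
suggested in the comments is an assumption from the description rather than a 
quote of PR 24496):

{code:java}
// Sketch: a converter factory whose "UDT" branch returns a converter from
// inside a lambda, yielding String => (String => Any) instead of String => Any.
object ConverterShapeSketch {
  type Converter = String => Any

  def makeConverter(isUdt: Boolean): Converter =
    if (!isUdt) {
      (datum: String) => datum.trim // ordinary branch: String => Any
    } else {
      // buggy UDT-style branch: the lambda returns another converter;
      // presumably the fix is to delegate directly: makeConverter(isUdt = false)
      (datum: String) => makeConverter(isUdt = false)
    }

  def main(args: Array[String]): Unit = {
    println(makeConverter(isUdt = false)("  a ")) // "a"
    println(makeConverter(isUdt = true)("  a "))  // a function object, not a value
  }
}
{code}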



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27591) A bug in UnivocityParser prevents using UDT

2019-04-30 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon reassigned SPARK-27591:


Assignee: Artem Kalchenko

> A bug in UnivocityParser prevents using UDT
> ---
>
> Key: SPARK-27591
> URL: https://issues.apache.org/jira/browse/SPARK-27591
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.2
>Reporter: Artem Kalchenko
>Assignee: Artem Kalchenko
>Priority: Minor
>
> I am trying to define a UserDefinedType based on String but different from 
> StringType in Spark 2.4.1, but it looks like there is either a bug in Spark 
> or I am doing something incorrectly.
> I define my type as follows:
> {code:java}
> class MyType extends UserDefinedType[MyValue] {
>   override def sqlType: DataType = StringType
>   ...
> }
> @SQLUserDefinedType(udt = classOf[MyType])
> case class MyValue
> {code}
> I expect it to be read and stored as String with just a custom SQL type. In 
> fact Spark can't read the string at all:
> {code:java}
> java.lang.ClassCastException: 
> org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$11
>  cannot be cast to org.apache.spark.unsafe.types.UTF8String
> at 
> org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getUTF8String(rows.scala:46)
> at 
> org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:195)
> at 
> org.apache.spark.sql.catalyst.expressions.JoinedRow.getUTF8String(JoinedRow.scala:102)
> {code}
> The problem is with UnivocityParser.makeConverter, which doesn't return a 
> (String => Any) function but a (String => (String => Any)) one in the case 
> of a UDT; see UnivocityParser:184:
> {code:java}
> case udt: UserDefinedType[_] => (datum: String) =>
>   makeConverter(name, udt.sqlType, nullable, options)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27519) Pandas udf corrupting data

2019-04-30 Thread Bryan Cutler (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830755#comment-16830755
 ] 

Bryan Cutler commented on SPARK-27519:
--

I made SPARK-27612 for the problem with {{Row(value=[[None, None]])}}

> Pandas udf corrupting data
> --
>
> Key: SPARK-27519
> URL: https://issues.apache.org/jira/browse/SPARK-27519
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.0
>Reporter: Jeff gold
>Priority: Major
> Fix For: 3.0.0
>
> Attachments: Pandas UDF Bug.py
>
>
> While trying to use a pandas udf, I sent the udf 2 columns: a string and a 
> list of lists of strings. The second argument's structure, for example: 
> [['1'],['2'],['3']]
> But when receiving this same value in the udf, I get something like this: 
> [['1','2'],['3'],[]]
> I checked, and the same row in the table has the list with the correct 
> structure; only in the udf did it change.
>  
> I don't know why this happens, but I do know it has something to do with the 
> fact that that row was the 10,001st row and the last row in its partition. 
> The pandas batch size is 10,000, so that row was sent alone as a second 
> batch, and that's the only thing that seems to cause it: having 1 or 2 rows 
> in a second batch of the partition. I was also able to get this with a 
> second batch of 2 rows; the list wasn't changed except that an empty list 
> was added to the end. 
> Hope you can help me understand what is going on, thanks!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27612) Creating a DataFrame in PySpark with ArrayType produces some Rows with Arrays of None

2019-04-30 Thread Bryan Cutler (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bryan Cutler updated SPARK-27612:
-
Description: 
When creating a DataFrame with type {{ArrayType(IntegerType(), True)}} there 
end up being rows that are filled with None.

 
{code:java}
In [1]: from pyspark.sql.types import ArrayType, IntegerType
     

In [2]: df = spark.createDataFrame([[1, 2, 3, 4]] * 100, 
ArrayType(IntegerType(), True))     

In [3]: df.distinct().collect() 
     
Out[3]: [Row(value=[None, None, None, None]), Row(value=[1, 2, 3, 4])]
{code}
 

From this example, it is consistently at elements 97, 98:
{code:python}
In [5]: df.collect()[-5:]   
     
Out[5]: 
[Row(value=[1, 2, 3, 4]),
 Row(value=[1, 2, 3, 4]),
 Row(value=[None, None, None, None]),
 Row(value=[None, None, None, None]),
 Row(value=[1, 2, 3, 4])]
{code}
This also happens with a type of {{ArrayType(ArrayType(IntegerType(), True))}}

  was:
When creating a DataFrame with type {{ArrayType(IntegerType(), True)}} there 
end up being rows that are filled with None.

 
{code:java}
In [1]: from pyspark.sql.types import ArrayType, IntegerType
     

In [2]: df = spark.createDataFrame([[1, 2, 3, 4]] * 100, 
ArrayType(IntegerType(), True))     

In [3]: df.distinct().collect() 
     
Out[3]: [Row(value=[None, None, None, None]), Row(value=[1, 2, 3, 4])]
{code}
 

From this example, it is consistently at elements 97, 98:
{code}
In [5]: df.collect()[-5:]   
     
Out[5]: 
[Row(value=[1, 2, 3, 4]),
 Row(value=[1, 2, 3, 4]),
 Row(value=[None, None, None, None]),
 Row(value=[None, None, None, None]),
 Row(value=[1, 2, 3, 4])]
{code}
This also happens with a type of {{ArrayType(ArrayType(IntegerType(), True))}}


> Creating a DataFrame in PySpark with ArrayType produces some Rows with Arrays 
> of None
> -
>
> Key: SPARK-27612
> URL: https://issues.apache.org/jira/browse/SPARK-27612
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, SQL
>Affects Versions: 3.0.0
>Reporter: Bryan Cutler
>Priority: Major
>
> When creating a DataFrame with type {{ArrayType(IntegerType(), True)}} there 
> end up being rows that are filled with None.
>  
> {code:java}
> In [1]: from pyspark.sql.types import ArrayType, IntegerType  
>    
> In [2]: df = spark.createDataFrame([[1, 2, 3, 4]] * 100, 
> ArrayType(IntegerType(), True))     
> In [3]: df.distinct().collect()   
>    
> Out[3]: [Row(value=[None, None, None, None]), Row(value=[1, 2, 3, 4])]
> {code}
>  
> From this example, it is consistently at elements 97, 98:
> {code:python}
> In [5]: df.collect()[-5:] 
>    
> Out[5]: 
> [Row(value=[1, 2, 3, 4]),
>  Row(value=[1, 2, 3, 4]),
>  Row(value=[None, None, None, None]),
>  Row(value=[None, None, None, None]),
>  Row(value=[1, 2, 3, 4])]
> {code}
> This also happens with a type of {{ArrayType(ArrayType(IntegerType(), True))}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27612) Creating a DataFrame in PySpark with ArrayType produces some Rows with Arrays of None

2019-04-30 Thread Bryan Cutler (JIRA)
Bryan Cutler created SPARK-27612:


 Summary: Creating a DataFrame in PySpark with ArrayType produces 
some Rows with Arrays of None
 Key: SPARK-27612
 URL: https://issues.apache.org/jira/browse/SPARK-27612
 Project: Spark
  Issue Type: Bug
  Components: PySpark, SQL
Affects Versions: 3.0.0
Reporter: Bryan Cutler


When creating a DataFrame with type {{ArrayType(IntegerType(), True)}} there 
end up being rows that are filled with None.

 
{code:java}
In [1]: from pyspark.sql.types import ArrayType, IntegerType
     

In [2]: df = spark.createDataFrame([[1, 2, 3, 4]] * 100, 
ArrayType(IntegerType(), True))     

In [3]: df.distinct().collect() 
     
Out[3]: [Row(value=[None, None, None, None]), Row(value=[1, 2, 3, 4])]
{code}
 

From this example, it is consistently at elements 97, 98:
{code}
In [5]: df.collect()[-5:]   
     
Out[5]: 
[Row(value=[1, 2, 3, 4]),
 Row(value=[1, 2, 3, 4]),
 Row(value=[None, None, None, None]),
 Row(value=[None, None, None, None]),
 Row(value=[1, 2, 3, 4])]
{code}
This also happens with a type of {{ArrayType(ArrayType(IntegerType(), True))}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-27598) DStreams checkpointing does not work with the Spark Shell

2019-04-30 Thread Dongjoon Hyun (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830740#comment-16830740
 ] 

Dongjoon Hyun edited comment on SPARK-27598 at 4/30/19 11:24 PM:
-

If this fails with Spark 2.3 with Scala 2.11, please remove `Scala 2.12` from 
the title. It's misleading.


was (Author: dongjoon):
If this fails with Spark 2.3 with Scala 2.11, please remove `Scala 2.11` from 
the title. It's misleading.

> DStreams checkpointing does not work with the Spark Shell
> -
>
> Key: SPARK-27598
> URL: https://issues.apache.org/jira/browse/SPARK-27598
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.4.0, 2.4.1, 2.4.2, 3.0.0
>Reporter: Stavros Kontopoulos
>Priority: Major
>
> When I restarted a stream with checkpointing enabled I got this:
> {quote}19/04/29 22:45:06 WARN CheckpointReader: Error reading checkpoint from 
> file 
> [file:/tmp/checkpoint/checkpoint-155656695.bk|file:///tmp/checkpoint/checkpoint-155656695.bk]
>  java.io.IOException: java.lang.ClassCastException: cannot assign instance of 
> java.lang.invoke.SerializedLambda to field 
> org.apache.spark.streaming.dstream.FileInputDStream.filter of type 
> scala.Function1 in instance of 
> org.apache.spark.streaming.dstream.FileInputDStream
>  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1322)
>  at 
> org.apache.spark.streaming.dstream.FileInputDStream.readObject(FileInputDStream.scala:314)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> {quote}
> It seems that the closure is stored in serialized format and cannot be 
> assigned back to a Scala Function1.
> Details of how to reproduce it here: 
> [https://gist.github.com/skonto/87d5b2368b0bf7786d9dd992a710e4e6]
> Maybe this is spark-shell specific and is not expected to work anyway, as I 
> don't see this to be an issue with a normal jar. 
> Note that with Spark 2.3.3 this still does not work, but fails with a 
> different error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27598) DStreams checkpointing does not work with the Spark Shell

2019-04-30 Thread Dongjoon Hyun (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830752#comment-16830752
 ] 

Dongjoon Hyun commented on SPARK-27598:
---

Thanks!

> DStreams checkpointing does not work with the Spark Shell
> -
>
> Key: SPARK-27598
> URL: https://issues.apache.org/jira/browse/SPARK-27598
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.4.0, 2.4.1, 2.4.2, 3.0.0
>Reporter: Stavros Kontopoulos
>Priority: Major
>
> When I restarted a stream with checkpointing enabled I got this:
> {quote}19/04/29 22:45:06 WARN CheckpointReader: Error reading checkpoint from 
> file 
> [file:/tmp/checkpoint/checkpoint-155656695.bk|file:///tmp/checkpoint/checkpoint-155656695.bk]
>  java.io.IOException: java.lang.ClassCastException: cannot assign instance of 
> java.lang.invoke.SerializedLambda to field 
> org.apache.spark.streaming.dstream.FileInputDStream.filter of type 
> scala.Function1 in instance of 
> org.apache.spark.streaming.dstream.FileInputDStream
>  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1322)
>  at 
> org.apache.spark.streaming.dstream.FileInputDStream.readObject(FileInputDStream.scala:314)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> {quote}
> It seems that the closure is stored in serialized format and cannot be 
> assigned back to a Scala Function1.
> Details of how to reproduce it here: 
> [https://gist.github.com/skonto/87d5b2368b0bf7786d9dd992a710e4e6]
> Maybe this is spark-shell specific and is not expected to work anyway, as I 
> don't see this to be an issue with a normal jar. 
> Note that with Spark 2.3.3 this still does not work, but fails with a 
> different error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27611) Redundant javax.activation dependencies in the Maven build

2019-04-30 Thread Sean Owen (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen updated SPARK-27611:
--
Priority: Minor  (was: Major)

Agree, exclude the jakarta one.

> Redundant javax.activation dependencies in the Maven build
> --
>
> Key: SPARK-27611
> URL: https://issues.apache.org/jira/browse/SPARK-27611
> Project: Spark
>  Issue Type: Dependency upgrade
>  Components: Build
>Affects Versions: 3.0.0
>Reporter: Cheng Lian
>Priority: Minor
>
> [PR #23890|https://github.com/apache/spark/pull/23890] introduced 
> {{org.glassfish.jaxb:jaxb-runtime:2.3.2}} as a runtime dependency. As an 
> unexpected side effect, {{jakarta.activation:jakarta.activation-api:1.2.1}} 
> was also pulled in as a transitive dependency. As a result, for the Maven 
> build, both of the following two jars can be found under 
> {{assembly/target/scala-2.12/jars}}:
> {noformat}
> activation-1.1.1.jar
> jakarta.activation-api-1.2.1.jar
> {noformat}
> Discussed this with [~srowen] offline and we agreed that we should probably 
> exclude the Jakarta one.
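
For consumers who want the same effect in their own builds, a hedged sketch 
of the exclusion in sbt syntax (the actual change here would be a Maven 
<exclusion> on the jaxb-runtime dependency; coordinates are copied from the 
description above):

{code:java}
// build.sbt fragment (sketch): pull in jaxb-runtime but keep the
// jakarta.activation API from riding along as a transitive dependency.
libraryDependencies += ("org.glassfish.jaxb" % "jaxb-runtime" % "2.3.2")
  .exclude("jakarta.activation", "jakarta.activation-api")
{code}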



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-27598) DStreams checkpointing does not work with the Spark Shell

2019-04-30 Thread Stavros Kontopoulos (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830745#comment-16830745
 ] 

Stavros Kontopoulos edited comment on SPARK-27598 at 4/30/19 11:21 PM:
---

 I will remove the language version from the title. This seems to be specific 
to the shell.

With Spark 2.3.3, which I assume uses Scala 2.11 by default, it fails with a 
different error:
{quote}2019-05-01 02:12:00 WARN CheckpointReader:87 - Error reading checkpoint 
from file 
[file:/tmp/checkpoint/checkpoint-155666587.bk|file:///tmp/checkpoint/checkpoint-155666587.bk]
 java.io.IOException: java.lang.ClassNotFoundException: 
$line16.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$MapWithState$$anonfun$3
 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1354)
 at 
org.apache.spark.streaming.dstream.FileInputDStream.readObject(FileInputDStream.scala:316)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
 at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
{quote}
I guess in this case it tries to deserialize data from the checkpoint file 
using classes from the Spark Shell, like $line16, that will not exist when the 
new shell is started.


was (Author: skonto):
 I will remove the language version from the title. This seems to be specific 
to the shell.

With Spark 2.3.3, which I assume uses Scala 2.11 by default, it fails with a 
different error:
{quote}2019-05-01 02:12:00 WARN CheckpointReader:87 - Error reading checkpoint 
from file 
[file:/tmp/checkpoint/checkpoint-155666587.bk|file:///tmp/checkpoint/checkpoint-155666587.bk]
 java.io.IOException: java.lang.ClassNotFoundException: 
$line16.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$MapWithState$$anonfun$3
 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1354)
 at 
org.apache.spark.streaming.dstream.FileInputDStream.readObject(FileInputDStream.scala:316)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
 at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
{quote}
I guess in this case it tries to deserialize data from the checkpoint file 
using classes from the Spark Shell, like $line16, that will not exist when the 
new shell is started.

> DStreams checkpointing does not work with the Spark Shell
> -
>
> Key: SPARK-27598
> URL: https://issues.apache.org/jira/browse/SPARK-27598
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.4.0, 2.4.1, 2.4.2, 3.0.0
>Reporter: Stavros Kontopoulos
>Priority: Major
>
> When I restarted a stream with checkpointing enabled I got this:
> {quote}19/04/29 22:45:06 WARN CheckpointReader: Error reading checkpoint from 
> file 
> [file:/tmp/checkpoint/checkpoint-155656695.bk|file:///tmp/checkpoint/checkpoint-155656695.bk]
>  java.io.IOException: java.lang.ClassCastException: cannot assign instance of 
> java.lang.invoke.SerializedLambda to field 
> org.apache.spark.streaming.dstream.FileInputDStream.filter of type 
> scala.Function1 in instance of 
> org.apache.spark.streaming.dstream.FileInputDStream
>  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1322)
>  at 
> org.apache.spark.streaming.dstream.FileInputDStream.readObject(FileInputDStream.scala:314)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> {quote}
> It seems that the closure is stored in serialized format and cannot be 
> assigned back to a Scala Function1.
> Details of how to reproduce it here: 
> [https://gist.github.com/skonto/87d5b2368b0bf7786d9dd992a710e4e6]
> Maybe this is spark-shell specific and is not expected to work anyway, as I 
> don't see this to be an issue with a normal jar.

[jira] [Commented] (SPARK-27593) CSV Parser returns 2 DataFrame - Valid and Malformed DFs

2019-04-30 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830747#comment-16830747
 ] 

Hyukjin Kwon commented on SPARK-27593:
--

The malformed column is just optional additional information to see which 
records are malformed. You can do that by checking whether the malformed 
column is null or not.
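
A minimal sketch of that suggestion (schema, column name, and paths are 
assumptions for illustration):

{code:java}
// Read in PERMISSIVE mode with an explicit corrupt-record column, then split
// the result into valid and malformed DataFrames by null-checking that column.
import org.apache.spark.sql.SparkSession

object SplitMalformedCsvSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("split-malformed-csv").getOrCreate()

    val df = spark.read
      .option("mode", "PERMISSIVE")
      .option("columnNameOfCorruptRecord", "_corrupt_record")
      .schema("id INT, name STRING, _corrupt_record STRING")
      .csv("/path/to/input.csv")
      .cache() // cache so the corrupt-record column can be filtered on reliably

    val malformed = df.filter(df("_corrupt_record").isNotNull)
    val valid = df.filter(df("_corrupt_record").isNull).drop("_corrupt_record")

    println(s"valid=${valid.count()}, malformed=${malformed.count()}")
  }
}
{code}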

> CSV Parser returns 2 DataFrame - Valid and Malformed DFs
> 
>
> Key: SPARK-27593
> URL: https://issues.apache.org/jira/browse/SPARK-27593
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core
>Affects Versions: 2.4.2
>Reporter: Ladislav Jech
>Priority: Major
>
> When we process CSV in any kind of data warehouse, it's common procedure to 
> report corrupted records for audit purposes and feed them back to the 
> vendor, so they can improve their process. CSV is no different from XSD in 
> the sense that it defines a schema, although in a very limited way (in some 
> cases only as a number of columns, without even headers, and we don't have 
> types), but when I check an XML document against an XSD file, I get an exact 
> report of whether the file is completely valid and, if not, exactly which 
> records do not follow the schema. 
> Such a feature would have big value in Spark for CSV: get malformed records 
> into some dataframe, with a line count (pointer within the data object), so 
> I can log both the pointer and the real data (line/row) and trigger an 
> action on this unfortunate event.
> The load() method could return an Array of DFs (Valid, Invalid).
> PERMISSIVE mode isn't enough, since it fills missing fields with nulls, so 
> it is even harder to detect what is really wrong. Another approach at the 
> moment is to read in both permissive and dropmalformed modes into 2 
> dataframes and compare them against each other.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-27598) DStreams checkpointing does not work with the Spark Shell

2019-04-30 Thread Stavros Kontopoulos (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830745#comment-16830745
 ] 

Stavros Kontopoulos edited comment on SPARK-27598 at 4/30/19 11:16 PM:
---

 I will remove the language version from the title. This seems to be specific 
to the shell.

With Spark 2.3.3, which I assume uses Scala 2.11 by default, it fails with a 
different error:
{quote}2019-05-01 02:12:00 WARN CheckpointReader:87 - Error reading checkpoint 
from file 
[file:/tmp/checkpoint/checkpoint-155666587.bk|file:///tmp/checkpoint/checkpoint-155666587.bk]
 java.io.IOException: java.lang.ClassNotFoundException: 
$line16.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$MapWithState$$anonfun$3
 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1354)
 at 
org.apache.spark.streaming.dstream.FileInputDStream.readObject(FileInputDStream.scala:316)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
 at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
{quote}
I guess in this case it tries to deserialize data from the checkpoint file 
using classes from the Spark Shell, like $line16, that will not exist when the 
new shell is started.


was (Author: skonto):
 I will remove the language version from the title. This seems to be specific 
to the shell.

With Spark 2.3.3, which I assume uses Scala 2.11 by default, it fails with a 
different error:
{quote}2019-05-01 02:12:00 WARN CheckpointReader:87 - Error reading checkpoint 
from file 
[file:/tmp/checkpoint/checkpoint-155666587.bk|file:///tmp/checkpoint/checkpoint-155666587.bk]
 java.io.IOException: java.lang.ClassNotFoundException: 
$line16.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$MapWithState$$anonfun$3
 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1354)
 at 
org.apache.spark.streaming.dstream.FileInputDStream.readObject(FileInputDStream.scala:316)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
 at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
{quote}
I guess in this case it tries to deserialize data from the checkpoint file 
using classes from the Spark Shell, like $line16, that will not exist when the 
new shell is started.

> DStreams checkpointing does not work with the Spark Shell
> -
>
> Key: SPARK-27598
> URL: https://issues.apache.org/jira/browse/SPARK-27598
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.4.0, 2.4.1, 2.4.2, 3.0.0
>Reporter: Stavros Kontopoulos
>Priority: Major
>
> When I restarted a stream with checkpointing enabled I got this:
> {quote}19/04/29 22:45:06 WARN CheckpointReader: Error reading checkpoint from 
> file 
> [file:/tmp/checkpoint/checkpoint-155656695.bk|file:///tmp/checkpoint/checkpoint-155656695.bk]
>  java.io.IOException: java.lang.ClassCastException: cannot assign instance of 
> java.lang.invoke.SerializedLambda to field 
> org.apache.spark.streaming.dstream.FileInputDStream.filter of type 
> scala.Function1 in instance of 
> org.apache.spark.streaming.dstream.FileInputDStream
>  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1322)
>  at 
> org.apache.spark.streaming.dstream.FileInputDStream.readObject(FileInputDStream.scala:314)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> {quote}
> It seems that the closure is stored in serialized format and cannot be 
> assigned back to a Scala Function1.
> Details of how to reproduce it here: 
> [https://gist.github.com/skonto/87d5b2368b0bf7786d9dd992a710e4e6]
> Maybe this is spark-shell specific and is not expected to work anyway, as I 
> don't see this to be an issue with a normal jar.

[jira] [Comment Edited] (SPARK-27598) DStreams checkpointing does not work with the Spark Shell

2019-04-30 Thread Stavros Kontopoulos (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830745#comment-16830745
 ] 

Stavros Kontopoulos edited comment on SPARK-27598 at 4/30/19 11:15 PM:
---

 I will remove the language version from the title. This seems to be specific 
to the shell.

With Spark 2.3.3, which I assume uses Scala 2.11 by default, it fails with a 
different error:
{quote}2019-05-01 02:12:00 WARN CheckpointReader:87 - Error reading checkpoint 
from file 
[file:/tmp/checkpoint/checkpoint-155666587.bk|file:///tmp/checkpoint/checkpoint-155666587.bk]
 java.io.IOException: java.lang.ClassNotFoundException: 
$line16.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$MapWithState$$anonfun$3
 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1354)
 at 
org.apache.spark.streaming.dstream.FileInputDStream.readObject(FileInputDStream.scala:316)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
 at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
{quote}
I guess in this case it tries to deserialize data from the checkpoint file 
using classes from the Spark Shell, like $line16, that will not exist when the 
new shell is started.


was (Author: skonto):
 I will remove the language version from the title. This seems to be specific 
to the shell.

With Spark 2.3.3, which I assume uses Scala 2.11 by default, it fails with a 
different error:
{quote}2019-05-01 02:12:00 WARN CheckpointReader:87 - Error reading checkpoint 
from file 
[file:/tmp/checkpoint/checkpoint-155666587.bk|file:///tmp/checkpoint/checkpoint-155666587.bk]
 java.io.IOException: java.lang.ClassNotFoundException: 
$line16.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$MapWithState$$anonfun$3
 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1354)
 at 
org.apache.spark.streaming.dstream.FileInputDStream.readObject(FileInputDStream.scala:316)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
 at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
{quote}
I guess in this case it tries to serialize objects from the Spark Shell, like 
$line16, that will not exist when the new shell is started.

> DStreams checkpointing does not work with the Spark Shell
> -
>
> Key: SPARK-27598
> URL: https://issues.apache.org/jira/browse/SPARK-27598
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.4.0, 2.4.1, 2.4.2, 3.0.0
>Reporter: Stavros Kontopoulos
>Priority: Major
>
> When I restarted a stream with checkpointing enabled I got this:
> {quote}19/04/29 22:45:06 WARN CheckpointReader: Error reading checkpoint from 
> file 
> [file:/tmp/checkpoint/checkpoint-155656695.bk|file:///tmp/checkpoint/checkpoint-155656695.bk]
>  java.io.IOException: java.lang.ClassCastException: cannot assign instance of 
> java.lang.invoke.SerializedLambda to field 
> org.apache.spark.streaming.dstream.FileInputDStream.filter of type 
> scala.Function1 in instance of 
> org.apache.spark.streaming.dstream.FileInputDStream
>  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1322)
>  at 
> org.apache.spark.streaming.dstream.FileInputDStream.readObject(FileInputDStream.scala:314)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> {quote}
> It seems that the closure is stored in serialized format and cannot be 
> assigned back to a Scala Function1.
> Details of how to reproduce it here: 
> [https://gist.github.com/skonto/87d5b2368b0bf7786d9dd992a710e4e6]
> Maybe this is spark-shell specific and is not expected to work anyway, as I 
> don't see this to be an issue with a normal jar.

[jira] [Comment Edited] (SPARK-27598) DStreams checkpointing does not work with the Spark Shell

2019-04-30 Thread Stavros Kontopoulos (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830745#comment-16830745
 ] 

Stavros Kontopoulos edited comment on SPARK-27598 at 4/30/19 11:14 PM:
---

 I will remove the language version from the title. This seems to be specific 
to the shell.

With Spark 2.3.3, which I assume uses Scala 2.11 by default, it fails with a 
different error:
{quote}2019-05-01 02:12:00 WARN CheckpointReader:87 - Error reading checkpoint 
from file 
[file:/tmp/checkpoint/checkpoint-155666587.bk|file:///tmp/checkpoint/checkpoint-155666587.bk]
 java.io.IOException: java.lang.ClassNotFoundException: 
$line16.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$MapWithState$$anonfun$3
 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1354)
 at 
org.apache.spark.streaming.dstream.FileInputDStream.readObject(FileInputDStream.scala:316)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
 at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
{quote}
I guess in this case it tries to serialize objects from the Spark Shell, like 
$line16, that will not exist when the new shell is started.


was (Author: skonto):
 I will remove the language version from the title. This seems to be specific 
to the shell.

With Spark 2.3.3, which I assume uses Scala 2.11 by default, it fails with a 
different error:
{quote}2019-05-01 02:12:00 WARN CheckpointReader:87 - Error reading checkpoint 
from file 
[file:/tmp/checkpoint/checkpoint-155666587.bk|file:///tmp/checkpoint/checkpoint-155666587.bk]
 java.io.IOException: java.lang.ClassNotFoundException: 
$line16.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$MapWithState$$anonfun$3
 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1354)
 at 
org.apache.spark.streaming.dstream.FileInputDStream.readObject(FileInputDStream.scala:316)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
 at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
{quote}

> DStreams checkpointing does not work with the Spark Shell
> -
>
> Key: SPARK-27598
> URL: https://issues.apache.org/jira/browse/SPARK-27598
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.4.0, 2.4.1, 2.4.2, 3.0.0
>Reporter: Stavros Kontopoulos
>Priority: Major
>
> When I restarted a stream with checkpointing enabled I got this:
> {quote}19/04/29 22:45:06 WARN CheckpointReader: Error reading checkpoint from 
> file 
> [file:/tmp/checkpoint/checkpoint-155656695.bk|file:///tmp/checkpoint/checkpoint-155656695.bk]
>  java.io.IOException: java.lang.ClassCastException: cannot assign instance of 
> java.lang.invoke.SerializedLambda to field 
> org.apache.spark.streaming.dstream.FileInputDStream.filter of type 
> scala.Function1 in instance of 
> org.apache.spark.streaming.dstream.FileInputDStream
>  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1322)
>  at 
> org.apache.spark.streaming.dstream.FileInputDStream.readObject(FileInputDStream.scala:314)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> {quote}
> It seems that the closure is stored in serialized format and cannot be 
> assigned back to a Scala Function1.
> Details of how to reproduce it here: 
> [https://gist.github.com/skonto/87d5b2368b0bf7786d9dd992a710e4e6]
> Maybe this is spark-shell specific and is not expected to work anyway, as I 
> don't see this to be an issue with a normal jar. 
> Note that with Spark 2.3.3 this still does not work, but fails with a 
> different error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

[jira] [Comment Edited] (SPARK-27598) DStreams checkpointing does not work with the Spark Shell

2019-04-30 Thread Stavros Kontopoulos (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830745#comment-16830745
 ] 

Stavros Kontopoulos edited comment on SPARK-27598 at 4/30/19 11:13 PM:
---

 I will remove the language version from the title. This seems to be specific 
to the shell.

With Spark 2.3.3, which I assume uses Scala 2.11 by default, it fails with a 
different error:
{quote}2019-05-01 02:12:00 WARN CheckpointReader:87 - Error reading checkpoint 
from file 
[file:/tmp/checkpoint/checkpoint-155666587.bk|file:///tmp/checkpoint/checkpoint-155666587.bk]
 java.io.IOException: java.lang.ClassNotFoundException: 
$line16.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$MapWithState$$anonfun$3
 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1354)
 at 
org.apache.spark.streaming.dstream.FileInputDStream.readObject(FileInputDStream.scala:316)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
 at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
{quote}


was (Author: skonto):
 I will remove the language version from the title.

With Spark 2.3.3, which I assume uses Scala 2.11 by default, it fails with a 
different error:
{quote}2019-05-01 02:12:00 WARN CheckpointReader:87 - Error reading checkpoint 
from file file:/tmp/checkpoint/checkpoint-155666587.bk
java.io.IOException: java.lang.ClassNotFoundException: 
$line16.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$MapWithState$$anonfun$3
 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1354)
 at 
org.apache.spark.streaming.dstream.FileInputDStream.readObject(FileInputDStream.scala:316)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
 at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
{quote}

> DStreams checkpointing does not work with the Spark Shell
> -
>
> Key: SPARK-27598
> URL: https://issues.apache.org/jira/browse/SPARK-27598
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.4.0, 2.4.1, 2.4.2, 3.0.0
>Reporter: Stavros Kontopoulos
>Priority: Major
>
> When I restarted a stream with checkpointing enabled I got this:
> {quote}19/04/29 22:45:06 WARN CheckpointReader: Error reading checkpoint from 
> file 
> file:/tmp/checkpoint/checkpoint-155656695.bk
>  java.io.IOException: java.lang.ClassCastException: cannot assign instance of 
> java.lang.invoke.SerializedLambda to field 
> org.apache.spark.streaming.dstream.FileInputDStream.filter of type 
> scala.Function1 in instance of 
> org.apache.spark.streaming.dstream.FileInputDStream
>  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1322)
>  at 
> org.apache.spark.streaming.dstream.FileInputDStream.readObject(FileInputDStream.scala:314)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> {quote}
> It seems that the closure is stored in SerializedLambda form and cannot be 
> assigned back to a Scala Function1.
> Details of how to reproduce it are here: 
> [https://gist.github.com/skonto/87d5b2368b0bf7786d9dd992a710e4e6]
> Maybe this is spark-shell specific and is not expected to work anyway, as I 
> don't see this issue with a normal jar. 
> Note that with Spark 2.3.3 this still does not work, but with a different 
> error.
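
For context, a minimal PySpark sketch of the checkpoint-recovery pattern 
involved, with hypothetical checkpoint and input directories; the actual 
Scala-shell reproduction is in the gist above:
{code:python}
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

CHECKPOINT_DIR = "/tmp/checkpoint"  # hypothetical path

def create_context():
    # Runs only when no checkpoint exists yet.
    sc = SparkContext(appName="checkpoint-demo")
    ssc = StreamingContext(sc, 5)
    ssc.checkpoint(CHECKPOINT_DIR)
    lines = ssc.textFileStream("/tmp/input")  # hypothetical input dir
    # These lambdas end up serialized inside the checkpoint files.
    lines.map(lambda l: (l, 1)).reduceByKey(lambda a, b: a + b).pprint()
    return ssc

# On restart, getOrCreate deserializes the checkpointed DStream graph,
# including the closures above. In the Scala shell, such closures live in
# REPL-generated classes ($line16.$read$$iw...), which is where the
# ClassNotFoundException in the log comes from.
ssc = StreamingContext.getOrCreate(CHECKPOINT_DIR, create_context)
ssc.start()
ssc.awaitTermination()
{code}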



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27598) DStreams checkpointing does not work with the Spark Shell

2019-04-30 Thread Stavros Kontopoulos (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stavros Kontopoulos updated SPARK-27598:

Summary: DStreams checkpointing does not work with the Spark Shell  (was: 
DStreams checkpointing does not work with Scala 2.12)

> DStreams checkpointing does not work with the Spark Shell
> -
>
> Key: SPARK-27598
> URL: https://issues.apache.org/jira/browse/SPARK-27598
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.4.0, 2.4.1, 2.4.2, 3.0.0
>Reporter: Stavros Kontopoulos
>Priority: Major
>
> When I restarted a stream with checkpointing enabled I got this:
> {quote}19/04/29 22:45:06 WARN CheckpointReader: Error reading checkpoint from 
> file 
> file:/tmp/checkpoint/checkpoint-155656695.bk
>  java.io.IOException: java.lang.ClassCastException: cannot assign instance of 
> java.lang.invoke.SerializedLambda to field 
> org.apache.spark.streaming.dstream.FileInputDStream.filter of type 
> scala.Function1 in instance of 
> org.apache.spark.streaming.dstream.FileInputDStream
>  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1322)
>  at 
> org.apache.spark.streaming.dstream.FileInputDStream.readObject(FileInputDStream.scala:314)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> {quote}
> It seems that the closure is stored in SerializedLambda form and cannot be 
> assigned back to a Scala Function1.
> Details of how to reproduce it are here: 
> [https://gist.github.com/skonto/87d5b2368b0bf7786d9dd992a710e4e6]
> Maybe this is spark-shell specific and is not expected to work anyway, as I 
> don't see this issue with a normal jar. 
> Note that with Spark 2.3.3 this still does not work, but with a different 
> error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27598) DStreams checkpointing does not work with Scala 2.12

2019-04-30 Thread Stavros Kontopoulos (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830745#comment-16830745
 ] 

Stavros Kontopoulos commented on SPARK-27598:
-

 I will remove the language version from the title.

With Spark 2.3.3, which I assume uses Scala 2.11 by default, it fails with a 
different error:
{quote}2019-05-01 02:12:00 WARN CheckpointReader:87 - Error reading checkpoint 
from file file:/tmp/checkpoint/checkpoint-155666587.bk
java.io.IOException: java.lang.ClassNotFoundException: 
$line16.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$MapWithState$$anonfun$3
 at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1354)
 at 
org.apache.spark.streaming.dstream.FileInputDStream.readObject(FileInputDStream.scala:316)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
 at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
{quote}

> DStreams checkpointing does not work with Scala 2.12
> 
>
> Key: SPARK-27598
> URL: https://issues.apache.org/jira/browse/SPARK-27598
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.4.0, 2.4.1, 2.4.2, 3.0.0
>Reporter: Stavros Kontopoulos
>Priority: Major
>
> When I restarted a stream with checkpointing enabled I got this:
> {quote}19/04/29 22:45:06 WARN CheckpointReader: Error reading checkpoint from 
> file 
> file:/tmp/checkpoint/checkpoint-155656695.bk
>  java.io.IOException: java.lang.ClassCastException: cannot assign instance of 
> java.lang.invoke.SerializedLambda to field 
> org.apache.spark.streaming.dstream.FileInputDStream.filter of type 
> scala.Function1 in instance of 
> org.apache.spark.streaming.dstream.FileInputDStream
>  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1322)
>  at 
> org.apache.spark.streaming.dstream.FileInputDStream.readObject(FileInputDStream.scala:314)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> {quote}
> It seems that the closure is stored in SerializedLambda form and cannot be 
> assigned back to a Scala Function1.
> Details of how to reproduce it are here: 
> [https://gist.github.com/skonto/87d5b2368b0bf7786d9dd992a710e4e6]
> Maybe this is spark-shell specific and is not expected to work anyway, as I 
> don't see this issue with a normal jar. 
> Note that with Spark 2.3.3 this still does not work, but with a different 
> error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27611) Redundant javax.activation dependencies in the Maven build

2019-04-30 Thread Cheng Lian (JIRA)
Cheng Lian created SPARK-27611:
--

 Summary: Redundant javax.activation dependencies in the Maven build
 Key: SPARK-27611
 URL: https://issues.apache.org/jira/browse/SPARK-27611
 Project: Spark
  Issue Type: Dependency upgrade
  Components: Build
Affects Versions: 3.0.0
Reporter: Cheng Lian


[PR #23890|https://github.com/apache/spark/pull/23890] introduced 
{{org.glassfish.jaxb:jaxb-runtime:2.3.2}} as a runtime dependency. As an 
unexpected side effect, {{jakarta.activation:jakarta.activation-api:1.2.1}} was 
also pulled in as a transitive dependency. As a result, for the Maven build, 
both of the following two jars can be found under 
{{assembly/target/scala-2.12/jars}}:
{noformat}
activation-1.1.1.jar
jakarta.activation-api-1.2.1.jar
{noformat}
Discussed this with [~srowen] offline and we agreed that we should probably 
exclude the Jakarta one.




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-27519) Pandas udf corrupting data

2019-04-30 Thread Bryan Cutler (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bryan Cutler resolved SPARK-27519.
--
   Resolution: Fixed
Fix Version/s: 3.0.0

Problem does not happen when running the latest master.

> Pandas udf corrupting data
> --
>
> Key: SPARK-27519
> URL: https://issues.apache.org/jira/browse/SPARK-27519
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.0
>Reporter: Jeff gold
>Priority: Major
> Fix For: 3.0.0
>
> Attachments: Pandas UDF Bug.py
>
>
> While trying to use a pandas UDF, I passed the UDF 2 columns: a string and a 
> list of lists of strings. The second argument's structure, for example: 
> [['1'],['2'],['3']]
> But when receiving this same value in the UDF, I get something like this: 
> [['1','2'],['3'],[]]
> I checked, and the same row in the table has the list with the correct 
> structure; only in the UDF did it change.
>  
> I don't know why this happens, but I do know it has something to do with the 
> fact that that row was the 10,001st row and the last row in its partition. 
> The pandas batch size is 10,000, so that row was sent alone as a second 
> batch, and that's the only thing that seems to cause it: having 1 or 2 rows 
> in a second batch of the partition. I was also able to get this with a 
> second batch of 2 rows; the list wasn't changed, except that an empty list 
> was added to the end. 
> Hope you can help me understand what is going on, thanks!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-27519) Pandas udf corrupting data

2019-04-30 Thread Bryan Cutler (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830743#comment-16830743
 ] 

Bryan Cutler edited comment on SPARK-27519 at 4/30/19 10:49 PM:


Problem does not happen when running the latest master. Marking resolved.


was (Author: bryanc):
Problem does not happen when running the latest master.

> Pandas udf corrupting data
> --
>
> Key: SPARK-27519
> URL: https://issues.apache.org/jira/browse/SPARK-27519
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.0
>Reporter: Jeff gold
>Priority: Major
> Fix For: 3.0.0
>
> Attachments: Pandas UDF Bug.py
>
>
> While trying to use a pandas UDF, I passed the UDF 2 columns: a string and a 
> list of lists of strings. The second argument's structure, for example: 
> [['1'],['2'],['3']]
> But when receiving this same value in the UDF, I get something like this: 
> [['1','2'],['3'],[]]
> I checked, and the same row in the table has the list with the correct 
> structure; only in the UDF did it change.
>  
> I don't know why this happens, but I do know it has something to do with the 
> fact that that row was the 10,001st row and the last row in its partition. 
> The pandas batch size is 10,000, so that row was sent alone as a second 
> batch, and that's the only thing that seems to cause it: having 1 or 2 rows 
> in a second batch of the partition. I was also able to get this with a 
> second batch of 2 rows; the list wasn't changed, except that an empty list 
> was added to the end. 
> Hope you can help me understand what is going on, thanks!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27519) Pandas udf corrupting data

2019-04-30 Thread Bryan Cutler (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bryan Cutler updated SPARK-27519:
-
Affects Version/s: (was: 3.0.0)

> Pandas udf corrupting data
> --
>
> Key: SPARK-27519
> URL: https://issues.apache.org/jira/browse/SPARK-27519
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.0
>Reporter: Jeff gold
>Priority: Major
> Attachments: Pandas UDF Bug.py
>
>
> While trying to use a pandas UDF, I passed the UDF 2 columns: a string and a 
> list of lists of strings. The second argument's structure, for example: 
> [['1'],['2'],['3']]
> But when receiving this same value in the UDF, I get something like this: 
> [['1','2'],['3'],[]]
> I checked, and the same row in the table has the list with the correct 
> structure; only in the UDF did it change.
>  
> I don't know why this happens, but I do know it has something to do with the 
> fact that that row was the 10,001st row and the last row in its partition. 
> The pandas batch size is 10,000, so that row was sent alone as a second 
> batch, and that's the only thing that seems to cause it: having 1 or 2 rows 
> in a second batch of the partition. I was also able to get this with a 
> second batch of 2 rows; the list wasn't changed, except that an empty list 
> was added to the end. 
> Hope you can help me understand what is going on, thanks!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27519) Pandas udf corrupting data

2019-04-30 Thread Bryan Cutler (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830742#comment-16830742
 ] 

Bryan Cutler commented on SPARK-27519:
--

Thanks for the script [~f7faf8ba36], I was able to reproduce this with Spark 
2.3.0 using pyarrow 0.8.0 and 0.12.1. With master I did not see the issue, so 
it could have been fixed by another JIRA and will be in 3.0.0. I did not try 
it out on Spark 2.4.0. I'm going to close this then, but please try master or 
use 3.0.0 when it is released.

I did notice something strange when running master though. I get rows with 
values of None for some reason, so if I run {{df.distinct().collect()}} then 
the output is {{[Row(value=[[None, None]]), Row(value=[[1, 2], [3, 4]])]}}. 
This does not seem related to the issue here, so I will open another JIRA.

> Pandas udf corrupting data
> --
>
> Key: SPARK-27519
> URL: https://issues.apache.org/jira/browse/SPARK-27519
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.3.0, 3.0.0
>Reporter: Jeff gold
>Priority: Major
> Attachments: Pandas UDF Bug.py
>
>
> While trying to use a pandas UDF, I passed the UDF 2 columns: a string and a 
> list of lists of strings. The second argument's structure, for example: 
> [['1'],['2'],['3']]
> But when receiving this same value in the UDF, I get something like this: 
> [['1','2'],['3'],[]]
> I checked, and the same row in the table has the list with the correct 
> structure; only in the UDF did it change.
>  
> I don't know why this happens, but I do know it has something to do with the 
> fact that that row was the 10,001st row and the last row in its partition. 
> The pandas batch size is 10,000, so that row was sent alone as a second 
> batch, and that's the only thing that seems to cause it: having 1 or 2 rows 
> in a second batch of the partition. I was also able to get this with a 
> second batch of 2 rows; the list wasn't changed, except that an empty list 
> was added to the end. 
> Hope you can help me understand what is going on, thanks!
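
For reference, a minimal sketch of the setup described above, using the Spark 
2.3/2.4 pandas UDF API; the data, names, and batch-boundary framing here are 
illustrative, not taken from the attached script:
{code:python}
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType, col
from pyspark.sql.types import StringType

spark = SparkSession.builder.getOrCreate()

# 10,001 identical rows in a single partition: with the default Arrow batch
# size of 10,000 (spark.sql.execution.arrow.maxRecordsPerBatch), the last
# row reaches the Python worker alone, as a second batch.
data = [("k", [["1"], ["2"], ["3"]])] * 10001
df = spark.createDataFrame(data, ["name", "lists"]).coalesce(1)

@pandas_udf(StringType(), PandasUDFType.SCALAR)
def render(name, lists):
    # On the affected versions, the nested lists arriving in the lone
    # second batch could come through restructured, e.g.
    # [['1', '2'], ['3'], []] instead of [['1'], ['2'], ['3']].
    return lists.astype(str)

df.select(render(col("name"), col("lists"))).collect()
{code}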



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27598) DStreams checkpointing does not work with Scala 2.12

2019-04-30 Thread Dongjoon Hyun (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830740#comment-16830740
 ] 

Dongjoon Hyun commented on SPARK-27598:
---

If this fails with Spark 2.3 with Scala 2.11, please remove `Scala 2.12` from 
the title. It's misleading.

> DStreams checkpointing does not work with Scala 2.12
> 
>
> Key: SPARK-27598
> URL: https://issues.apache.org/jira/browse/SPARK-27598
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.4.0, 2.4.1, 2.4.2, 3.0.0
>Reporter: Stavros Kontopoulos
>Priority: Major
>
> When I restarted a stream with checkpointing enabled I got this:
> {quote}19/04/29 22:45:06 WARN CheckpointReader: Error reading checkpoint from 
> file 
> file:/tmp/checkpoint/checkpoint-155656695.bk
>  java.io.IOException: java.lang.ClassCastException: cannot assign instance of 
> java.lang.invoke.SerializedLambda to field 
> org.apache.spark.streaming.dstream.FileInputDStream.filter of type 
> scala.Function1 in instance of 
> org.apache.spark.streaming.dstream.FileInputDStream
>  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1322)
>  at 
> org.apache.spark.streaming.dstream.FileInputDStream.readObject(FileInputDStream.scala:314)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> {quote}
> It seems that the closure is stored in SerializedLambda form and cannot be 
> assigned back to a Scala Function1.
> Details of how to reproduce it are here: 
> [https://gist.github.com/skonto/87d5b2368b0bf7786d9dd992a710e4e6]
> Maybe this is spark-shell specific and is not expected to work anyway, as I 
> don't see this issue with a normal jar. 
> Note that with Spark 2.3.3 this still does not work, but with a different 
> error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27598) DStreams checkpointing does not work with Scala 2.12

2019-04-30 Thread Dongjoon Hyun (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830738#comment-16830738
 ] 

Dongjoon Hyun commented on SPARK-27598:
---

Thank you for reporting, [~skonto].
BTW, do you mean Spark 2.3/2.4 with Scala 2.11 work?

> DStreams checkpointing does not work with Scala 2.12
> 
>
> Key: SPARK-27598
> URL: https://issues.apache.org/jira/browse/SPARK-27598
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.4.0, 2.4.1, 2.4.2, 3.0.0
>Reporter: Stavros Kontopoulos
>Priority: Major
>
> When I restarted a stream with checkpointing enabled I got this:
> {quote}19/04/29 22:45:06 WARN CheckpointReader: Error reading checkpoint from 
> file 
> file:/tmp/checkpoint/checkpoint-155656695.bk
>  java.io.IOException: java.lang.ClassCastException: cannot assign instance of 
> java.lang.invoke.SerializedLambda to field 
> org.apache.spark.streaming.dstream.FileInputDStream.filter of type 
> scala.Function1 in instance of 
> org.apache.spark.streaming.dstream.FileInputDStream
>  at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1322)
>  at 
> org.apache.spark.streaming.dstream.FileInputDStream.readObject(FileInputDStream.scala:314)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> {quote}
> It seems that the closure is stored in SerializedLambda form and cannot be 
> assigned back to a Scala Function1.
> Details of how to reproduce it are here: 
> [https://gist.github.com/skonto/87d5b2368b0bf7786d9dd992a710e4e6]
> Maybe this is spark-shell specific and is not expected to work anyway, as I 
> don't see this issue with a normal jar. 
> Note that with Spark 2.3.3 this still does not work, but with a different 
> error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27610) Yarn external shuffle service fails to start when spark.shuffle.io.mode=EPOLL

2019-04-30 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27610:


Assignee: Apache Spark

> Yarn external shuffle service fails to start when spark.shuffle.io.mode=EPOLL
> -
>
> Key: SPARK-27610
> URL: https://issues.apache.org/jira/browse/SPARK-27610
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 2.4.2
>Reporter: Adrian Muraru
>Assignee: Apache Spark
>Priority: Minor
>
> Enabling netty epoll mode in yarn shuffle service 
> ({{spark.shuffle.io.mode=EPOLL}}) causes the Yarn NodeManager to abort.
>  Checking the stack trace, it seems that while the io.netty package is 
> shaded, the native libraries provided by netty-all are not:
>   
> {noformat}
> Caused by: java.io.FileNotFoundException: 
> META-INF/native/liborg_spark_project_netty_transport_native_epoll_x86_64.so{noformat}
> *Full stack trace:*
> {noformat}
> 2019-04-24 23:14:46,372 ERROR [main] nodemanager.NodeManager 
> (NodeManager.java:initAndStartNodeManager(639)) - Error starting NodeManager
> java.lang.UnsatisfiedLinkError: failed to load the required native library
> at 
> org.spark_project.io.netty.channel.epoll.Epoll.ensureAvailability(Epoll.java:81)
> at 
> org.spark_project.io.netty.channel.epoll.EpollEventLoop.(EpollEventLoop.java:55)
> at 
> org.spark_project.io.netty.channel.epoll.EpollEventLoopGroup.newChild(EpollEventLoopGroup.java:134)
> at 
> org.spark_project.io.netty.channel.epoll.EpollEventLoopGroup.newChild(EpollEventLoopGroup.java:35)
> at 
> org.spark_project.io.netty.util.concurrent.MultithreadEventExecutorGroup.(MultithreadEventExecutorGroup.java:84)
> at 
> org.spark_project.io.netty.util.concurrent.MultithreadEventExecutorGroup.(MultithreadEventExecutorGroup.java:58)
> at 
> org.spark_project.io.netty.util.concurrent.MultithreadEventExecutorGroup.(MultithreadEventExecutorGroup.java:47)
> at 
> org.spark_project.io.netty.channel.MultithreadEventLoopGroup.(MultithreadEventLoopGroup.java:59)
> at 
> org.spark_project.io.netty.channel.epoll.EpollEventLoopGroup.(EpollEventLoopGroup.java:104)
> at 
> org.spark_project.io.netty.channel.epoll.EpollEventLoopGroup.(EpollEventLoopGroup.java:91)
> at 
> org.spark_project.io.netty.channel.epoll.EpollEventLoopGroup.(EpollEventLoopGroup.java:68)
> at 
> org.apache.spark.network.util.NettyUtils.createEventLoop(NettyUtils.java:52)
> at 
> org.apache.spark.network.server.TransportServer.init(TransportServer.java:95)
> at 
> org.apache.spark.network.server.TransportServer.(TransportServer.java:75)
> at 
> org.apache.spark.network.TransportContext.createServer(TransportContext.java:108)
> at 
> org.apache.spark.network.yarn.YarnShuffleService.serviceInit(YarnShuffleService.java:186)
> at 
> org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
> at 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices.serviceInit(AuxServices.java:147)
> at 
> org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
> at 
> org.apache.hadoop.service.CompositeService.serviceInit(CompositeService.java:107)
> at 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl.serviceInit(ContainerManagerImpl.java:268)
> at 
> org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
> at 
> org.apache.hadoop.service.CompositeService.serviceInit(CompositeService.java:107)
> at 
> org.apache.hadoop.yarn.server.nodemanager.NodeManager.serviceInit(NodeManager.java:357)
> at 
> org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
> at 
> org.apache.hadoop.yarn.server.nodemanager.NodeManager.initAndStartNodeManager(NodeManager.java:636)
> at 
> org.apache.hadoop.yarn.server.nodemanager.NodeManager.main(NodeManager.java:684)
> Caused by: java.lang.UnsatisfiedLinkError: could not load a native library: 
> org_spark_project_netty_transport_native_epoll_x86_64
> at 
> org.spark_project.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:205)
> at 
> org.spark_project.io.netty.channel.epoll.Native.loadNativeLibrary(Native.java:207)
> at 
> org.spark_project.io.netty.channel.epoll.Native.(Native.java:65)
> at org.spark_project.io.netty.channel.epoll.Epoll.(Epoll.java:33)
> ... 26 more
> Suppressed: java.lang.UnsatisfiedLinkError: could not load a native 
> library: org_spark_project_netty_transport_native_epoll
> at 
> org.spark_project.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:205)
> at 
> org.spark_project.io.netty.channel.epoll.Native.loadNativeLibrary(Native.java:210)
> 

[jira] [Assigned] (SPARK-27610) Yarn external shuffle service fails to start when spark.shuffle.io.mode=EPOLL

2019-04-30 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27610:


Assignee: (was: Apache Spark)

> Yarn external shuffle service fails to start when spark.shuffle.io.mode=EPOLL
> -
>
> Key: SPARK-27610
> URL: https://issues.apache.org/jira/browse/SPARK-27610
> Project: Spark
>  Issue Type: Improvement
>  Components: Shuffle
>Affects Versions: 2.4.2
>Reporter: Adrian Muraru
>Priority: Minor
>
> Enabling netty epoll mode in yarn shuffle service 
> ({{spark.shuffle.io.mode=EPOLL}}) causes the Yarn NodeManager to abort.
>  Checking the stack trace, it seems that while the io.netty package is 
> shaded, the native libraries provided by netty-all are not:
>   
> {noformat}
> Caused by: java.io.FileNotFoundException: 
> META-INF/native/liborg_spark_project_netty_transport_native_epoll_x86_64.so{noformat}
> *Full stack trace:*
> {noformat}
> 2019-04-24 23:14:46,372 ERROR [main] nodemanager.NodeManager 
> (NodeManager.java:initAndStartNodeManager(639)) - Error starting NodeManager
> java.lang.UnsatisfiedLinkError: failed to load the required native library
> at 
> org.spark_project.io.netty.channel.epoll.Epoll.ensureAvailability(Epoll.java:81)
> at 
> org.spark_project.io.netty.channel.epoll.EpollEventLoop.(EpollEventLoop.java:55)
> at 
> org.spark_project.io.netty.channel.epoll.EpollEventLoopGroup.newChild(EpollEventLoopGroup.java:134)
> at 
> org.spark_project.io.netty.channel.epoll.EpollEventLoopGroup.newChild(EpollEventLoopGroup.java:35)
> at 
> org.spark_project.io.netty.util.concurrent.MultithreadEventExecutorGroup.(MultithreadEventExecutorGroup.java:84)
> at 
> org.spark_project.io.netty.util.concurrent.MultithreadEventExecutorGroup.(MultithreadEventExecutorGroup.java:58)
> at 
> org.spark_project.io.netty.util.concurrent.MultithreadEventExecutorGroup.(MultithreadEventExecutorGroup.java:47)
> at 
> org.spark_project.io.netty.channel.MultithreadEventLoopGroup.(MultithreadEventLoopGroup.java:59)
> at 
> org.spark_project.io.netty.channel.epoll.EpollEventLoopGroup.(EpollEventLoopGroup.java:104)
> at 
> org.spark_project.io.netty.channel.epoll.EpollEventLoopGroup.(EpollEventLoopGroup.java:91)
> at 
> org.spark_project.io.netty.channel.epoll.EpollEventLoopGroup.(EpollEventLoopGroup.java:68)
> at 
> org.apache.spark.network.util.NettyUtils.createEventLoop(NettyUtils.java:52)
> at 
> org.apache.spark.network.server.TransportServer.init(TransportServer.java:95)
> at 
> org.apache.spark.network.server.TransportServer.(TransportServer.java:75)
> at 
> org.apache.spark.network.TransportContext.createServer(TransportContext.java:108)
> at 
> org.apache.spark.network.yarn.YarnShuffleService.serviceInit(YarnShuffleService.java:186)
> at 
> org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
> at 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices.serviceInit(AuxServices.java:147)
> at 
> org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
> at 
> org.apache.hadoop.service.CompositeService.serviceInit(CompositeService.java:107)
> at 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl.serviceInit(ContainerManagerImpl.java:268)
> at 
> org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
> at 
> org.apache.hadoop.service.CompositeService.serviceInit(CompositeService.java:107)
> at 
> org.apache.hadoop.yarn.server.nodemanager.NodeManager.serviceInit(NodeManager.java:357)
> at 
> org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
> at 
> org.apache.hadoop.yarn.server.nodemanager.NodeManager.initAndStartNodeManager(NodeManager.java:636)
> at 
> org.apache.hadoop.yarn.server.nodemanager.NodeManager.main(NodeManager.java:684)
> Caused by: java.lang.UnsatisfiedLinkError: could not load a native library: 
> org_spark_project_netty_transport_native_epoll_x86_64
> at 
> org.spark_project.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:205)
> at 
> org.spark_project.io.netty.channel.epoll.Native.loadNativeLibrary(Native.java:207)
> at 
> org.spark_project.io.netty.channel.epoll.Native.(Native.java:65)
> at org.spark_project.io.netty.channel.epoll.Epoll.(Epoll.java:33)
> ... 26 more
> Suppressed: java.lang.UnsatisfiedLinkError: could not load a native 
> library: org_spark_project_netty_transport_native_epoll
> at 
> org.spark_project.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:205)
> at 
> org.spark_project.io.netty.channel.epoll.Native.loadNativeLibrary(Native.java:210)
> ... 28 more
>

[jira] [Updated] (SPARK-27610) Yarn external shuffle service fails to start when spark.shuffle.io.mode=EPOLL

2019-04-30 Thread Adrian Muraru (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adrian Muraru updated SPARK-27610:
--
Description: 
Enabling netty epoll mode in yarn shuffle service 
({{spark.shuffle.io.mode=EPOLL}}) causes the Yarn NodeManager to abort.
 Checking the stack trace, it seems that while the io.netty package is shaded, 
the native libraries provided by netty-all are not:
  
{noformat}
Caused by: java.io.FileNotFoundException: 
META-INF/native/liborg_spark_project_netty_transport_native_epoll_x86_64.so{noformat}
*Full stack trace:*
{noformat}
2019-04-24 23:14:46,372 ERROR [main] nodemanager.NodeManager 
(NodeManager.java:initAndStartNodeManager(639)) - Error starting NodeManager
java.lang.UnsatisfiedLinkError: failed to load the required native library
at 
org.spark_project.io.netty.channel.epoll.Epoll.ensureAvailability(Epoll.java:81)
at 
org.spark_project.io.netty.channel.epoll.EpollEventLoop.(EpollEventLoop.java:55)
at 
org.spark_project.io.netty.channel.epoll.EpollEventLoopGroup.newChild(EpollEventLoopGroup.java:134)
at 
org.spark_project.io.netty.channel.epoll.EpollEventLoopGroup.newChild(EpollEventLoopGroup.java:35)
at 
org.spark_project.io.netty.util.concurrent.MultithreadEventExecutorGroup.(MultithreadEventExecutorGroup.java:84)
at 
org.spark_project.io.netty.util.concurrent.MultithreadEventExecutorGroup.(MultithreadEventExecutorGroup.java:58)
at 
org.spark_project.io.netty.util.concurrent.MultithreadEventExecutorGroup.(MultithreadEventExecutorGroup.java:47)
at 
org.spark_project.io.netty.channel.MultithreadEventLoopGroup.(MultithreadEventLoopGroup.java:59)
at 
org.spark_project.io.netty.channel.epoll.EpollEventLoopGroup.(EpollEventLoopGroup.java:104)
at 
org.spark_project.io.netty.channel.epoll.EpollEventLoopGroup.(EpollEventLoopGroup.java:91)
at 
org.spark_project.io.netty.channel.epoll.EpollEventLoopGroup.(EpollEventLoopGroup.java:68)
at 
org.apache.spark.network.util.NettyUtils.createEventLoop(NettyUtils.java:52)
at 
org.apache.spark.network.server.TransportServer.init(TransportServer.java:95)
at 
org.apache.spark.network.server.TransportServer.(TransportServer.java:75)
at 
org.apache.spark.network.TransportContext.createServer(TransportContext.java:108)
at 
org.apache.spark.network.yarn.YarnShuffleService.serviceInit(YarnShuffleService.java:186)
at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices.serviceInit(AuxServices.java:147)
at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
at 
org.apache.hadoop.service.CompositeService.serviceInit(CompositeService.java:107)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl.serviceInit(ContainerManagerImpl.java:268)
at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
at 
org.apache.hadoop.service.CompositeService.serviceInit(CompositeService.java:107)
at 
org.apache.hadoop.yarn.server.nodemanager.NodeManager.serviceInit(NodeManager.java:357)
at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
at 
org.apache.hadoop.yarn.server.nodemanager.NodeManager.initAndStartNodeManager(NodeManager.java:636)
at 
org.apache.hadoop.yarn.server.nodemanager.NodeManager.main(NodeManager.java:684)
Caused by: java.lang.UnsatisfiedLinkError: could not load a native library: 
org_spark_project_netty_transport_native_epoll_x86_64
at 
org.spark_project.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:205)
at 
org.spark_project.io.netty.channel.epoll.Native.loadNativeLibrary(Native.java:207)
at org.spark_project.io.netty.channel.epoll.Native.(Native.java:65)
at org.spark_project.io.netty.channel.epoll.Epoll.(Epoll.java:33)
... 26 more
Suppressed: java.lang.UnsatisfiedLinkError: could not load a native 
library: org_spark_project_netty_transport_native_epoll
at 
org.spark_project.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:205)
at 
org.spark_project.io.netty.channel.epoll.Native.loadNativeLibrary(Native.java:210)
... 28 more
Caused by: java.io.FileNotFoundException: 
META-INF/native/liborg_spark_project_netty_transport_native_epoll.so
at 
org.spark_project.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:161)
... 29 more
Suppressed: java.lang.UnsatisfiedLinkError: no 
org_spark_project_netty_transport_native_epoll in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
at java.lang.Runtime.loadLibrary0(Runtime.java:870)
at java.lang.System.loadLibrary(System.java:1122)
at 

[jira] [Created] (SPARK-27610) Yarn external shuffle service fails to start when spark.shuffle.io.mode=EPOLL

2019-04-30 Thread Adrian Muraru (JIRA)
Adrian Muraru created SPARK-27610:
-

 Summary: Yarn external shuffle service fails to start when 
spark.shuffle.io.mode=EPOLL
 Key: SPARK-27610
 URL: https://issues.apache.org/jira/browse/SPARK-27610
 Project: Spark
  Issue Type: Improvement
  Components: Shuffle
Affects Versions: 2.4.2
Reporter: Adrian Muraru


Enabling netty epoll mode in yarn shuffle service 
({{spark.shuffle.io.mode=EPOLL}}) causes the NodeManager to abort.
 Checking the stack trace, it seems that while the io.netty package is shaded, 
the native libraries provided by netty-all are not:
 
{noformat}
Caused by: java.io.FileNotFoundException: 
META-INF/native/liborg_spark_project_netty_transport_native_epoll_x86_64.so{noformat}

*Full stack trace:*
{noformat}
2019-04-24 23:14:46,372 ERROR [main] nodemanager.NodeManager 
(NodeManager.java:initAndStartNodeManager(639)) - Error starting NodeManager
java.lang.UnsatisfiedLinkError: failed to load the required native library
at 
org.spark_project.io.netty.channel.epoll.Epoll.ensureAvailability(Epoll.java:81)
at 
org.spark_project.io.netty.channel.epoll.EpollEventLoop.(EpollEventLoop.java:55)
at 
org.spark_project.io.netty.channel.epoll.EpollEventLoopGroup.newChild(EpollEventLoopGroup.java:134)
at 
org.spark_project.io.netty.channel.epoll.EpollEventLoopGroup.newChild(EpollEventLoopGroup.java:35)
at 
org.spark_project.io.netty.util.concurrent.MultithreadEventExecutorGroup.(MultithreadEventExecutorGroup.java:84)
at 
org.spark_project.io.netty.util.concurrent.MultithreadEventExecutorGroup.(MultithreadEventExecutorGroup.java:58)
at 
org.spark_project.io.netty.util.concurrent.MultithreadEventExecutorGroup.(MultithreadEventExecutorGroup.java:47)
at 
org.spark_project.io.netty.channel.MultithreadEventLoopGroup.(MultithreadEventLoopGroup.java:59)
at 
org.spark_project.io.netty.channel.epoll.EpollEventLoopGroup.(EpollEventLoopGroup.java:104)
at 
org.spark_project.io.netty.channel.epoll.EpollEventLoopGroup.(EpollEventLoopGroup.java:91)
at 
org.spark_project.io.netty.channel.epoll.EpollEventLoopGroup.(EpollEventLoopGroup.java:68)
at 
org.apache.spark.network.util.NettyUtils.createEventLoop(NettyUtils.java:52)
at 
org.apache.spark.network.server.TransportServer.init(TransportServer.java:95)
at 
org.apache.spark.network.server.TransportServer.(TransportServer.java:75)
at 
org.apache.spark.network.TransportContext.createServer(TransportContext.java:108)
at 
org.apache.spark.network.yarn.YarnShuffleService.serviceInit(YarnShuffleService.java:186)
at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices.serviceInit(AuxServices.java:147)
at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
at 
org.apache.hadoop.service.CompositeService.serviceInit(CompositeService.java:107)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl.serviceInit(ContainerManagerImpl.java:268)
at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
at 
org.apache.hadoop.service.CompositeService.serviceInit(CompositeService.java:107)
at 
org.apache.hadoop.yarn.server.nodemanager.NodeManager.serviceInit(NodeManager.java:357)
at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
at 
org.apache.hadoop.yarn.server.nodemanager.NodeManager.initAndStartNodeManager(NodeManager.java:636)
at 
org.apache.hadoop.yarn.server.nodemanager.NodeManager.main(NodeManager.java:684)
Caused by: java.lang.UnsatisfiedLinkError: could not load a native library: 
org_spark_project_netty_transport_native_epoll_x86_64
at 
org.spark_project.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:205)
at 
org.spark_project.io.netty.channel.epoll.Native.loadNativeLibrary(Native.java:207)
at org.spark_project.io.netty.channel.epoll.Native.(Native.java:65)
at org.spark_project.io.netty.channel.epoll.Epoll.(Epoll.java:33)
... 26 more
Suppressed: java.lang.UnsatisfiedLinkError: could not load a native 
library: org_spark_project_netty_transport_native_epoll
at 
org.spark_project.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:205)
at 
org.spark_project.io.netty.channel.epoll.Native.loadNativeLibrary(Native.java:210)
... 28 more
Caused by: java.io.FileNotFoundException: 
META-INF/native/liborg_spark_project_netty_transport_native_epoll.so
at 
org.spark_project.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:161)
... 29 more
Suppressed: java.lang.UnsatisfiedLinkError: no 
org_spark_project_netty_transport_native_epoll in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
at 

[jira] [Assigned] (SPARK-27608) Upgrade Surefire plugin to 3.0.0-M3

2019-04-30 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27608:


Assignee: Apache Spark

> Upgrade Surefire plugin to 3.0.0-M3
> ---
>
> Key: SPARK-27608
> URL: https://issues.apache.org/jira/browse/SPARK-27608
> Project: Spark
>  Issue Type: Sub-task
>  Components: Build
>Affects Versions: 3.0.0
>Reporter: Dongjoon Hyun
>Assignee: Apache Spark
>Priority: Minor
>
> This issue aims to upgrade Surefire to bring in SUREFIRE-1613.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27608) Upgrade Surefire plugin to 3.0.0-M3

2019-04-30 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27608:


Assignee: (was: Apache Spark)

> Upgrade Surefire plugin to 3.0.0-M3
> ---
>
> Key: SPARK-27608
> URL: https://issues.apache.org/jira/browse/SPARK-27608
> Project: Spark
>  Issue Type: Sub-task
>  Components: Build
>Affects Versions: 3.0.0
>Reporter: Dongjoon Hyun
>Priority: Minor
>
> This issue aims to upgrade Surefire to bring in SUREFIRE-1613.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27609) [Documentation Issue?] from_json expects values of options dictionary to be

2019-04-30 Thread Zachary Jablons (JIRA)
Zachary Jablons created SPARK-27609:
---

 Summary: [Documentation Issue?] from_json expects values of 
options dictionary to be 
 Key: SPARK-27609
 URL: https://issues.apache.org/jira/browse/SPARK-27609
 Project: Spark
  Issue Type: Bug
  Components: PySpark
Affects Versions: 2.2.1
 Environment: I've found this issue on an AWS Glue development endpoint 
which is running Spark 2.2.1 and being given jobs through a SparkMagic Python 2 
kernel, running through livy and all that. I don't know how much of that is 
important for reproduction, and can get more details if needed. 
Reporter: Zachary Jablons


When reading a column of a DataFrame that consists of serialized JSON, one of 
the options for inferring the schema and then parsing the JSON is a two-step 
process consisting of:

{{# this results in a new dataframe where the top-level keys of the JSON are columns}}
{{df_parsed_direct = spark.read.json(df.rdd.map(lambda row: row.json_col))}}

{{# this does that while preserving the rest of df}}
{{schema = df_parsed_direct.schema}}
{{df_parsed = df.withColumn('parsed', from_json(df.json_col, schema))}}

When I do this, I sometimes find myself passing in options. My understanding 
is, from the documentation 
[here|http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.functions.from_json],
 that the options passed should behave the same whether I do

{{spark.read.option('option',value)}}

or

{{from_json(df.json_col, schema, options={'option':value})}}

 

However, I've found that the latter expects each value to be a string 
representation of the value that can be decoded as JSON. So, for example, 
options={'multiLine':True} fails with 

{{java.lang.ClassCastException: java.lang.Boolean cannot be cast to 
java.lang.String}}

whereas options={'multiLine':'true'} works just fine. 

Notably, providing spark.read.option('multiLine',True) works fine!

 

The code for reproducing this issue as well as the stacktrace from hitting it 
are provided in [this 
gist|https://gist.github.com/zmjjmz/0af5cf9b059b4969951e825565e266aa]. 

 

I also noticed that from_json doesn't complain if you give it a garbage option 
key – but that seems separate.
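
A minimal sketch of the behavior described (option names follow the report; 
the exact parser effect of {{multiLine}} inside {{from_json}} is beside the 
point here):
{code:python}
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('{"a": "x"}',)], ["json_col"])
schema = StructType([StructField("a", StringType())])

# Works: the option value is already a string.
df.select(from_json(df.json_col, schema, {"multiLine": "true"})).show()

# Fails on the affected versions with:
# java.lang.ClassCastException: java.lang.Boolean cannot be cast to java.lang.String
df.select(from_json(df.json_col, schema, {"multiLine": True})).show()
{code}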



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27608) Upgrade Surefire plugin to 3.0.0-M3

2019-04-30 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-27608:
--
Component/s: (was: Tests)
 Build

> Upgrade Surefire plugin to 3.0.0-M3
> ---
>
> Key: SPARK-27608
> URL: https://issues.apache.org/jira/browse/SPARK-27608
> Project: Spark
>  Issue Type: Sub-task
>  Components: Build
>Affects Versions: 3.0.0
>Reporter: Dongjoon Hyun
>Priority: Minor
>
> This issue aims to upgrade Surefire to bring in SUREFIRE-1613.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27608) Upgrade Surefire plugin to 3.0.0-M3

2019-04-30 Thread Dongjoon Hyun (JIRA)
Dongjoon Hyun created SPARK-27608:
-

 Summary: Upgrade Surefire plugin to 3.0.0-M3
 Key: SPARK-27608
 URL: https://issues.apache.org/jira/browse/SPARK-27608
 Project: Spark
  Issue Type: Sub-task
  Components: Tests
Affects Versions: 3.0.0
Reporter: Dongjoon Hyun


This issue aims to upgrade Surefire to bring in SUREFIRE-1613.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24422) Add JDK11 in our Jenkins' build servers

2019-04-30 Thread Dongjoon Hyun (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830718#comment-16830718
 ] 

Dongjoon Hyun commented on SPARK-24422:
---

Hi, [~dbtsai] and [~shaneknapp].
Can we resolve this issue for now, since we keep tracking the 
`spark-master-test-maven-hadoop-2.7-jdk-11-ubuntu-testing` job?

> Add JDK11 in our Jenkins' build servers
> ---
>
> Key: SPARK-24422
> URL: https://issues.apache.org/jira/browse/SPARK-24422
> Project: Spark
>  Issue Type: Sub-task
>  Components: Project Infra
>Affects Versions: 2.3.0
>Reporter: DB Tsai
>Assignee: shane knapp
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-27607) Improve performance of Row.toString()

2019-04-30 Thread Josh Rosen (JIRA)
Josh Rosen created SPARK-27607:
--

 Summary: Improve performance of Row.toString()
 Key: SPARK-27607
 URL: https://issues.apache.org/jira/browse/SPARK-27607
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: Josh Rosen


I have a job which ends up calling {{org.apache.spark.sql.Row.toString}} on 
every row in a massive dataset (the reasons for this are slightly odd and it's 
a bit non-trivial to change the job to avoid this step). 

{{Row.toString}} is implemented by first constructing a WrappedArray containing 
the Row's values (by calling {{toSeq}}) and then turning that array into a 
string with {{mkString}}. We might be able to get a small performance win by 
pipelining these steps, using an imperative loop to append fields to a 
StringBuilder as soon as they're retrieved (thereby cutting out a few layers of 
Scala collections indirection).
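
The actual change would be in {{Row.toString}} on the Scala side; purely as an 
illustration of the idea, here is the same before/after in Python, replacing 
an intermediate collection with direct appends to a buffer:
{code:python}
from io import StringIO

values = [1, "a", None, 3.5]  # stand-in for a Row's fields

# Analogous to the current toSeq + mkString: materialize every field's
# string in an intermediate collection, then join.
def to_string_materialized(vals):
    parts = [str(v) for v in vals]  # intermediate collection
    return "[" + ",".join(parts) + "]"

# Pipelined: append each field to a buffer as it is retrieved, the way a
# StringBuilder loop would on the JVM, skipping the intermediate layers.
def to_string_pipelined(vals):
    buf = StringIO()
    buf.write("[")
    for i, v in enumerate(vals):
        if i:
            buf.write(",")
        buf.write(str(v))
    buf.write("]")
    return buf.getvalue()

assert to_string_materialized(values) == to_string_pipelined(values)
{code}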



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27548) PySpark toLocalIterator does not raise errors from worker

2019-04-30 Thread Apache Spark (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830694#comment-16830694
 ] 

Apache Spark commented on SPARK-27548:
--

User 'BryanCutler' has created a pull request for this issue:
https://github.com/apache/spark/pull/24070

> PySpark toLocalIterator does not raise errors from worker
> -
>
> Key: SPARK-27548
> URL: https://issues.apache.org/jira/browse/SPARK-27548
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.4.1
>Reporter: Bryan Cutler
>Priority: Major
>
> When using a PySpark RDD local iterator and an error occurs on the worker, it 
> is not picked up by Py4J and raised in the Python driver process. So unless 
> one looks at the logs, there is no way for the application to know the worker 
> had an error. This is a test that should pass if the error is raised in the 
> driver:
> {code}
> def test_to_local_iterator_error(self):
> def fail(_):
> raise RuntimeError("local iterator error")
> rdd = self.sc.parallelize(range(10)).map(fail)
> with self.assertRaisesRegexp(Exception, "local iterator error"):
> for _ in rdd.toLocalIterator():
> pass{code}
> but it does not raise an exception:
> {noformat}
> Caused by: org.apache.spark.api.python.PythonException: Traceback (most 
> recent call last):
>   File "/home/bryan/git/spark/python/lib/pyspark.zip/pyspark/worker.py", line 
> 428, in main
>     process()
>   File "/home/bryan/git/spark/python/lib/pyspark.zip/pyspark/worker.py", line 
> 423, in process
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/home/bryan/git/spark/python/lib/pyspark.zip/pyspark/serializers.py", 
> line 505, in dump_stream
>     vs = list(itertools.islice(iterator, batch))
>   File "/home/bryan/git/spark/python/lib/pyspark.zip/pyspark/util.py", line 
> 99, in wrapper
>     return f(*args, **kwargs)
>   File "/home/bryan/git/spark/python/pyspark/tests/test_rdd.py", line 742, in 
> fail
>     raise RuntimeError("local iterator error")
> RuntimeError: local iterator error
>     at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:453)
> ...
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> FAIL
> ==
> FAIL: test_to_local_iterator_error (pyspark.tests.test_rdd.RDDTests)
> --
> Traceback (most recent call last):
>   File "/home/bryan/git/spark/python/pyspark/tests/test_rdd.py", line 748, in 
> test_to_local_iterator_error
>     pass
> AssertionError: Exception not raised{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27548) PySpark toLocalIterator does not raise errors from worker

2019-04-30 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27548:


Assignee: (was: Apache Spark)

> PySpark toLocalIterator does not raise errors from worker
> -
>
> Key: SPARK-27548
> URL: https://issues.apache.org/jira/browse/SPARK-27548
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.4.1
>Reporter: Bryan Cutler
>Priority: Major
>
> When using a PySpark RDD local iterator and an error occurs on the worker, it 
> is not picked up by Py4J and raised in the Python driver process. So unless 
> one looks at the logs, there is no way for the application to know the worker 
> had an error. This is a test that should pass if the error is raised in the 
> driver:
> {code}
> def test_to_local_iterator_error(self):
> def fail(_):
> raise RuntimeError("local iterator error")
> rdd = self.sc.parallelize(range(10)).map(fail)
> with self.assertRaisesRegexp(Exception, "local iterator error"):
> for _ in rdd.toLocalIterator():
> pass{code}
> but it does not raise an exception:
> {noformat}
> Caused by: org.apache.spark.api.python.PythonException: Traceback (most 
> recent call last):
>   File "/home/bryan/git/spark/python/lib/pyspark.zip/pyspark/worker.py", line 
> 428, in main
>     process()
>   File "/home/bryan/git/spark/python/lib/pyspark.zip/pyspark/worker.py", line 
> 423, in process
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/home/bryan/git/spark/python/lib/pyspark.zip/pyspark/serializers.py", 
> line 505, in dump_stream
>     vs = list(itertools.islice(iterator, batch))
>   File "/home/bryan/git/spark/python/lib/pyspark.zip/pyspark/util.py", line 
> 99, in wrapper
>     return f(*args, **kwargs)
>   File "/home/bryan/git/spark/python/pyspark/tests/test_rdd.py", line 742, in 
> fail
>     raise RuntimeError("local iterator error")
> RuntimeError: local iterator error
>     at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:453)
> ...
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> FAIL
> ==
> FAIL: test_to_local_iterator_error (pyspark.tests.test_rdd.RDDTests)
> --
> Traceback (most recent call last):
>   File "/home/bryan/git/spark/python/pyspark/tests/test_rdd.py", line 748, in 
> test_to_local_iterator_error
>     pass
> AssertionError: Exception not raised{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-27548) PySpark toLocalIterator does not raise errors from worker

2019-04-30 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27548:


Assignee: Apache Spark

> PySpark toLocalIterator does not raise errors from worker
> -
>
> Key: SPARK-27548
> URL: https://issues.apache.org/jira/browse/SPARK-27548
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 2.4.1
>Reporter: Bryan Cutler
>Assignee: Apache Spark
>Priority: Major
>
> When using a PySpark RDD local iterator and an error occurs on the worker, it 
> is not picked up by Py4J and raised in the Python driver process. So unless 
> one looks at the logs, there is no way for the application to know the worker 
> had an error. This is a test that should pass if the error is raised in the 
> driver:
> {code}
> def test_to_local_iterator_error(self):
> def fail(_):
> raise RuntimeError("local iterator error")
> rdd = self.sc.parallelize(range(10)).map(fail)
> with self.assertRaisesRegexp(Exception, "local iterator error"):
> for _ in rdd.toLocalIterator():
> pass{code}
> but it does not raise an exception:
> {noformat}
> Caused by: org.apache.spark.api.python.PythonException: Traceback (most 
> recent call last):
>   File "/home/bryan/git/spark/python/lib/pyspark.zip/pyspark/worker.py", line 
> 428, in main
>     process()
>   File "/home/bryan/git/spark/python/lib/pyspark.zip/pyspark/worker.py", line 
> 423, in process
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/home/bryan/git/spark/python/lib/pyspark.zip/pyspark/serializers.py", 
> line 505, in dump_stream
>     vs = list(itertools.islice(iterator, batch))
>   File "/home/bryan/git/spark/python/lib/pyspark.zip/pyspark/util.py", line 
> 99, in wrapper
>     return f(*args, **kwargs)
>   File "/home/bryan/git/spark/python/pyspark/tests/test_rdd.py", line 742, in 
> fail
>     raise RuntimeError("local iterator error")
> RuntimeError: local iterator error
>     at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:453)
> ...
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> FAIL
> ==
> FAIL: test_to_local_iterator_error (pyspark.tests.test_rdd.RDDTests)
> --
> Traceback (most recent call last):
>   File "/home/bryan/git/spark/python/pyspark/tests/test_rdd.py", line 748, in 
> test_to_local_iterator_error
>     pass
> AssertionError: Exception not raised{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24601) Bump Jackson version to 2.9.6

2019-04-30 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-24601:
--
Fix Version/s: (was: 2.4.3)

> Bump Jackson version to 2.9.6
> -
>
> Key: SPARK-24601
> URL: https://issues.apache.org/jira/browse/SPARK-24601
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.1
>Reporter: Fokko Driesprong
>Assignee: Fokko Driesprong
>Priority: Major
>  Labels: release-notes
> Fix For: 3.0.0
>
>
> The Jackson version is lagging behind, and therefore I have to add a lot of 
> exclusions to the SBT files: 
> ```
> Caused by: com.fasterxml.jackson.databind.JsonMappingException: Incompatible 
> Jackson version: 2.9.5
>   at 
> com.fasterxml.jackson.module.scala.JacksonModule$class.setupModule(JacksonModule.scala:64)
>   at 
> com.fasterxml.jackson.module.scala.DefaultScalaModule.setupModule(DefaultScalaModule.scala:19)
>   at 
> com.fasterxml.jackson.databind.ObjectMapper.registerModule(ObjectMapper.java:751)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.(RDDOperationScope.scala:82)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.(RDDOperationScope.scala)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27051) Bump Jackson version to 2.9.8

2019-04-30 Thread Dongjoon Hyun (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-27051:
--
Fix Version/s: (was: 2.4.3)

> Bump Jackson version to 2.9.8
> -
>
> Key: SPARK-27051
> URL: https://issues.apache.org/jira/browse/SPARK-27051
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: Yanbo Liang
>Assignee: Yanbo Liang
>Priority: Major
> Fix For: 3.0.0
>
>
> Fasterxml Jackson versions before 2.9.8 are affected by multiple CVEs 
> [[https://github.com/FasterXML/jackson-databind/issues/2186]]; we need to 
> bump the dependent Jackson version to 2.9.8.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-27593) CSV Parser returns 2 DataFrame - Valid and Malformed DFs

2019-04-30 Thread Ladislav Jech (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830596#comment-16830596
 ] 

Ladislav Jech edited comment on SPARK-27593 at 4/30/19 7:12 PM:


[~hyukjin.kwon] - Then optionally return just an array of the line numbers 
which are malformed. Such an array can be logged alongside the log of the 
processed file. Otherwise one must load a DF in permissive mode, load another 
DF in dropmalformed mode, and at least count the malformed rows. Even then it 
is just a count, whereas in malformed mode the line index is available, so why 
not expose it as an optional variable?

It could return the result into a variable passed in, like:

List malformedRecords = new ArrayList<>();

sqlContext.read.format("CSV").option("mode","malformed").option("malformedRecords",
 malformedRecords).load("S3://...")

and the malformedRecords object would be updated with the row indexes; it 
doesn't need to be another DF for this purpose. Eventually it could also 
return the number of columns detected on a specific line.


was (Author: archenroot):
[~hyukjin.kwon] - Then optionally return just an array of the line numbers 
which are malformed. Such an array can be logged alongside the log of the 
processed file. Otherwise one must load a DF in permissive mode, load another 
DF in dropmalformed mode, and at least count the malformed rows. Even then it 
is just a count, whereas in malformed mode the line index is available, so why 
not expose it as an optional variable?

It could return the result into a variable passed in, like:

List malformedRecords = new ArrayList<>();

sqlContext.read.format("CSV").option("mode","malformed").option("malformedRecords",
 malformedRecords).load("S3://...")

and the malformedRecords object would be updated with the row indexes...

> CSV Parser returns 2 DataFrame - Valid and Malformed DFs
> 
>
> Key: SPARK-27593
> URL: https://issues.apache.org/jira/browse/SPARK-27593
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core
>Affects Versions: 2.4.2
>Reporter: Ladislav Jech
>Priority: Major
>
> When we process CSV in any kind of data warehouse, it is common procedure to 
> report corrupted records for audit purposes and to feed them back to the 
> vendor so they can improve their process. CSV is no different from XSD in the 
> sense that it defines a schema, although in a very limited way (in some cases 
> only as a number of columns, without headers, and with no types), but when I 
> check an XML document against an XSD file, I get an exact report of whether 
> the file is completely valid and, if not, an exact report of which records do 
> not follow the schema.
> Such a feature would have big value in Spark for CSV: get the malformed 
> records into some dataframe, with a line count (a pointer within the data 
> object), so that I can log both the pointer and the real data (line/row) and 
> trigger an action on this unfortunate event.
> The load() method could return an Array of DFs (Valid, Invalid).
> PERMISSIVE mode isn't enough, since it fills missing fields with nulls, which 
> makes it even harder to detect what is really wrong. Another approach at the 
> moment is to read with both permissive and dropmalformed modes into 2 
> dataframes and compare them against each other.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Reopened] (SPARK-27593) CSV Parser returns 2 DataFrame - Valid and Malformed DFs

2019-04-30 Thread Ladislav Jech (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ladislav Jech reopened SPARK-27593:
---

As per my comment above, please review again.

> CSV Parser returns 2 DataFrame - Valid and Malformed DFs
> 
>
> Key: SPARK-27593
> URL: https://issues.apache.org/jira/browse/SPARK-27593
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core
>Affects Versions: 2.4.2
>Reporter: Ladislav Jech
>Priority: Major
>
> When we process CSV in any kind of data warehouse, it is common procedure to 
> report corrupted records for audit purposes and to feed them back to the 
> vendor so they can improve their process. CSV is no different from XSD in the 
> sense that it defines a schema, although in a very limited way (in some cases 
> only as a number of columns, without headers, and with no types), but when I 
> check an XML document against an XSD file, I get an exact report of whether 
> the file is completely valid and, if not, an exact report of which records do 
> not follow the schema.
> Such a feature would have big value in Spark for CSV: get the malformed 
> records into some dataframe, with a line count (a pointer within the data 
> object), so that I can log both the pointer and the real data (line/row) and 
> trigger an action on this unfortunate event.
> The load() method could return an Array of DFs (Valid, Invalid).
> PERMISSIVE mode isn't enough, since it fills missing fields with nulls, which 
> makes it even harder to detect what is really wrong. Another approach at the 
> moment is to read with both permissive and dropmalformed modes into 2 
> dataframes and compare them against each other.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27593) CSV Parser returns 2 DataFrame - Valid and Malformed DFs

2019-04-30 Thread Ladislav Jech (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830596#comment-16830596
 ] 

Ladislav Jech commented on SPARK-27593:
---

[~hyukjin.kwon] - Then optionally return just an array of the line numbers 
which are malformed. Such an array can be logged alongside the log of the 
processed file. Otherwise one must load a DF in permissive mode, load another 
DF in dropmalformed mode, and at least count the malformed rows. Even then it 
is just a count, whereas in malformed mode the line index is available, so why 
not expose it as an optional variable?

It could return the result into a variable passed in, like:

List malformedRecords = new ArrayList<>();

sqlContext.read.format("CSV").option("mode","malformed").option("malformedRecords",
 malformedRecords).load("S3://...")

and the malformedRecords object would be updated with the row indexes...

> CSV Parser returns 2 DataFrame - Valid and Malformed DFs
> 
>
> Key: SPARK-27593
> URL: https://issues.apache.org/jira/browse/SPARK-27593
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core
>Affects Versions: 2.4.2
>Reporter: Ladislav Jech
>Priority: Major
>
> When we process CSV in any kind of data warehouse, it is common procedure to 
> report corrupted records for audit purposes and to feed them back to the 
> vendor so they can improve their process. CSV is no different from XSD in the 
> sense that it defines a schema, although in a very limited way (in some cases 
> only as a number of columns, without headers, and with no types), but when I 
> check an XML document against an XSD file, I get an exact report of whether 
> the file is completely valid and, if not, an exact report of which records do 
> not follow the schema.
> Such a feature would have big value in Spark for CSV: get the malformed 
> records into some dataframe, with a line count (a pointer within the data 
> object), so that I can log both the pointer and the real data (line/row) and 
> trigger an action on this unfortunate event.
> The load() method could return an Array of DFs (Valid, Invalid).
> PERMISSIVE mode isn't enough, since it fills missing fields with nulls, which 
> makes it even harder to detect what is really wrong. Another approach at the 
> moment is to read with both permissive and dropmalformed modes into 2 
> dataframes and compare them against each other.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27597) RuntimeConfig should be serializable

2019-04-30 Thread Nick Dimiduk (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830564#comment-16830564
 ] 

Nick Dimiduk commented on SPARK-27597:
--

{quote}Do you want to access {{SparkSession}} in UDF?{quote}
Nope. In my case, all I want is the configuration.

> RuntimeConfig should be serializable
> 
>
> Key: SPARK-27597
> URL: https://issues.apache.org/jira/browse/SPARK-27597
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.1
>Reporter: Nick Dimiduk
>Priority: Major
>
> When implementing a UDF or similar, it's quite surprising to see that the 
> {{SparkSession}} is {{Serializable}} but {{RuntimeConf}} is not. When 
> modeling UDFs in an object-oriented way, this leads to quite a surprise, an 
> ugly NPE from the {{call}} site.
> {noformat}
> Caused by: java.lang.NullPointerException
> at 
> org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:143)
> at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:141)
> at org.apache.spark.sql.SparkSession.conf$lzycompute(SparkSession.scala:170)
> at org.apache.spark.sql.SparkSession.conf(SparkSession.scala:170)
> ...{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27463) SPIP: Support Dataframe Cogroup via Pandas UDFs

2019-04-30 Thread Bryan Cutler (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830535#comment-16830535
 ] 

Bryan Cutler commented on SPARK-27463:
--

I left some comments on the doc. Overall, I think it sounds like a useful 
addition and won't require a huge amount of changes. Since there are some 
different ways we could go with the PySpark APIs, just make sure each choice is 
well described with examples. Thanks!
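
For context, a minimal sketch of the shape such an API could take (roughly the 
cogroup-plus-applyInPandas form that later landed in Spark 3.0); the inputs 
{{df1}}/{{df2}} and the result schema here are illustrative assumptions, not 
part of the SPIP text:

{code:python}
import pandas as pd

# Assumed inputs: df1 has columns (id, time, v1); df2 has (id, time, v2).
def merge_ordered(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    # Each call receives all rows for one key from both sides as plain pandas.
    return pd.merge_asof(left.sort_values("time"), right.sort_values("time"),
                         on="time", by="id")

result = (df1.groupby("id")
             .cogroup(df2.groupby("id"))
             .applyInPandas(merge_ordered,
                            schema="id long, time long, v1 double, v2 double"))
{code}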

> SPIP: Support Dataframe Cogroup via Pandas UDFs 
> 
>
> Key: SPARK-27463
> URL: https://issues.apache.org/jira/browse/SPARK-27463
> Project: Spark
>  Issue Type: Improvement
>  Components: PySpark, SQL
>Affects Versions: 3.0.0
>Reporter: Chris Martin
>Priority: Major
>  Labels: SPIP
>
> Recent work on Pandas UDFs in Spark, has allowed for improved 
> interoperability between Pandas and Spark.  This proposal aims to extend this 
> by introducing a new Pandas UDF type which would allow for a cogroup 
> operation to be applied to two PySpark DataFrames.
> Full details are in the google document linked below.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27566) SIGSEV in Spark SQL during broadcast

2019-04-30 Thread Martin Studer (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830530#comment-16830530
 ] 

Martin Studer commented on SPARK-27566:
---

Hi [~hyukjin.kwon], I'll try to collect more information, but this is 
unfortunately not so straightforward. The issue occurs as part of a bigger 
PySpark application whose code I can't share. I'll try to pin down the issue 
as well as I can.

> SIGSEV in Spark SQL during broadcast
> 
>
> Key: SPARK-27566
> URL: https://issues.apache.org/jira/browse/SPARK-27566
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
> Environment: Hortonworks HDP 2.6.5, Spark 2.3.0.2.6.5.1050-37
>Reporter: Martin Studer
>Priority: Major
>
> During execution of a broadcast exchange the JVM aborts with a segmentation 
> fault:
> {noformat}
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGSEGV (0xb) at pc=0x7feb5d024ea2, pid=26118, tid=0x7feabf1ca700
> #
> # JRE version: Java(TM) SE Runtime Environment (8.0_102-b14) (build 
> 1.8.0_102-b14)
> # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.102-b14 mixed mode 
> linux-amd64 compressed oops)
> # Problematic frame:
> # j  scala.runtime.BoxesRunTime.unboxToLong(Ljava/lang/Object;)J+9
> {noformat}
> The corresponding information from the {{hs_err_pid}} is:
> {noformat}
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGSEGV (0xb) at pc=0x7feb5d024ea2, pid=26118, tid=0x7feabf1ca700
> #
> # JRE version: Java(TM) SE Runtime Environment (8.0_102-b14) (build 
> 1.8.0_102-b14)
> # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.102-b14 mixed mode 
> linux-amd64 compressed oops)
> # Problematic frame:
> # j  scala.runtime.BoxesRunTime.unboxToLong(Ljava/lang/Object;)J+9
> #
> # Failed to write core dump. Core dumps have been disabled. To enable core 
> dumping, try "ulimit -c unlimited" before starting Java again
> #
> # If you would like to submit a bug report, please visit:
> #   http://bugreport.java.com/bugreport/crash.jsp
> #
> ---  T H R E A D  ---
> Current thread (0x7feaf8c54000):  JavaThread "broadcast-exchange-2" 
> daemon [_thread_in_Java, id=30475, 
> stack(0x7feabf0ca000,0x7feabf1cb000)]
> siginfo: si_signo: 11 (SIGSEGV), si_code: 1 (SEGV_MAPERR), si_addr: 
> 0x003c7fc8
> Registers:
> RAX=0x0007c0011570, RBX=0x003c7f90, RCX=0x0038, 
> RDX=0x000775daf6d0
> RSP=0x7feabf1c8ab0, RBP=0x7feabf1c8af0, RSI=0x7feb0830, 
> RDI=0x0001
> R8 =0x7feb0800b280, R9 =0x7feb0800c6a0, R10=0x7feb73d59100, 
> R11=0x7feb73181700
> R12=0x, R13=0x7feb5c5a5951, R14=0x7feabf1c8b00, 
> R15=0x7feaf8c54000
> RIP=0x7feb5d024ea2, EFLAGS=0x00010283, CSGSFS=0x0033, 
> ERR=0x0004
>   TRAPNO=0x000e
> Top of Stack: (sp=0x7feabf1c8ab0)
> 0x7feabf1c8ab0:   7feabf1c8ab0 7feb5c5a5951
> 0x7feabf1c8ac0:   7feabf1c8b00 7feb5c5a9610
> 0x7feabf1c8ad0:   7feb4e626068 7feb5c5a5970
> 0x7feabf1c8ae0:    7feabf1c8b00
> 0x7feabf1c8af0:   7feabf1c8b68 7feb5d007dd0
> 0x7feabf1c8b00:    7feb5d007dd0
> 0x7feabf1c8b10:   000775daf6d0 
> 0x7feabf1c8b20:   000774fd2048 7feabf1c8b18
> 0x7feabf1c8b30:   7feb4f27548f 7feabf1c8c20
> 0x7feabf1c8b40:   7feb4f275cd0 
> 0x7feabf1c8b50:   7feb4f2755f0 7feabf1c8b10
> 0x7feabf1c8b60:   7feabf1c8bf0 7feabf1c8c78
> 0x7feabf1c8b70:   7feb5d007dd0 
> 0x7feabf1c8b80:    
> 0x7feabf1c8b90:    
> 0x7feabf1c8ba0:    
> 0x7feabf1c8bb0:    
> 0x7feabf1c8bc0:    
> 0x7feabf1c8bd0:    0001
> 0x7feabf1c8be0:    000774fd2048
> 0x7feabf1c8bf0:   000775daf6f8 000775daf6e8
> 0x7feabf1c8c00:    7feb5d008040
> 0x7feabf1c8c10:   0020 7feb5d008040
> 0x7feabf1c8c20:   000774fdb080 0001
> 0x7feabf1c8c30:   000774fd2048 7feabf1c8c28
> 0x7feabf1c8c40:   7feb4f7636a3 7feabf1c8cc8
> 0x7feabf1c8c50:   7feabfb37848 
> 0x7feabf1c8c60:   7feb4f763720 7feabf1c8bf0
> 0x7feabf1c8c70:   7feabf1c8ca0 7feabf1c8d20
> 0x7feabf1c8c80:   7feb5d007dd0 
> 0x7feabf1c8c90:    0006c03f26e8
> 0x7feabf1c8ca0:   0006c03f26e8  

[jira] [Commented] (SPARK-17859) persist should not impede with spark's ability to perform a broadcast join.

2019-04-30 Thread colin fang (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-17859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830504#comment-16830504
 ] 

colin fang commented on SPARK-17859:


The above case works for me in v2.4
{code:java}
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 0)
df_large = spark.range(1e6)
df_small = F.broadcast(spark.range(10).coalesce(1)).cache()
df_large.join(df_small, "id").explain()


== Physical Plan ==
*(2) Project [id#0L]
+- *(2) BroadcastHashJoin [id#0L], [id#2L], Inner, BuildRight
   :- *(2) Range (0, 100, step=1, splits=4)
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, 
false]))
  +- *(1) InMemoryTableScan [id#2L]
+- InMemoryRelation [id#2L], StorageLevel(disk, memory, 
deserialized, 1 replicas)
  +- Coalesce 1
 +- *(1) Range (0, 10, step=1, splits=4)
{code}
However, I have definitely seen cases where {{F.broadcast}} is ignored for a 
cached dataframe. (I am unable to find a minimal example, though.)
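
For the original report below, a minimal PySpark sketch of the usual 
workaround, i.e. applying the broadcast hint explicitly on the non-persisted 
side (the names follow the report; this is a sketch, not the reporter's code):

{code:python}
from pyspark.sql.functions import broadcast

df1 = spark.range(100).selectExpr("id as id1").persist()
df2 = spark.range(1000).selectExpr("id as id2")

# The explicit hint asks the planner to broadcast df2 even though df1 is an
# InMemoryRelation after persist(); without a hint the planner may fall back
# to SortMergeJoin, as shown in the report.
df1.join(broadcast(df2), df1["id1"] == df2["id2"]).explain()
{code}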

> persist should not impede with spark's ability to perform a broadcast join.
> ---
>
> Key: SPARK-17859
> URL: https://issues.apache.org/jira/browse/SPARK-17859
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 2.0.0
> Environment: spark 2.0.0 , Linux RedHat
>Reporter: Franck Tago
>Priority: Major
>
> I am using Spark 2.0.0.
> My investigation leads me to conclude that calling persist can prevent a 
> broadcast join from happening.
> Example
> Case 1: no persist call
> var df1 = spark.range(100).select($"id".as("id1"))
> df1: org.apache.spark.sql.DataFrame = [id1: bigint]
> var df2 = spark.range(1000).select($"id".as("id2"))
> df2: org.apache.spark.sql.DataFrame = [id2: bigint]
> df1.join(df2, $"id1" === $"id2").explain
> == Physical Plan ==
> *BroadcastHashJoin [id1#117L], [id2#123L], Inner, BuildRight
> :- *Project [id#114L AS id1#117L]
> :  +- *Range (0, 100, splits=2)
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, 
> false]))
>+- *Project [id#120L AS id2#123L]
>   +- *Range (0, 1000, splits=2)
> Case 2: persist call
> df1.persist.join(df2, $"id1" === $"id2").explain
> 16/10/10 15:50:21 WARN CacheManager: Asked to cache already cached data.
> == Physical Plan ==
> *SortMergeJoin [id1#3L], [id2#9L], Inner
> :- *Sort [id1#3L ASC], false, 0
> :  +- Exchange hashpartitioning(id1#3L, 10)
> : +- InMemoryTableScan [id1#3L]
> ::  +- InMemoryRelation [id1#3L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
> :: :  +- *Project [id#0L AS id1#3L]
> :: : +- *Range (0, 100, splits=2)
> +- *Sort [id2#9L ASC], false, 0
>+- Exchange hashpartitioning(id2#9L, 10)
>   +- InMemoryTableScan [id2#9L]
>  :  +- InMemoryRelation [id2#9L], true, 1, StorageLevel(disk, 
> memory, deserialized, 1 replicas)
>  : :  +- *Project [id#6L AS id2#9L]
>  : : +- *Range (0, 1000, splits=2)
> Why does the persist call prevent the broadcast join? My opinion is that it 
> should not.
> I was made aware that the persist call is lazy and that this might have 
> something to do with it, but I still contend that it should not.
> Losing broadcast joins is really costly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-27566) SIGSEV in Spark SQL during broadcast

2019-04-30 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-27566.
--
Resolution: Incomplete

No feedback

> SIGSEV in Spark SQL during broadcast
> 
>
> Key: SPARK-27566
> URL: https://issues.apache.org/jira/browse/SPARK-27566
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
> Environment: Hortonworks HDP 2.6.5, Spark 2.3.0.2.6.5.1050-37
>Reporter: Martin Studer
>Priority: Major
>
> During execution of a broadcast exchange the JVM aborts with a segmentation 
> fault:
> {noformat}
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGSEGV (0xb) at pc=0x7feb5d024ea2, pid=26118, tid=0x7feabf1ca700
> #
> # JRE version: Java(TM) SE Runtime Environment (8.0_102-b14) (build 
> 1.8.0_102-b14)
> # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.102-b14 mixed mode 
> linux-amd64 compressed oops)
> # Problematic frame:
> # j  scala.runtime.BoxesRunTime.unboxToLong(Ljava/lang/Object;)J+9
> {noformat}
> The corresponding information from the {{hs_err_pid}} is:
> {noformat}
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGSEGV (0xb) at pc=0x7feb5d024ea2, pid=26118, tid=0x7feabf1ca700
> #
> # JRE version: Java(TM) SE Runtime Environment (8.0_102-b14) (build 
> 1.8.0_102-b14)
> # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.102-b14 mixed mode 
> linux-amd64 compressed oops)
> # Problematic frame:
> # j  scala.runtime.BoxesRunTime.unboxToLong(Ljava/lang/Object;)J+9
> #
> # Failed to write core dump. Core dumps have been disabled. To enable core 
> dumping, try "ulimit -c unlimited" before starting Java again
> #
> # If you would like to submit a bug report, please visit:
> #   http://bugreport.java.com/bugreport/crash.jsp
> #
> ---  T H R E A D  ---
> Current thread (0x7feaf8c54000):  JavaThread "broadcast-exchange-2" 
> daemon [_thread_in_Java, id=30475, 
> stack(0x7feabf0ca000,0x7feabf1cb000)]
> siginfo: si_signo: 11 (SIGSEGV), si_code: 1 (SEGV_MAPERR), si_addr: 
> 0x003c7fc8
> Registers:
> RAX=0x0007c0011570, RBX=0x003c7f90, RCX=0x0038, 
> RDX=0x000775daf6d0
> RSP=0x7feabf1c8ab0, RBP=0x7feabf1c8af0, RSI=0x7feb0830, 
> RDI=0x0001
> R8 =0x7feb0800b280, R9 =0x7feb0800c6a0, R10=0x7feb73d59100, 
> R11=0x7feb73181700
> R12=0x, R13=0x7feb5c5a5951, R14=0x7feabf1c8b00, 
> R15=0x7feaf8c54000
> RIP=0x7feb5d024ea2, EFLAGS=0x00010283, CSGSFS=0x0033, 
> ERR=0x0004
>   TRAPNO=0x000e
> Top of Stack: (sp=0x7feabf1c8ab0)
> 0x7feabf1c8ab0:   7feabf1c8ab0 7feb5c5a5951
> 0x7feabf1c8ac0:   7feabf1c8b00 7feb5c5a9610
> 0x7feabf1c8ad0:   7feb4e626068 7feb5c5a5970
> 0x7feabf1c8ae0:    7feabf1c8b00
> 0x7feabf1c8af0:   7feabf1c8b68 7feb5d007dd0
> 0x7feabf1c8b00:    7feb5d007dd0
> 0x7feabf1c8b10:   000775daf6d0 
> 0x7feabf1c8b20:   000774fd2048 7feabf1c8b18
> 0x7feabf1c8b30:   7feb4f27548f 7feabf1c8c20
> 0x7feabf1c8b40:   7feb4f275cd0 
> 0x7feabf1c8b50:   7feb4f2755f0 7feabf1c8b10
> 0x7feabf1c8b60:   7feabf1c8bf0 7feabf1c8c78
> 0x7feabf1c8b70:   7feb5d007dd0 
> 0x7feabf1c8b80:    
> 0x7feabf1c8b90:    
> 0x7feabf1c8ba0:    
> 0x7feabf1c8bb0:    
> 0x7feabf1c8bc0:    
> 0x7feabf1c8bd0:    0001
> 0x7feabf1c8be0:    000774fd2048
> 0x7feabf1c8bf0:   000775daf6f8 000775daf6e8
> 0x7feabf1c8c00:    7feb5d008040
> 0x7feabf1c8c10:   0020 7feb5d008040
> 0x7feabf1c8c20:   000774fdb080 0001
> 0x7feabf1c8c30:   000774fd2048 7feabf1c8c28
> 0x7feabf1c8c40:   7feb4f7636a3 7feabf1c8cc8
> 0x7feabf1c8c50:   7feabfb37848 
> 0x7feabf1c8c60:   7feb4f763720 7feabf1c8bf0
> 0x7feabf1c8c70:   7feabf1c8ca0 7feabf1c8d20
> 0x7feabf1c8c80:   7feb5d007dd0 
> 0x7feabf1c8c90:    0006c03f26e8
> 0x7feabf1c8ca0:   0006c03f26e8  
> Instructions: (pc=0x7feb5d024ea2)
> 0x7feb5d024e82:   89 59 10 bf 01 00 00 00 48 89 79 18 48 83 c1 30
> 0x7feb5d024e92:   48 89 4d e0 48 3b d8 0f 84 5b 00 00 00 8b 48 0c
> 0x7feb5d024ea2:   48 3b 04 0b 0f 84 4e 00 00 00 

[jira] [Resolved] (SPARK-27574) spark on kubernetes driver pod phase changed from running to pending and starts another container in pod

2019-04-30 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-27574.
--
Resolution: Invalid

If this is a request for help diagnosing symptoms, please ask on the mailing 
list before filing an issue; you will get a better answer there.

> spark on kubernetes driver pod phase changed from running to pending and 
> starts another container in pod
> 
>
> Key: SPARK-27574
> URL: https://issues.apache.org/jira/browse/SPARK-27574
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 2.4.0
> Environment: Kubernetes version (use kubectl version):
> v1.10.0
> OS (e.g: cat /etc/os-release):
> CentOS-7
> Kernel (e.g. uname -a):
> 4.17.11-1.el7.elrepo.x86_64
> Spark-2.4.0
>Reporter: Will Zhang
>Priority: Major
> Attachments: driver-pod-logs.zip
>
>
> I'm using spark-on-kubernetes to submit a Spark app to Kubernetes.
> Most of the time it runs smoothly, but sometimes I see in the logs after 
> submitting that the driver pod phase changed from running to pending and 
> that another container was started in the pod even though the first 
> container exited successfully.
> I use the standard spark-submit to kubernetes like:
> /opt/spark/spark-2.4.0-bin-hadoop2.7/bin/spark-submit --deploy-mode cluster 
> --class xxx ...
>  
> log is below:
> 19/04/19 09:38:40 INFO LineBufferedStream: stdout: 2019-04-19 09:38:40 INFO 
> LoggingPodStatusWatcherImpl:54 - State changed, new state:
> 19/04/19 09:38:40 INFO LineBufferedStream: stdout: pod name: 
> com--cloud-mf-trainer-submit-1555666719424-driver
> 19/04/19 09:38:40 INFO LineBufferedStream: stdout: namespace: default
> 19/04/19 09:38:40 INFO LineBufferedStream: stdout: labels: DagTask_ID -> 
> 54f854e2-0bce-4bd6-50e7-57b521b216f7, spark-app-selector -> 
> spark-4343fe80572c4240bd933246efd975da, spark-role -> driver
> 19/04/19 09:38:40 INFO LineBufferedStream: stdout: pod uid: 
> ea4410d5-6286-11e9-ae72-e8611f1fbb2a
> 19/04/19 09:38:40 INFO LineBufferedStream: stdout: creation time: 
> 2019-04-19T09:38:40Z
> 19/04/19 09:38:40 INFO LineBufferedStream: stdout: service account name: 
> default
> 19/04/19 09:38:40 INFO LineBufferedStream: stdout: volumes: 
> spark-local-dir-1, spark-conf-volume, default-token-q7drh
> 19/04/19 09:38:40 INFO LineBufferedStream: stdout: node name: N/A
> 19/04/19 09:38:40 INFO LineBufferedStream: stdout: start time: N/A
> 19/04/19 09:38:40 INFO LineBufferedStream: stdout: container images: N/A
> 19/04/19 09:38:40 INFO LineBufferedStream: stdout: phase: Pending
> 19/04/19 09:38:40 INFO LineBufferedStream: stdout: status: []
> 19/04/19 09:38:40 INFO LineBufferedStream: stdout: 2019-04-19 09:38:40 INFO 
> LoggingPodStatusWatcherImpl:54 - State changed, new state:
> 19/04/19 09:38:40 INFO LineBufferedStream: stdout: pod name: 
> com--cloud-mf-trainer-submit-1555666719424-driver
> 19/04/19 09:38:40 INFO LineBufferedStream: stdout: namespace: default
> 19/04/19 09:38:40 INFO LineBufferedStream: stdout: labels: DagTask_ID -> 
> 54f854e2-0bce-4bd6-50e7-57b521b216f7, spark-app-selector -> 
> spark-4343fe80572c4240bd933246efd975da, spark-role -> driver
> 19/04/19 09:38:40 INFO LineBufferedStream: stdout: pod uid: 
> ea4410d5-6286-11e9-ae72-e8611f1fbb2a
> 19/04/19 09:38:40 INFO LineBufferedStream: stdout: creation time: 
> 2019-04-19T09:38:40Z
> 19/04/19 09:38:40 INFO LineBufferedStream: stdout: service account name: 
> default
> 19/04/19 09:38:40 INFO LineBufferedStream: stdout: volumes: 
> spark-local-dir-1, spark-conf-volume, default-token-q7drh
> 19/04/19 09:38:40 INFO LineBufferedStream: stdout: node name: 
> yq01-m12-ai2b-service02.yq01..com
> 19/04/19 09:38:40 INFO LineBufferedStream: stdout: start time: N/A
> 19/04/19 09:38:40 INFO LineBufferedStream: stdout: container images: N/A
> 19/04/19 09:38:40 INFO LineBufferedStream: stdout: phase: Pending
> 19/04/19 09:38:40 INFO LineBufferedStream: stdout: status: []
> 19/04/19 09:38:41 INFO LineBufferedStream: stdout: 2019-04-19 09:38:41 INFO 
> LoggingPodStatusWatcherImpl:54 - State changed, new state:
> 19/04/19 09:38:41 INFO LineBufferedStream: stdout: pod name: 
> com--cloud-mf-trainer-submit-1555666719424-driver
> 19/04/19 09:38:41 INFO LineBufferedStream: stdout: namespace: default
> 19/04/19 09:38:41 INFO LineBufferedStream: stdout: labels: DagTask_ID -> 
> 54f854e2-0bce-4bd6-50e7-57b521b216f7, spark-app-selector -> 
> spark-4343fe80572c4240bd933246efd975da, spark-role -> driver
> 19/04/19 09:38:41 INFO LineBufferedStream: stdout: pod uid: 
> ea4410d5-6286-11e9-ae72-e8611f1fbb2a
> 19/04/19 09:38:41 INFO LineBufferedStream: stdout: creation time: 
> 2019-04-19T09:38:40Z
> 19/04/19 09:38:41 INFO LineBufferedStream: stdout: service account name: 
> default
> 19/04/19 

[jira] [Resolved] (SPARK-27582) Add Dataset DSL for left_anti and left_semi joins

2019-04-30 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-27582.
--
Resolution: Won't Fix

I don't think we should add a set of aliases. The existing way of doing these 
joins already looks easy enough.
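
For reference, the existing way referred to above, as a minimal PySpark sketch 
(the Scala DataFrame API accepts the same join-type strings; {{df1}}/{{df2}} 
are assumed inputs):

{code:python}
# left_semi keeps the rows of df1 that have a match in df2; left_anti keeps
# the rows that have no match. Both return only df1's columns, so no nulls
# or tuple unpacking are involved.
cond = df1["id"] == df2["id"]
matched = df1.join(df2, cond, "left_semi")
unmatched = df1.join(df2, cond, "left_anti")
{code}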

> Add Dataset DSL for left_anti and left_semi joins
> -
>
> Key: SPARK-27582
> URL: https://issues.apache.org/jira/browse/SPARK-27582
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.2
>Reporter: Stanislav Bytsko
>Priority: Major
>
> Currently we have
> {code:java}
> org.apache.spark.sql.Dataset[T]#joinWith[U](other: Dataset[U], condition: 
> Column, joinType: String): Dataset[(T, U)]
> {code}
> which explicitly excludes left_anti and left_semi joins. This is 
> understandable, because the result would have a different signature.
> I think it's an easily fixed drawback, with 2 solutions I can think of:
> - Extend the current joinWith to return null for the second (_2) item in the 
> tuple. Not ideal, as no one likes nulls, but workable, as the client should 
> be able to handle that by doing {code}.map(_._1){code} immediately afterwards
> - Add 2 new methods 
> {code}org.apache.spark.sql.Dataset[T]#joinSemiWith[U](other: Dataset[U], 
> condition: Column): Dataset[T]{code} and 
> {code}org.apache.spark.sql.Dataset[T]#joinAntiWith[U](other: Dataset[U], 
> condition: Column): Dataset[T]{code} which is much nicer, but adds 2 methods 
> to the API. Method names could be semiJoinWith and antiJoinWith, which is 
> more logical, but not sorted properly in the list of 
> org.apache.spark.sql.Dataset methods



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-27585) No such method error (sun.nio.ch.DirectBuffer.cleaner())

2019-04-30 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-27585.
--
Resolution: Duplicate

> No such method error (sun.nio.ch.DirectBuffer.cleaner())
> 
>
> Key: SPARK-27585
> URL: https://issues.apache.org/jira/browse/SPARK-27585
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core, SQL
>Affects Versions: 2.4.1, 2.4.2
>Reporter: Mohsen Taheri
>Priority: Major
>
> The error appears when a JDBC read executes and runs for a while 
> (partitioned queries with null conditions take longer than the normal query 
> time, and then the error may occur).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27585) No such method error (sun.nio.ch.DirectBuffer.cleaner())

2019-04-30 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830464#comment-16830464
 ] 

Hyukjin Kwon commented on SPARK-27585:
--

It was not added. By the way, please search existing JIRAs before opening a 
new one next time.

> No such method error (sun.nio.ch.DirectBuffer.cleaner())
> 
>
> Key: SPARK-27585
> URL: https://issues.apache.org/jira/browse/SPARK-27585
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 2.4.1, 2.4.2
>Reporter: Mohsen Taheri
>Priority: Major
>
> The error appears when a JDBC read executes and runs for a while 
> (partitioned queries with null conditions take longer than the normal query 
> time, and then the error may occur).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27585) No such method error (sun.nio.ch.DirectBuffer.cleaner())

2019-04-30 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-27585:
-
Issue Type: Sub-task  (was: Bug)
Parent: SPARK-24417

> No such method error (sun.nio.ch.DirectBuffer.cleaner())
> 
>
> Key: SPARK-27585
> URL: https://issues.apache.org/jira/browse/SPARK-27585
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core, SQL
>Affects Versions: 2.4.1, 2.4.2
>Reporter: Mohsen Taheri
>Priority: Major
>
> The error appears when a JDBC read executes and runs for a while 
> (partitioned queries with null conditions take longer than the normal query 
> time, and then the error may occur).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-27587) No such method error (sun.nio.ch.DirectBuffer.cleaner()) when reading big table from JDBC (with one slow query)

2019-04-30 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-27587.
--
Resolution: Duplicate

> No such method error (sun.nio.ch.DirectBuffer.cleaner()) when reading big 
> table from JDBC (with one slow query)
> ---
>
> Key: SPARK-27587
> URL: https://issues.apache.org/jira/browse/SPARK-27587
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core, SQL
>Affects Versions: 2.4.1, 2.4.2
>Reporter: Mohsen Taheri
>Priority: Major
>
> It throws the error while reading big tables from a JDBC data source:
> > Code:
> sparkSession.read()
>  .option("numPartitions", data.numPartitions)
>  .option("partitionColumn", data.pk)
>  .option("lowerBound", data.min)
>  .option("upperBound", data.max)
>  .option("queryTimeout", 180).
>  format("jdbc").
>  jdbc(dbURL, tableName, props).
>  
> repartition(10).write().mode(SaveMode.Overwrite).parquet(tableF.getAbsolutePath());
>  
> > Stacktrace:
> Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most 
> recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor 
> driver): java.lang.NoSuchMethodError: 
> sun.nio.ch.DirectBuffer.cleaner()Lsun/misc/Cleaner; +details
> Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most 
> recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor 
> driver): java.lang.NoSuchMethodError: 
> sun.nio.ch.DirectBuffer.cleaner()Lsun/misc/Cleaner;
>  at 
> org.apache.spark.storage.StorageUtils$.cleanDirectBuffer(StorageUtils.scala:212)
>  at org.apache.spark.storage.StorageUtils$.dispose(StorageUtils.scala:207)
>  at org.apache.spark.storage.StorageUtils.dispose(StorageUtils.scala)
>  at 
> org.apache.spark.io.NioBufferedFileInputStream.close(NioBufferedFileInputStream.java:130)
>  at java.base/java.io.FilterInputStream.close(FilterInputStream.java:180)
>  at 
> org.apache.spark.io.ReadAheadInputStream.close(ReadAheadInputStream.java:400)
>  at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.close(UnsafeSorterSpillReader.java:151)
>  at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.loadNext(UnsafeSorterSpillReader.java:123)
>  at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillMerger$1.loadNext(UnsafeSorterSpillMerger.java:82)
>  at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:187)
>  at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:174)
>  at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
>  at 
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
>  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>  at org.apache.spark.scheduler.Task.run(Task.scala:121)
>  at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
>  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  at java.base/java.lang.Thread.run(Thread.java:834)
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-27593) CSV Parser returns 2 DataFrame - Valid and Malformed DFs

2019-04-30 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-27593.
--
Resolution: Won't Fix

The malformed-record column is just an informative field. I don't think we 
need a special API that returns two dataframes.
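
For completeness, a minimal sketch of the informative-field approach with 
PERMISSIVE mode and {{columnNameOfCorruptRecord}}; the path and schema here 
are assumptions:

{code:python}
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("_corrupt_record", StringType(), True),  # raw text of bad lines
])

df = (spark.read
      .option("mode", "PERMISSIVE")
      .option("columnNameOfCorruptRecord", "_corrupt_record")
      .schema(schema)
      .csv("s3://bucket/data.csv")
      .cache())  # caching is needed before filtering on the corrupt column

valid = df.filter(df["_corrupt_record"].isNull()).drop("_corrupt_record")
malformed = df.filter(df["_corrupt_record"].isNotNull())
{code}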

> CSV Parser returns 2 DataFrame - Valid and Malformed DFs
> 
>
> Key: SPARK-27593
> URL: https://issues.apache.org/jira/browse/SPARK-27593
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core
>Affects Versions: 2.4.2
>Reporter: Ladislav Jech
>Priority: Major
>
> When we process CSV in any kind of data warehouse, it is common procedure to 
> report corrupted records for audit purposes and to feed them back to the 
> vendor so they can improve their process. CSV is no different from XSD in the 
> sense that it defines a schema, although in a very limited way (in some cases 
> only as a number of columns, without headers, and with no types), but when I 
> check an XML document against an XSD file, I get an exact report of whether 
> the file is completely valid and, if not, an exact report of which records do 
> not follow the schema.
> Such a feature would have big value in Spark for CSV: get the malformed 
> records into some dataframe, with a line count (a pointer within the data 
> object), so that I can log both the pointer and the real data (line/row) and 
> trigger an action on this unfortunate event.
> The load() method could return an Array of DFs (Valid, Invalid).
> PERMISSIVE mode isn't enough, since it fills missing fields with nulls, which 
> makes it even harder to detect what is really wrong. Another approach at the 
> moment is to read with both permissive and dropmalformed modes into 2 
> dataframes and compare them against each other.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27595) Spark couldn't read partitioned(string type) Orc column correctly if the value contains Float/Double value

2019-04-30 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830460#comment-16830460
 ] 

Hyukjin Kwon commented on SPARK-27595:
--

Don't set Critical or higher, which is reserved for committers. Yes, it does 
look like {{spark.sql.sources.partitionColumnTypeInference.enabled}} is the 
answer.
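
A minimal sketch of that workaround, disabling partition column type inference 
before the read (shown with the modern SparkSession API; the path follows the 
report):

{code:python}
# With inference disabled, partition values such as 7F and 8D stay strings
# instead of being coerced to the doubles 7.0 and 8.0.
spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
masterDF = spark.read.orc("/user/hive/warehouse/reservation.db/unique_keys")
{code}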

> Spark couldn't read partitioned(string type) Orc column correctly if the 
> value contains Float/Double value
> --
>
> Key: SPARK-27595
> URL: https://issues.apache.org/jira/browse/SPARK-27595
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.0
>Reporter: Ameer Basha Pattan
>Priority: Major
>
> {code}
> create external table unique_keys (
>  key string
>  ,locator_id string
>  , create_date string
>  , sequence int
>  )
>  partitioned by (source string)
>  stored as orc location '/user/hive/warehouse/reservation.db/unique_keys';
> {code}
> {{/user/hive/warehouse/reservation.db/unique_keys}} contains data like below:
> {code}
> /user/hive/warehouse/reservation.db/unique_keys/source=6S
> /user/hive/warehouse/reservation.db/unique_keys/source=7F
> /user/hive/warehouse/reservation.db/unique_keys/source=7H
> /user/hive/warehouse/reservation.db/unique_keys/source=8D
> {code} 
> If I try to read orc files through Spark, 
> {code}
> val masterDF = 
> hiveContext.read.orc("/user/hive/warehouse/reservation.db/unique_keys")
> {code}
> The source value gets changed to *7.0 and 8.0* for 7F and 8D respectively.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-27594) spark.sql.orc.enableVectorizedReader causes milliseconds in Timestamp to be read incorrectly

2019-04-30 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27594?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-27594.
--
Resolution: Cannot Reproduce

> spark.sql.orc.enableVectorizedReader causes milliseconds in Timestamp to be 
> read incorrectly
> 
>
> Key: SPARK-27594
> URL: https://issues.apache.org/jira/browse/SPARK-27594
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Jan-Willem van der Sijp
>Priority: Major
>
> Using {{spark.sql.orc.impl=native}} and 
> {{spark.sql.orc.enableVectorizedReader=true}} causes TIMESTAMP columns in 
> Hive tables stored as ORC to be read incorrectly. Specifically, the 
> milliseconds of the timestamp will be doubled.
> Input/output of a Zeppelin session to demonstrate:
> {code:python}
> %pyspark
> from pprint import pprint
> spark.conf.set("spark.sql.orc.impl", "native")
> spark.conf.set("spark.sql.orc.enableVectorizedReader", "true")
> pprint(spark.sparkContext.getConf().getAll())
> 
> [('sql.stacktrace', 'false'),
>  ('spark.eventLog.enabled', 'true'),
>  ('spark.app.id', 'application_1556200632329_0005'),
>  ('importImplicit', 'true'),
>  ('printREPLOutput', 'true'),
>  ('spark.history.ui.port', '18081'),
>  ('spark.driver.extraLibraryPath',
>   
> '/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64'),
>  ('spark.driver.extraJavaOptions',
>   ' -Dfile.encoding=UTF-8 '
>   
> '-Dlog4j.configuration=file:///usr/hdp/current/zeppelin-server/conf/log4j.properties
>  '
>   
> '-Dzeppelin.log.file=/var/log/zeppelin/zeppelin-interpreter-spark2-spark-zeppelin-sandbox-hdp.hortonworks.com.log'),
>  ('concurrentSQL', 'false'),
>  ('spark.driver.port', '40195'),
>  ('spark.executor.extraLibraryPath',
>   
> '/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64'),
>  ('useHiveContext', 'true'),
>  ('spark.jars',
>   
> 'file:/usr/hdp/current/zeppelin-server/interpreter/spark/zeppelin-spark_2.11-0.7.3.2.6.5.0-292.jar'),
>  ('spark.history.provider',
>   'org.apache.spark.deploy.history.FsHistoryProvider'),
>  ('spark.yarn.historyServer.address', 'sandbox-hdp.hortonworks.com:18081'),
>  ('spark.submit.deployMode', 'client'),
>  ('spark.ui.filters',
>   'org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter'),
>  
> ('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS',
>   'sandbox-hdp.hortonworks.com'),
>  ('spark.eventLog.dir', 'hdfs:///spark2-history/'),
>  ('spark.repl.class.uri', 
> 'spark://sandbox-hdp.hortonworks.com:40195/classes'),
>  ('spark.driver.host', 'sandbox-hdp.hortonworks.com'),
>  ('master', 'yarn'),
>  ('spark.yarn.dist.archives',
>   '/usr/hdp/current/spark2-client/R/lib/sparkr.zip#sparkr'),
>  ('spark.scheduler.mode', 'FAIR'),
>  ('spark.yarn.queue', 'default'),
>  ('spark.history.kerberos.keytab',
>   '/etc/security/keytabs/spark.headless.keytab'),
>  ('spark.executor.id', 'driver'),
>  ('spark.history.fs.logDirectory', 'hdfs:///spark2-history/'),
>  ('spark.history.kerberos.enabled', 'false'),
>  ('spark.master', 'yarn'),
>  ('spark.sql.catalogImplementation', 'hive'),
>  ('spark.history.kerberos.principal', 'none'),
>  ('spark.driver.extraClassPath',
>   
> ':/usr/hdp/current/zeppelin-server/interpreter/spark/*:/usr/hdp/current/zeppelin-server/lib/interpreter/*::/usr/hdp/current/zeppelin-server/interpreter/spark/zeppelin-spark_2.11-0.7.3.2.6.5.0-292.jar'),
>  ('spark.driver.appUIAddress', 'http://sandbox-hdp.hortonworks.com:4040'),
>  ('spark.repl.class.outputDir',
>   '/tmp/spark-555b2143-0efa-45c1-aecc-53810f89aa5f'),
>  ('spark.yarn.isPython', 'true'),
>  ('spark.app.name', 'Zeppelin'),
>  
> ('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES',
>   
> 'http://sandbox-hdp.hortonworks.com:8088/proxy/application_1556200632329_0005'),
>  ('maxResult', '1000'),
>  ('spark.executorEnv.PYTHONPATH',
>   
> '/usr/hdp/current/spark2-client//python/lib/py4j-0.10.6-src.zip:/usr/hdp/current/spark2-client//python/:/usr/hdp/current/spark2-client//python:/usr/hdp/current/spark2-client//python/lib/py4j-0.8.2.1-src.zip{{PWD}}/pyspark.zip{{PWD}}/py4j-0.10.6-src.zip'),
>  ('spark.ui.proxyBase', '/proxy/application_1556200632329_0005')]
> {code}
> {code:python}
> %pyspark
> spark.sql("""
> DROP TABLE IF EXISTS default.hivetest
> """)
> spark.sql("""
> CREATE TABLE default.hivetest (
> day DATE,
> time TIMESTAMP,
> timestring STRING
> )
> USING ORC
> """)
> {code}
> {code:python}
> %pyspark
> df1 = spark.createDataFrame(
> [
> ("2019-01-01", "2019-01-01 12:15:31.123", "2019-01-01 12:15:31.123")
> ],
> schema=("date", "timestamp", "string")
> )
> 

[jira] [Resolved] (SPARK-27595) Spark couldn't read partitioned(string type) Orc column correctly if the value contains Float/Double value

2019-04-30 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-27595.
--
Resolution: Not A Problem

> Spark couldn't read partitioned(string type) Orc column correctly if the 
> value contains Float/Double value
> --
>
> Key: SPARK-27595
> URL: https://issues.apache.org/jira/browse/SPARK-27595
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.0
>Reporter: Ameer Basha Pattan
>Priority: Major
>
> {code}
> create external table unique_keys (
>  key string
>  ,locator_id string
>  , create_date string
>  , sequence int
>  )
>  partitioned by (source string)
>  stored as orc location '/user/hive/warehouse/reservation.db/unique_keys';
> {code}
> {{/user/hive/warehouse/reservation.db/unique_keys}} contains data like below:
> {code}
> /user/hive/warehouse/reservation.db/unique_keys/source=6S
> /user/hive/warehouse/reservation.db/unique_keys/source=7F
> /user/hive/warehouse/reservation.db/unique_keys/source=7H
> /user/hive/warehouse/reservation.db/unique_keys/source=8D
> {code} 
> If I try to read orc files through Spark, 
> {code}
> val masterDF = 
> hiveContext.read.orc("/user/hive/warehouse/reservation.db/unique_keys")
> {code}
> The source value gets changed to *7.0 and 8.0* for 7F and 8D respectively.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27595) Spark couldn't read partitioned(string type) Orc column correctly if the value contains Float/Double value

2019-04-30 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-27595:
-
Description: 
{code}
create external table unique_keys (
 key string
 ,locator_id string
 , create_date string
 , sequence int
 )
 partitioned by (source string)
 stored as orc location '/user/hive/warehouse/reservation.db/unique_keys';
{code}

{{/user/hive/warehouse/reservation.db/unique_keys}} contains data like below:

{code}
/user/hive/warehouse/reservation.db/unique_keys/source=6S
/user/hive/warehouse/reservation.db/unique_keys/source=7F
/user/hive/warehouse/reservation.db/unique_keys/source=7H
/user/hive/warehouse/reservation.db/unique_keys/source=8D
{code} 

If I try to read orc files through Spark, 

{code}
val masterDF = 
hiveContext.read.orc("/user/hive/warehouse/reservation.db/unique_keys")
{code}

The source value gets changed to *7.0 and 8.0* for 7F and 8D respectively.

 

  was:
create external table unique_keys (
key string
,locator_id string
, create_date string
, sequence int
)
partitioned by (source string)
stored as orc location '/user/hive/warehouse/reservation.db/unique_keys';

/user/hive/warehouse/reservation.db/unique_keys contains data like below:

/user/hive/warehouse/reservation.db/unique_keys/source=6S
/user/hive/warehouse/reservation.db/unique_keys/source=7F
/user/hive/warehouse/reservation.db/unique_keys/source=7H
/user/hive/warehouse/reservation.db/unique_keys/source=8D

 

If I try to read orc files through Spark, 

val masterDF = 
hiveContext.read.orc("/user/hive/warehouse/reservation.db/unique_keys")

The source value gets changed to *7.0 and 8.0* for 7F and 8D respectively.

 


> Spark couldn't read partitioned(string type) Orc column correctly if the 
> value contains Float/Double value
> --
>
> Key: SPARK-27595
> URL: https://issues.apache.org/jira/browse/SPARK-27595
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.0
>Reporter: Ameer Basha Pattan
>Priority: Critical
>
> {code}
> create external table unique_keys (
>  key string
>  ,locator_id string
>  , create_date string
>  , sequence int
>  )
>  partitioned by (source string)
>  stored as orc location '/user/hive/warehouse/reservation.db/unique_keys';
> {code}
> {{/user/hive/warehouse/reservation.db/unique_keys}} contains data like below:
> {code}
> /user/hive/warehouse/reservation.db/unique_keys/source=6S
> /user/hive/warehouse/reservation.db/unique_keys/source=7F
> /user/hive/warehouse/reservation.db/unique_keys/source=7H
> /user/hive/warehouse/reservation.db/unique_keys/source=8D
> {code} 
> If I try to read orc files through Spark, 
> {code}
> val masterDF = 
> hiveContext.read.orc("/user/hive/warehouse/reservation.db/unique_keys")
> {code}
> The source value gets changed to *7.0 and 8.0* for 7F and 8D respectively.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27595) Spark couldn't read partitioned(string type) Orc column correctly if the value contains Float/Double value

2019-04-30 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated SPARK-27595:
-
Priority: Major  (was: Critical)

> Spark couldn't read partitioned(string type) Orc column correctly if the 
> value contains Float/Double value
> --
>
> Key: SPARK-27595
> URL: https://issues.apache.org/jira/browse/SPARK-27595
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.0
>Reporter: Ameer Basha Pattan
>Priority: Major
>
> {code}
> create external table unique_keys (
>  key string
>  ,locator_id string
>  , create_date string
>  , sequence int
>  )
>  partitioned by (source string)
>  stored as orc location '/user/hive/warehouse/reservation.db/unique_keys';
> {code}
> {{/user/hive/warehouse/reservation.db/unique_keys}} contains data like below:
> {code}
> /user/hive/warehouse/reservation.db/unique_keys/source=6S
> /user/hive/warehouse/reservation.db/unique_keys/source=7F
> /user/hive/warehouse/reservation.db/unique_keys/source=7H
> /user/hive/warehouse/reservation.db/unique_keys/source=8D
> {code} 
> If I try to read orc files through Spark, 
> {code}
> val masterDF = 
> hiveContext.read.orc("/user/hive/warehouse/reservation.db/unique_keys")
> {code}
> The source value gets changed to *7.0 and 8.0* for 7F and 8D respectively.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-27597) RuntimeConfig should be serializable

2019-04-30 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830457#comment-16830457
 ] 

Hyukjin Kwon commented on SPARK-27597:
--

Yes ... I don't think it's meant to be accessed in executors. If you want to 
access the SQL configuration, you could use {{SQLConf.get}}, or read the 
serializable values in the driver and pass them along.
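
A minimal sketch of the read-in-the-driver-and-pass approach (the config key 
and UDF here are illustrative):

{code:python}
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Read the value once on the driver; the captured plain string serializes
# fine, unlike spark.conf (RuntimeConfig) itself.
tz = spark.conf.get("spark.sql.session.timeZone")

@udf(returnType=StringType())
def tag_with_tz(value):
    return "{}@{}".format(value, tz)  # closes over the string, not the session
{code}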

> RuntimeConfig should be serializable
> 
>
> Key: SPARK-27597
> URL: https://issues.apache.org/jira/browse/SPARK-27597
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.1
>Reporter: Nick Dimiduk
>Priority: Major
>
> When implementing a UDF or similar, it's quite surprising to see that the 
> {{SparkSession}} is {{Serializable}} but {{RuntimeConf}} is not. When 
> modeling UDFs in an object-oriented way, this leads to quite a surprise, an 
> ugly NPE from the {{call}} site.
> {noformat}
> Caused by: java.lang.NullPointerException
> at 
> org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:143)
> at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:141)
> at org.apache.spark.sql.SparkSession.conf$lzycompute(SparkSession.scala:170)
> at org.apache.spark.sql.SparkSession.conf(SparkSession.scala:170)
> ...{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-27597) RuntimeConfig should be serializable

2019-04-30 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-27597.
--
Resolution: Not A Problem

> RuntimeConfig should be serializable
> 
>
> Key: SPARK-27597
> URL: https://issues.apache.org/jira/browse/SPARK-27597
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.1
>Reporter: Nick Dimiduk
>Priority: Major
>
> When implementing a UDF or similar, it's quite surprising to see that the 
> {{SparkSession}} is {{Serializable}} but {{RuntimeConf}} is not. When 
> modeling UDFs in an object-oriented way, this leads to quite a surprise, an 
> ugly NPE from the {{call}} site.
> {noformat}
> Caused by: java.lang.NullPointerException
> at 
> org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:143)
> at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:141)
> at org.apache.spark.sql.SparkSession.conf$lzycompute(SparkSession.scala:170)
> at org.apache.spark.sql.SparkSession.conf(SparkSession.scala:170)
> ...{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-27600) Unable to start Spark Hive Thrift Server when multiple hive server server share the same metastore

2019-04-30 Thread Hyukjin Kwon (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-27600.
--
Resolution: Invalid

> Unable to start Spark Hive Thrift Server when multiple hive server server 
> share the same metastore
> --
>
> Key: SPARK-27600
> URL: https://issues.apache.org/jira/browse/SPARK-27600
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: pin_zhang
>Priority: Major
>
> When starting ten or more Spark Hive Thrift Servers at the same time, more 
> than one version is saved to the VERSION table when the following exception 
> is hit: WARN [DataNucleus.Query] (main:) Query for candidates of 
> org.apache.hadoop.hive.metastore.model.MVersionTable and subclasses resulted 
> in no possible candidates
> Exception thrown obtaining schema column information from datastore
> org.datanucleus.exceptions.NucleusDataStoreException: Exception thrown 
> obtaining schema column information from datastore
> Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Table 
> 'via_ms.deleteme1556239494724' doesn't exist
>  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>  at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>  at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
>  at com.mysql.jdbc.Util.getInstance(Util.java:408)
>  at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:944)
>  at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3978)
>  at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3914)
>  at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2530)
>  at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2683)
>  at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2491)
>  at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2449)
>  at com.mysql.jdbc.StatementImpl.executeQuery(StatementImpl.java:1381)
>  at com.mysql.jdbc.DatabaseMetaData$2.forEach(DatabaseMetaData.java:2441)
>  at com.mysql.jdbc.DatabaseMetaData$2.forEach(DatabaseMetaData.java:2339)
>  at com.mysql.jdbc.IterateBlock.doForAll(IterateBlock.java:50)
>  at com.mysql.jdbc.DatabaseMetaData.getColumns(DatabaseMetaData.java:2337)
>  at 
> org.apache.commons.dbcp.DelegatingDatabaseMetaData.getColumns(DelegatingDatabaseMetaData.java:218)
>  at 
> org.datanucleus.store.rdbms.adapter.BaseDatastoreAdapter.getColumns(BaseDatastoreAdapter.java:1532)
>  at 
> org.datanucleus.store.rdbms.schema.RDBMSSchemaHandler.refreshTableData(RDBMSSchemaHandler.java:921)
> Then the hive server cannot start any more because of 
> MetaException(message:Metastore contains multiple versions (2) 






[jira] [Commented] (SPARK-27600) Unable to start Spark Hive Thrift Server when multiple hive servers share the same metastore

2019-04-30 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830456#comment-16830456
 ] 

Hyukjin Kwon commented on SPARK-27600:
--

This doesn't look like a Spark issue; there's no evidence that the problem is 
in Spark itself.

> Unable to start Spark Hive Thrift Server when multiple hive servers share 
> the same metastore
> --
>
> Key: SPARK-27600
> URL: https://issues.apache.org/jira/browse/SPARK-27600
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.1
>Reporter: pin_zhang
>Priority: Major
>
> When starting ten or more Spark Hive Thrift Servers at the same time, more 
> than one version is saved to the VERSION table after hitting the exception: 
> WARN 
> [DataNucleus.Query] (main:) Query for candidates of 
> org.apache.hadoop.hive.metastore.model.MVersionTable and subclasses resulted 
> in no possible candidates
> Exception thrown obtaining schema column information from datastore
> org.datanucleus.exceptions.NucleusDataStoreException: Exception thrown 
> obtaining schema column information from datastore
> Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Table 
> 'via_ms.deleteme1556239494724' doesn't exist
>  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>  at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>  at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
>  at com.mysql.jdbc.Util.getInstance(Util.java:408)
>  at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:944)
>  at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3978)
>  at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3914)
>  at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2530)
>  at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2683)
>  at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2491)
>  at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2449)
>  at com.mysql.jdbc.StatementImpl.executeQuery(StatementImpl.java:1381)
>  at com.mysql.jdbc.DatabaseMetaData$2.forEach(DatabaseMetaData.java:2441)
>  at com.mysql.jdbc.DatabaseMetaData$2.forEach(DatabaseMetaData.java:2339)
>  at com.mysql.jdbc.IterateBlock.doForAll(IterateBlock.java:50)
>  at com.mysql.jdbc.DatabaseMetaData.getColumns(DatabaseMetaData.java:2337)
>  at 
> org.apache.commons.dbcp.DelegatingDatabaseMetaData.getColumns(DelegatingDatabaseMetaData.java:218)
>  at 
> org.datanucleus.store.rdbms.adapter.BaseDatastoreAdapter.getColumns(BaseDatastoreAdapter.java:1532)
>  at 
> org.datanucleus.store.rdbms.schema.RDBMSSchemaHandler.refreshTableData(RDBMSSchemaHandler.java:921)
> Then the hive server cannot start any more because of 
> MetaException(message:Metastore contains multiple versions (2) 
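
For what it's worth, the usual remedy here is on the metastore side rather 
than in Spark: stop concurrently starting servers from auto-creating the 
schema, and initialize it once up front. A hedged sketch, assuming a Hive 2.x 
metastore backed by MySQL:

{noformat}
# hive-site.xml on the metastore side, so startup races cannot create schema
datanucleus.schema.autoCreateAll=false
hive.metastore.schema.verification=true

# initialize (or upgrade) the schema exactly once, before starting any server
schematool -dbType mysql -initSchema
{noformat}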






[jira] [Commented] (SPARK-27602) SparkSQL CBO can't get true size of partition table after partition pruning

2019-04-30 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830455#comment-16830455
 ] 

Hyukjin Kwon commented on SPARK-27602:
--

So, what's the proposal to fix it?

> SparkSQL CBO can't get true size of partition table after partition pruning
> ---
>
> Key: SPARK-27602
> URL: https://issues.apache.org/jira/browse/SPARK-27602
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.0, 2.3.0, 2.4.0
>Reporter: angerszhu
>Priority: Major
>
> When I wanted to extract the cost of a SQL query for my own cost framework, 
> I found that the CBO can't get the true size of a partitioned table when 
> partition pruning applies: we only need the corresponding partitions' sizes, 
> but it just uses the whole table's statistics.






[jira] [Assigned] (SPARK-27606) Deprecate `extended` field in ExpressionDescription/ExpressionInfo

2019-04-30 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27606:


Assignee: (was: Apache Spark)

> Deprecate `extended` field in ExpressionDescription/ExpressionInfo
> --
>
> Key: SPARK-27606
> URL: https://issues.apache.org/jira/browse/SPARK-27606
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Hyukjin Kwon
>Priority: Major
>
> As of SPARK-21485 and SPARK-27328, we have a nicer way to describe extended 
> usages separately.
> The `extended` field and method in ExpressionDescription/ExpressionInfo are 
> now pretty much useless.
> This JIRA targets deprecating them.






[jira] [Assigned] (SPARK-27606) Deprecate `extended` field in ExpressionDescription/ExpressionInfo

2019-04-30 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27606:


Assignee: Apache Spark

> Deprecate `extended` field in ExpressionDescription/ExpressionInfo
> --
>
> Key: SPARK-27606
> URL: https://issues.apache.org/jira/browse/SPARK-27606
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Hyukjin Kwon
>Assignee: Apache Spark
>Priority: Major
>
> As of SPARK-21485 and SPARK-27328, we have a nicer way to describe extended 
> usages separately.
> The `extended` field and method in ExpressionDescription/ExpressionInfo are 
> now pretty much useless.
> This JIRA targets deprecating them.






[jira] [Created] (SPARK-27606) Deprecate `extended` field in ExpressionDescription/ExpressionInfo

2019-04-30 Thread Hyukjin Kwon (JIRA)
Hyukjin Kwon created SPARK-27606:


 Summary: Deprecate `extended` field in 
ExpressionDescription/ExpressionInfo
 Key: SPARK-27606
 URL: https://issues.apache.org/jira/browse/SPARK-27606
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.0.0
Reporter: Hyukjin Kwon


As of SPARK-21485 and SPARK-27328, we have a nicer way to describe extended 
usages separately.

The `extended` field and method in ExpressionDescription/ExpressionInfo are 
now pretty much useless.

This JIRA targets deprecating them.
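
For illustration, the separated sections are what {{DESCRIBE FUNCTION EXTENDED}} 
renders; a minimal sketch (using the built-in `abs` as an arbitrary example):

{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("expression-info-demo").getOrCreate()

// Since SPARK-21485/SPARK-27328, usage, arguments, and examples live in
// dedicated ExpressionInfo fields, which this command prints section by
// section, leaving the free-form `extended` blob with nothing left to do.
spark.sql("DESCRIBE FUNCTION EXTENDED abs").show(truncate = false)
{code}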






[jira] [Assigned] (SPARK-25888) Service requests for persist() blocks via external service after dynamic deallocation

2019-04-30 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-25888:


Assignee: Apache Spark

> Service requests for persist() blocks via external service after dynamic 
> deallocation
> -
>
> Key: SPARK-25888
> URL: https://issues.apache.org/jira/browse/SPARK-25888
> Project: Spark
>  Issue Type: New Feature
>  Components: Block Manager, Shuffle, YARN
>Affects Versions: 2.3.2
>Reporter: Adam Kennedy
>Assignee: Apache Spark
>Priority: Major
>
> Large and highly multi-tenant Spark on YARN clusters with diverse job 
> execution often display terrible utilization rates (we have observed as low 
> as 3-7% CPU at max container allocation, but 50% CPU utilization on even a 
> well-policed cluster is not uncommon).
> As a sizing example, consider a scenario with 1,000 nodes, 50,000 cores, 250 
> users and 50,000 runs of 1,000 distinct applications per week, predominantly 
> Spark, including a mixture of ETL, ad hoc tasks, and PySpark notebook jobs 
> (no streaming).
> Utilization problems appear to be due in large part to difficulties with 
> persist() blocks (DISK or DISK+MEMORY) preventing dynamic deallocation.
> In situations where an external shuffle service is present (which is typical 
> on clusters of this type) we already solve this for the shuffle block case by 
> offloading the IO handling of shuffle blocks to the external service, 
> allowing dynamic deallocation to proceed.
> Allowing Executors to transfer persist() blocks to some external "shuffle" 
> service in a similar manner would be an enormous win for Spark multi-tenancy 
> as it would limit deallocation blocking scenarios to only MEMORY-only cache() 
> scenarios.
> I'm not sure if I'm correct, but I seem to recall from the original external 
> shuffle service commits that this may have been considered at the time, but 
> moving shuffle blocks to the external shuffle service was the first 
> priority.
> With support for external persist() DISK blocks in place, we could also then 
> handle deallocation of DISK+MEMORY, as the memory instance could first be 
> dropped, changing the block to DISK only, and then further transferred to the 
> shuffle service.
> We have tried to resolve the persist() issue via extensive user training, but 
> that has typically only allowed us to improve utilization of the worst 
> offenders (10% utilization) up to around 40-60% utilization, as the need for 
> persist() is often legitimate and occurs during the middle stages of a job.
> In a healthy multi-tenant scenario, a large job might spool up to say 10,000 
> cores, persist() data, release executors across a long tail down to 100 
> cores, and then spool back up to 10,000 cores for the following stage without 
> impact on the persist() data.
> In an ideal world, if a new executor started up on a node on which blocks 
> had been transferred to the shuffle service, the new executor might even be 
> able to "recapture" control of those blocks (if that would help with 
> performance in some way).
> And the behavior of gradually expanding up and down several times over the 
> course of a job would not just improve utilization, but would allow resources 
> to more easily be redistributed to other jobs which start on the cluster 
> during the long-tail periods, which would improve multi-tenancy and bring us 
> closer to optimal "envy free" YARN scheduling.






[jira] [Assigned] (SPARK-25888) Service requests for persist() blocks via external service after dynamic deallocation

2019-04-30 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-25888:


Assignee: (was: Apache Spark)

> Service requests for persist() blocks via external service after dynamic 
> deallocation
> -
>
> Key: SPARK-25888
> URL: https://issues.apache.org/jira/browse/SPARK-25888
> Project: Spark
>  Issue Type: New Feature
>  Components: Block Manager, Shuffle, YARN
>Affects Versions: 2.3.2
>Reporter: Adam Kennedy
>Priority: Major
>
> Large and highly multi-tenant Spark on YARN clusters with diverse job 
> execution often display terrible utilization rates (we have observed as low 
> as 3-7% CPU at max container allocation, but 50% CPU utilization on even a 
> well-policed cluster is not uncommon).
> As a sizing example, consider a scenario with 1,000 nodes, 50,000 cores, 250 
> users and 50,000 runs of 1,000 distinct applications per week, predominantly 
> Spark, including a mixture of ETL, ad hoc tasks, and PySpark notebook jobs 
> (no streaming).
> Utilization problems appear to be due in large part to difficulties with 
> persist() blocks (DISK or DISK+MEMORY) preventing dynamic deallocation.
> In situations where an external shuffle service is present (which is typical 
> on clusters of this type) we already solve this for the shuffle block case by 
> offloading the IO handling of shuffle blocks to the external service, 
> allowing dynamic deallocation to proceed.
> Allowing Executors to transfer persist() blocks to some external "shuffle" 
> service in a similar manner would be an enormous win for Spark multi-tenancy 
> as it would limit deallocation blocking scenarios to only MEMORY-only cache() 
> scenarios.
> I'm not sure if I'm correct, but I seem to recall from the original external 
> shuffle service commits that this may have been considered at the time, but 
> moving shuffle blocks to the external shuffle service was the first 
> priority.
> With support for external persist() DISK blocks in place, we could also then 
> handle deallocation of DISK+MEMORY, as the memory instance could first be 
> dropped, changing the block to DISK only, and then further transferred to the 
> shuffle service.
> We have tried to resolve the persist() issue via extensive user training, but 
> that has typically only allowed us to improve utilization of the worst 
> offenders (10% utilization) up to around 40-60% utilization, as the need for 
> persist() is often legitimate and occurs during the middle stages of a job.
> In a healthy multi-tenant scenario, a large job might spool up to say 10,000 
> cores, persist() data, release executors across a long tail down to 100 
> cores, and then spool back up to 10,000 cores for the following stage without 
> impact on the persist() data.
> In an ideal world, if a new executor started up on a node on which blocks 
> had been transferred to the shuffle service, the new executor might even be 
> able to "recapture" control of those blocks (if that would help with 
> performance in some way).
> And the behavior of gradually expanding up and down several times over the 
> course of a job would not just improve utilization, but would allow resources 
> to more easily be redistributed to other jobs which start on the cluster 
> during the long-tail periods, which would improve multi-tenancy and bring us 
> closer to optimal "envy free" YARN scheduling.






[jira] [Assigned] (SPARK-27605) Add new column "Partition ID" to the tasks table in stages page

2019-04-30 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27605:


Assignee: Apache Spark

> Add new column "Partition ID" to the tasks table in stages page
> ---
>
> Key: SPARK-27605
> URL: https://issues.apache.org/jira/browse/SPARK-27605
> Project: Spark
>  Issue Type: Story
>  Components: Spark Core, Web UI
>Affects Versions: 3.0.0
>Reporter: Parth Gandhi
>Assignee: Apache Spark
>Priority: Minor
>
> If you have more than one stage attempt in a Spark job, the task index will 
> not equal partition id. Thus, adding a column for partition id would be a 
> useful feature to have.






[jira] [Assigned] (SPARK-27605) Add new column "Partition ID" to the tasks table in stages page

2019-04-30 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27605:


Assignee: (was: Apache Spark)

> Add new column "Partition ID" to the tasks table in stages page
> ---
>
> Key: SPARK-27605
> URL: https://issues.apache.org/jira/browse/SPARK-27605
> Project: Spark
>  Issue Type: Story
>  Components: Spark Core, Web UI
>Affects Versions: 3.0.0
>Reporter: Parth Gandhi
>Priority: Minor
>
> If you have more than one stage attempt in a Spark job, the task index will 
> not equal partition id. Thus, adding a column for partition id would be a 
> useful feature to have.






[jira] [Created] (SPARK-27605) Add new column "Partition ID" to the tasks table in stages page

2019-04-30 Thread Parth Gandhi (JIRA)
Parth Gandhi created SPARK-27605:


 Summary: Add new column "Partition ID" to the tasks table in 
stages page
 Key: SPARK-27605
 URL: https://issues.apache.org/jira/browse/SPARK-27605
 Project: Spark
  Issue Type: Story
  Components: Spark Core, Web UI
Affects Versions: 3.0.0
Reporter: Parth Gandhi


If you have more than one stage attempt in a Spark job, the task index will not 
equal partition id. Thus, adding a column for partition id would be a useful 
feature to have.






[jira] [Assigned] (SPARK-27591) A bug in UnivocityParser prevents using UDT

2019-04-30 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27591:


Assignee: Apache Spark

> A bug in UnivocityParser prevents using UDT
> ---
>
> Key: SPARK-27591
> URL: https://issues.apache.org/jira/browse/SPARK-27591
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.2
>Reporter: Artem Kalchenko
>Assignee: Apache Spark
>Priority: Minor
>
> I am trying to define a UserDefinedType based on String but different from 
> StringType in Spark 2.4.1, but it looks like there is a bug in Spark, or I 
> am doing something incorrectly.
> I define my type as follows:
> {code:java}
> class MyType extends UserDefinedType[MyValue] {
>   override def sqlType: DataType = StringType
>   ...
> }
> @SQLUserDefinedType(udt = classOf[MyType])
> case class MyValue
> {code}
> I expect it to be read and stored as String with just a custom SQL type. In 
> fact Spark can't read the string at all:
> {code:java}
> java.lang.ClassCastException: 
> org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$11
>  cannot be cast to org.apache.spark.unsafe.types.UTF8String
> at 
> org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getUTF8String(rows.scala:46)
> at 
> org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:195)
> at 
> org.apache.spark.sql.catalyst.expressions.JoinedRow.getUTF8String(JoinedRow.scala:102)
> {code}
> The problem is that UnivocityParser.makeConverter doesn't return a (String 
> => Any) function but a (String => (String => Any)) one in the case of UDT; 
> see UnivocityParser:184:
> {code:java}
> case udt: UserDefinedType[_] => (datum: String) =>
>   makeConverter(name, udt.sqlType, nullable, options)
> {code}
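
That is, the UDT branch wraps the underlying converter in an extra lambda 
instead of returning it. A minimal sketch of the expected shape, assuming the 
sqlType converter can be returned directly (not a tested patch):

{code:scala}
// Return the (String => Any) converter built for the UDT's underlying
// sqlType, rather than a function that merely constructs that converter.
case udt: UserDefinedType[_] =>
  makeConverter(name, udt.sqlType, nullable, options)
{code}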






[jira] [Assigned] (SPARK-27591) A bug in UnivocityParser prevents using UDT

2019-04-30 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27591:


Assignee: (was: Apache Spark)

> A bug in UnivocityParser prevents using UDT
> ---
>
> Key: SPARK-27591
> URL: https://issues.apache.org/jira/browse/SPARK-27591
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.2
>Reporter: Artem Kalchenko
>Priority: Minor
>
> I am trying to define a UserDefinedType based on String but different from 
> StringType in Spark 2.4.1, but it looks like there is a bug in Spark, or I 
> am doing something incorrectly.
> I define my type as follows:
> {code:java}
> class MyType extends UserDefinedType[MyValue] {
>   override def sqlType: DataType = StringType
>   ...
> }
> @SQLUserDefinedType(udt = classOf[MyType])
> case class MyValue
> {code}
> I expect it to be read and stored as String with just a custom SQL type. In 
> fact Spark can't read the string at all:
> {code:java}
> java.lang.ClassCastException: 
> org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$11
>  cannot be cast to org.apache.spark.unsafe.types.UTF8String
> at 
> org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getUTF8String(rows.scala:46)
> at 
> org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:195)
> at 
> org.apache.spark.sql.catalyst.expressions.JoinedRow.getUTF8String(JoinedRow.scala:102)
> {code}
> The problem is that UnivocityParser.makeConverter doesn't return a (String 
> => Any) function but a (String => (String => Any)) one in the case of UDT; 
> see UnivocityParser:184:
> {code:java}
> case udt: UserDefinedType[_] => (datum: String) =>
>   makeConverter(name, udt.sqlType, nullable, options)
> {code}






[jira] [Commented] (SPARK-27597) RuntimeConfig should be serializable

2019-04-30 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830179#comment-16830179
 ] 

Liang-Chi Hsieh commented on SPARK-27597:
-

Do you want to access {{SparkSession}} in a UDF? {{SparkSession}} is 
{{Serializable}}, but I don't think it is intended to be accessed in executors.

> RuntimeConfig should be serializable
> 
>
> Key: SPARK-27597
> URL: https://issues.apache.org/jira/browse/SPARK-27597
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.1
>Reporter: Nick Dimiduk
>Priority: Major
>
> When implementing a UDF or similar, it's quite surprising to see that the 
> {{SparkSession}} is {{Serializable}} but {{RuntimeConfig}} is not. When 
> modeling UDFs in an object-oriented way, this leads to an ugly NPE from the 
> {{call}} site.
> {noformat}
> Caused by: java.lang.NullPointerException
> at 
> org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:143)
> at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:141)
> at org.apache.spark.sql.SparkSession.conf$lzycompute(SparkSession.scala:170)
> at org.apache.spark.sql.SparkSession.conf(SparkSession.scala:170)
> ...{noformat}






[jira] [Assigned] (SPARK-27604) Enhance constant and constraint propagation

2019-04-30 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27604:


Assignee: Apache Spark

> Enhance constant and constraint propagation
> ---
>
> Key: SPARK-27604
> URL: https://issues.apache.org/jira/browse/SPARK-27604
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Peter Toth
>Assignee: Apache Spark
>Priority: Major
>
> There is some room for improvement in how we can use constraints to simplify 
> expressions, and maybe in what new expressions we can infer.
> Here are some examples of what the current implementation can do in terms of 
> expression simplification:
> {code:java}
> SELECT * FROM table WHERE i = 5 AND j = i + 3  =>  SELECT * FROM table 
> WHERE i = 5 AND j = 8
> {code}
> And here are some that it could handle with some improvement:
> {code:java}
> SELECT * FROM table WHERE i <= 5 AND i = 5 =>  SELECT * FROM table 
> WHERE i = 5
> SELECT * FROM table WHERE i < j AND ... AND i = j  =>  SELECT * FROM table 
> WHERE false{code}
> I also think that the `ConstantPropagation` and `InferFiltersFromConstraints` 
> functionality somewhat overlap, and with some refactoring we could come up 
> with a better solution.
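
A quick way to compare the two classes of cases is to inspect the optimized 
plans (a minimal sketch; the table `t` and columns `i`, `j` are made-up names):

{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("constraint-demo").getOrCreate()
import spark.implicits._

Seq((5, 8), (1, 4)).toDF("i", "j").createOrReplaceTempView("t")

// Already simplified: given `i = 5`, ConstantPropagation folds `j = i + 3`
// into `j = 8`, which shows up in the optimized logical plan.
spark.sql("SELECT * FROM t WHERE i = 5 AND j = i + 3").explain(true)

// Not simplified today: the redundant `i <= 5` survives in the plan.
spark.sql("SELECT * FROM t WHERE i <= 5 AND i = 5").explain(true)
{code}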






[jira] [Assigned] (SPARK-27604) Enhance constant and constraint propagation

2019-04-30 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-27604:


Assignee: (was: Apache Spark)

> Enhance constant and constraint propagation
> ---
>
> Key: SPARK-27604
> URL: https://issues.apache.org/jira/browse/SPARK-27604
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Peter Toth
>Priority: Major
>
> There is some room for improvement in how we can use constraints to simplify 
> expressions, and maybe in what new expressions we can infer.
> Here are some examples of what the current implementation can do in terms of 
> expression simplification:
> {code:java}
> SELECT * FROM table WHERE i = 5 AND j = i + 3  =>  SELECT * FROM table 
> WHERE i = 5 AND j = 8
> {code}
> And here are some that it could handle with some improvement:
> {code:java}
> SELECT * FROM table WHERE i <= 5 AND i = 5 =>  SELECT * FROM table 
> WHERE i = 5
> SELECT * FROM table WHERE i < j AND ... AND i = j  =>  SELECT * FROM table 
> WHERE false{code}
> I also think that the `ConstantPropagation` and `InferFiltersFromConstraints` 
> functionality somewhat overlap, and with some refactoring we could come up 
> with a better solution.






[jira] [Commented] (SPARK-27595) Spark couldn't read partitioned(string type) Orc column correctly if the value contains Float/Double value

2019-04-30 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-27595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16830165#comment-16830165
 ] 

Liang-Chi Hsieh commented on SPARK-27595:
-

Is turning off {{spark.sql.sources.partitionColumnTypeInference.enabled}} 
helpful?  
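
That is, something along these lines before reading (a sketch against the 
reporter's path; on Spark 1.6 the setting goes through {{hiveContext.setConf}}):

{code:scala}
// Disable partition column type inference so a partition value like `7F`
// stays a plain string instead of being parsed as the float 7.0.
hiveContext.setConf("spark.sql.sources.partitionColumnTypeInference.enabled", "false")

val masterDF =
  hiveContext.read.orc("/user/hive/warehouse/reservation.db/unique_keys")
masterDF.select("source").distinct().show()
{code}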

> Spark couldn't read partitioned(string type) Orc column correctly if the 
> value contains Float/Double value
> --
>
> Key: SPARK-27595
> URL: https://issues.apache.org/jira/browse/SPARK-27595
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.0
>Reporter: Ameer Basha Pattan
>Priority: Critical
>
> create external table unique_keys (
> key string
> ,locator_id string
> , create_date string
> , sequence int
> )
> partitioned by (source string)
> stored as orc location '/user/hive/warehouse/reservation.db/unique_keys';
> /user/hive/warehouse/reservation.db/unique_keys contains data like below:
> /user/hive/warehouse/reservation.db/unique_keys/source=6S
> /user/hive/warehouse/reservation.db/unique_keys/source=7F
> /user/hive/warehouse/reservation.db/unique_keys/source=7H
> /user/hive/warehouse/reservation.db/unique_keys/source=8D
>  
> If I try to read the ORC files through Spark: 
> val masterDF = 
> hiveContext.read.orc("/user/hive/warehouse/reservation.db/unique_keys")
> the source value gets changed to *7.0 and 8.0* for 7F and 8D respectively.
>  





