Re: [SparkSQL] SparkSQL performance on small TPCDS tables is very low when compared to Drill or Presto

2018-03-28 Thread Tin Vu
Thanks for your response.  What do you mean when you said "immediately
return"?

On Wed, Mar 28, 2018, 10:33 PM Jörn Franke  wrote:

> I don’t think select * is a good benchmark. You should do a more complex
> operation, otherwise the optimizer might see that you don’t do anything in the
> query and return immediately (similarly, count might return immediately by
> using some statistics).
>
> On 29. Mar 2018, at 02:03, Tin Vu  wrote:
>
> Hi,
>
> I am executing a benchmark to compare performance of SparkSQL, Apache
> Drill and Presto. My experimental setup:
>
>    - TPCDS dataset with scale factor 100 (size 100GB).
>    - Spark, Drill, and Presto have the same number of workers: 12.
>    - Each worker has the same amount of allocated memory: 4GB.
>    - Data is stored by Hive in ORC format.
>
> I executed a very simple SQL query: "SELECT * FROM table_name".
> The issue is that for some small tables (even tables with a few dozen
> records), SparkSQL still required about 7-8 seconds to finish, while Drill
> and Presto needed less than 1 second.
> For other large tables with billions of records, SparkSQL performance was
> reasonable, requiring 20-30 seconds to scan the whole table.
> Do you have any idea or a reasonable explanation for this issue?
>
> Thanks,
>
>


Re: [SparkSQL] SparkSQL performance on small TPCDS tables is very low when compared to Drill or Presto

2018-03-28 Thread Jörn Franke
I don’t think select * is a good benchmark. You should do a more complex
operation, otherwise the optimizer might see that you don’t do anything in the
query and return immediately (similarly, count might return immediately by using
some statistics).
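
For example, a query that forces a real scan, join, and aggregation is usually
more representative than SELECT *. A rough sketch in Scala (the table and column
names follow the standard TPC-DS schema, and "spark" is assumed to be the
SparkSession):

// Scan + join + aggregation exercises far more of the engine than SELECT *.
val result = spark.sql(
  """SELECT d.d_year, COUNT(*) AS cnt, SUM(ss.ss_net_paid) AS total_paid
    |FROM store_sales ss
    |JOIN date_dim d ON ss.ss_sold_date_sk = d.d_date_sk
    |GROUP BY d.d_year
    |ORDER BY d.d_year""".stripMargin)
result.collect()  // force full execution rather than a partial show()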

> On 29. Mar 2018, at 02:03, Tin Vu  wrote:
> 
> Hi,
> 
> I am executing a benchmark to compare performance of SparkSQL, Apache Drill 
> and Presto. My experimental setup:
>    - TPCDS dataset with scale factor 100 (size 100GB).
>    - Spark, Drill, and Presto have the same number of workers: 12.
>    - Each worker has the same amount of allocated memory: 4GB.
>    - Data is stored by Hive in ORC format.
> I executed a very simple SQL query: "SELECT * FROM table_name".
> The issue is that for some small tables (even tables with a few dozen
> records), SparkSQL still required about 7-8 seconds to finish, while Drill
> and Presto needed less than 1 second.
> For other large tables with billions of records, SparkSQL performance was
> reasonable, requiring 20-30 seconds to scan the whole table.
> Do you have any idea or a reasonable explanation for this issue?
> Thanks,
> 


Unable to get results of intermediate dataset

2018-03-28 Thread Sunitha Chennareddy
 Hi Team,

I am new to Spark. My requirement is: I have a huge list, which is converted
to a Spark Dataset. I need to operate on this Dataset, store the computed
values in another object/Dataset, and keep them in memory for further processing.

The approach I tried: the list is retrieved from a third party in a loop. I
convert this list to a Dataset and, using a map function, try to iterate over
it and store the results in another Dataset.
The problem I am facing: I am not able to see any data in the newly computed
Dataset.

Kindly help me sort out this issue, and please let me know if there is a
better approach.

Sample Code:

class Person implements Serializable {
    private static final long serialVersionUID = 1L;

    private String name;
    private PersonId id;
    // getters and setters
}

class PersonId implements Serializable {
    private int deptId;
    // getters and setters
}

class PersonDetails implements Serializable {

    private static final long serialVersionUID = 1L;
    private int deptId;
    private BigDecimal sal;
    private String name;

    // getters and setters
}

In another class, I have the template code below:

List<PersonDetails> personDtlsList = new ArrayList<>();
final Encoder<Person> encoder = Encoders.bean(Person.class);
final Encoder<PersonDetails> personDtlsEncoder = Encoders.bean(PersonDetails.class);

// here I try to hit the third-party interface and get person information in a list
List<Person> personList = getPersonInformation( passing few parameters);

Dataset<Person> personDS = sqlContext.createDataset(personList, encoder);
Dataset<PersonDetails> personDtlsDS = sqlContext.createDataset(personDtlsList, personDtlsEncoder);


JavaRDD<PersonDetails> personDtlsRDD = personDS.toDF().toJavaRDD().map(new Function<Row, PersonDetails>() {
    private static final long serialVersionUID = 2L;

    @Override
    public PersonDetails call(Row row) throws Exception {
        PersonDetails personDetails = new PersonDetails();
        // setters for personDetails - name, sal and others
        personDetails.setName(row.getString(0));
        personDetails.setSal(new BigDecimal(1));
        // note: union returns a new Dataset and does not modify personDtlsDS, and this
        // map is never executed because no action is ever run on personDtlsRDD
        personDtlsDS.union(sqlContext.createDataset(
                new ArrayList<PersonDetails>() {{ add(personDetails); }}, personDtlsEncoder));
        return personDetails;
    }
});

personDtlsDS.count();
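
A minimal sketch in Scala of a more direct pattern: derive the second Dataset
with map and an explicit encoder, and materialize it once with an action,
instead of trying to union into another Dataset from inside the mapping
function. The getters used here are assumptions based on the "getters and
setters" comments in the classes above.

import org.apache.spark.sql.{Dataset, Encoders}

def toDetails(personDS: Dataset[Person]): Dataset[PersonDetails] = {
  val detailsEncoder = Encoders.bean(classOf[PersonDetails])
  // Build each PersonDetails directly from the typed Person row.
  personDS.map { p =>
    val d = new PersonDetails()
    d.setName(p.getName)
    d.setDeptId(p.getId.getDeptId)
    d.setSal(new java.math.BigDecimal(1))
    d
  }(detailsEncoder)
}

// usage: val details = toDetails(personDS); details.cache(); details.count()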


Regards,
Sunitha.


unsubscribe

2018-03-28 Thread purna pradeep


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: spark-sql importing schemas from catalogString or schema.toString()

2018-03-28 Thread Colin Williams
val test_schema = DataType.fromJson(schema).asInstanceOf[StructType]
val session = SparkHelper.getSparkSession
val df1: DataFrame = session.read
  .format("json")
  .schema(test_schema)
  .option("inferSchema","false")
  .option("mode","FAILFAST")
  .load("src/test/resources/*.gz")
df1.show(80)

On Wed, Mar 28, 2018 at 5:10 PM, Colin Williams
 wrote:
> I've had more success exporting the schema toJson and importing that.
> Something like:
>
>
> val df1: DataFrame = session.read
>   .format("json")
>   .schema(test_schema)
>   .option("inferSchema","false")
>   .option("mode","FAILFAST")
>   .load("src/test/resources/*.gz")
> df1.show(80)
>
>
>
> On Wed, Mar 28, 2018 at 3:25 PM, Colin Williams
>  wrote:
>> The toString representation looks like the following, where "someName" is unique:
>>
>>  StructType(StructField("someName",StringType,true),
>> StructField("someName",StructType(StructField("someName",StructType(StructField("someName",StringType,true),
>> StructField("someName",StringType,true)),true),
>> StructField("someName",StructType(StructField("someName",StringType,true),
>> StructField("someName",StringType,true)),true),
>> StructField("someName",StructType(StructField("someName",StringType,true)),true),
>>  StructField("someName",StructType(StructField("someName",StringType,true),
>> StructField("someName",StringType,true)),true),
>> StructField("someName",StructType(StructField("someName",StringType,true),
>>  StructField("someName",StringType,true)),true),
>> StructField("someName",StructType(StructField("someName",StringType,true),
>> StructField("someName",StringType,true)),true),
>>  StructField("someName",StructType(StructField("someName",StringType,true),
>> StructField("someName",StringType,true)),true),
>> StructField("someName",StructType(StructField("someName",StringType,true),
>>  StructField("someName",StringType,true)),true),
>> StructField("someName",StructType(StructField("someName",StringType,true),
>> StructField("someName",StringType,true)),true),
>>  StructField("someName",StructType(StructField("someName",StringType,true),
>> StructField("someName",StringType,true)),true),
>> StructField("someName",StructType(StructField("someName",StringType,true),
>>  StructField("someName",StringType,true)),true),
>> StructField("someName",StructType(StructField("someName",StringType,true),
>> StructField("someName",StringType,true)),true),
>> StructField("someName",
>> StructType(StructField("someName",StringType,true),
>>  StructField("someName",StringType,true)),true),
>> StructField("someName",StructType(StructField("someName",StringType,true),
>> StructField("someName",StringType,true)),true),
>> StructField("someName",
>> StructType(StructField("someName",StringType,true),
>> StructField("someName",StringType,true)),true),
>> StructField("someName",StructType(StructField("someName",StringType,true),
>> StructField("someName",StringType,  true)),true),
>>  StructField("someName",StructType(StructField("someName",StringType,true),
>> StructField("someName",StringType,true)),true),
>> StructField("someName",StructType(StructField("someName",StringType,true),
>>  StructField("someName",StringType,true)),true),
>> StructField("someName",StructType(StructField("someName",StringType,true),
>> StructField("someName",StringType,true)),true),
>> StructField("someName",
>> StructType(StructField("someName",StringType,true),
>>  StructField("someName",StringType,true)),true),
>> StructField("someName",StructType(StructField("someName",StringType,true),
>> StructField("someName",StringType,true)),true),
>> StructField("someName",
>> StructType(StructField("someName",StringType,true),
>> StructField("someName",StringType,true)),true),
>> StructField("someName",StructType(StructField("someName",StringType,true),
>> StructField("someName",StringType,  true)),true)),true),
>>  StructField("someName",BooleanType,true),
>> StructField("someName",LongType,true),
>> StructField("someName",StringType,true),
>> StructField("someName",StringType,true),
>> StructField("someName",StringType,true),
>> StructField("someName",StringType,true))
>>
>>
>> The catalogString looks something like the following, where SOME_TABLE_NAME is unique:
>>
>> struct,
>> 
>> SOME_TABLE_NAME:struct,SOME_TABLE_NAME:struct,
>> SOME_TABLE_NAME:struct,SOME_TABLE_NAME:struct>  
>> SOME_TABLE_NAME:string>,SOME_TABLE_NAME:struct,SOME_TABLE_NAME:
>> struct,SOME_TABLE_NAME:struct>  
>> string>,SOME_TABLE_NAME:struct,SOME_TABLE_NAME:struct> 

Re: spark-sql importing schemas from catalogString or schema.toString()

2018-03-28 Thread Colin Williams
I've had more success exporting the schema toJson and importing that.
Something like:


val df1: DataFrame = session.read
  .format("json")
  .schema(test_schema)
  .option("inferSchema","false")
  .option("mode","FAILFAST")
  .load("src/test/resources/*.gz")
df1.show(80)



On Wed, Mar 28, 2018 at 3:25 PM, Colin Williams
 wrote:
> The toString representation looks like the following, where "someName" is unique:
>
>  StructType(StructField("someName",StringType,true),
> StructField("someName",StructType(StructField("someName",StructType(StructField("someName",StringType,true),
> StructField("someName",StringType,true)),true),
> StructField("someName",StructType(StructField("someName",StringType,true),
> StructField("someName",StringType,true)),true),
> StructField("someName",StructType(StructField("someName",StringType,true)),true),
>  StructField("someName",StructType(StructField("someName",StringType,true),
> StructField("someName",StringType,true)),true),
> StructField("someName",StructType(StructField("someName",StringType,true),
>  StructField("someName",StringType,true)),true),
> StructField("someName",StructType(StructField("someName",StringType,true),
> StructField("someName",StringType,true)),true),
>  StructField("someName",StructType(StructField("someName",StringType,true),
> StructField("someName",StringType,true)),true),
> StructField("someName",StructType(StructField("someName",StringType,true),
>  StructField("someName",StringType,true)),true),
> StructField("someName",StructType(StructField("someName",StringType,true),
> StructField("someName",StringType,true)),true),
>  StructField("someName",StructType(StructField("someName",StringType,true),
> StructField("someName",StringType,true)),true),
> StructField("someName",StructType(StructField("someName",StringType,true),
>  StructField("someName",StringType,true)),true),
> StructField("someName",StructType(StructField("someName",StringType,true),
> StructField("someName",StringType,true)),true),
> StructField("someName",
> StructType(StructField("someName",StringType,true),
>  StructField("someName",StringType,true)),true),
> StructField("someName",StructType(StructField("someName",StringType,true),
> StructField("someName",StringType,true)),true),
> StructField("someName",
> StructType(StructField("someName",StringType,true),
> StructField("someName",StringType,true)),true),
> StructField("someName",StructType(StructField("someName",StringType,true),
> StructField("someName",StringType,  true)),true),
>  StructField("someName",StructType(StructField("someName",StringType,true),
> StructField("someName",StringType,true)),true),
> StructField("someName",StructType(StructField("someName",StringType,true),
>  StructField("someName",StringType,true)),true),
> StructField("someName",StructType(StructField("someName",StringType,true),
> StructField("someName",StringType,true)),true),
> StructField("someName",
> StructType(StructField("someName",StringType,true),
>  StructField("someName",StringType,true)),true),
> StructField("someName",StructType(StructField("someName",StringType,true),
> StructField("someName",StringType,true)),true),
> StructField("someName",
> StructType(StructField("someName",StringType,true),
> StructField("someName",StringType,true)),true),
> StructField("someName",StructType(StructField("someName",StringType,true),
> StructField("someName",StringType,  true)),true)),true),
>  StructField("someName",BooleanType,true),
> StructField("someName",LongType,true),
> StructField("someName",StringType,true),
> StructField("someName",StringType,true),
> StructField("someName",StringType,true),
> StructField("someName",StringType,true))
>
>
> The catalogString looks something like the following, where SOME_TABLE_NAME is unique:
>
> struct,
> 
> SOME_TABLE_NAME:struct,SOME_TABLE_NAME:struct,
> SOME_TABLE_NAME:struct,SOME_TABLE_NAME:struct  
> SOME_TABLE_NAME:string>,SOME_TABLE_NAME:struct,SOME_TABLE_NAME:
> struct,SOME_TABLE_NAME:struct  
> string>,SOME_TABLE_NAME:struct,SOME_TABLE_NAME:struct string,SOME_TABLE_NAME:string>,SOME_TABLE_NAME:struct,
>  
> SOME_TABLE_NAME:struct,SOME_TABLE_NAME:struct  
> SOME_TABLE_NAME:string>,SOME_TABLE_NAME:struct,SOME_TABLE_NAME:
> struct,SOME_TABLE_NAME:struct  
> string>,SOME_TABLE_NAME:struct,SOME_TABLE_NAME:struct 

[SparkSQL] SparkSQL performance on small TPCDS tables is very low when compared to Drill or Presto

2018-03-28 Thread Tin Vu
Hi,

I am executing a benchmark to compare performance of SparkSQL, Apache Drill
and Presto. My experimental setup:

   - TPCDS dataset with scale factor 100 (size 100GB).
   - Spark, Drill, and Presto have the same number of workers: 12.
   - Each worker has the same amount of allocated memory: 4GB.
   - Data is stored by Hive in ORC format.

I executed a very simple SQL query: "SELECT * FROM table_name".
The issue is that for some small tables (even tables with a few dozen
records), SparkSQL still required about 7-8 seconds to finish, while Drill
and Presto needed less than 1 second.
For other large tables with billions of records, SparkSQL performance was
reasonable, requiring 20-30 seconds to scan the whole table.
Do you have any idea or a reasonable explanation for this issue?

Thanks,


Unsubscribe

2018-03-28 Thread purna pradeep


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: spark-sql importing schemas from catalogString or schema.toString()

2018-03-28 Thread Colin Williams
The toString representation looks like the following, where "someName" is unique:

 StructType(StructField("someName",StringType,true),
StructField("someName",StructType(StructField("someName",StructType(StructField("someName",StringType,true),
StructField("someName",StringType,true)),true),
StructField("someName",StructType(StructField("someName",StringType,true),
StructField("someName",StringType,true)),true),
StructField("someName",StructType(StructField("someName",StringType,true)),true),
 StructField("someName",StructType(StructField("someName",StringType,true),
StructField("someName",StringType,true)),true),
StructField("someName",StructType(StructField("someName",StringType,true),
 StructField("someName",StringType,true)),true),
StructField("someName",StructType(StructField("someName",StringType,true),
StructField("someName",StringType,true)),true),
 StructField("someName",StructType(StructField("someName",StringType,true),
StructField("someName",StringType,true)),true),
StructField("someName",StructType(StructField("someName",StringType,true),
 StructField("someName",StringType,true)),true),
StructField("someName",StructType(StructField("someName",StringType,true),
StructField("someName",StringType,true)),true),
 StructField("someName",StructType(StructField("someName",StringType,true),
StructField("someName",StringType,true)),true),
StructField("someName",StructType(StructField("someName",StringType,true),
 StructField("someName",StringType,true)),true),
StructField("someName",StructType(StructField("someName",StringType,true),
StructField("someName",StringType,true)),true),
StructField("someName",
StructType(StructField("someName",StringType,true),
 StructField("someName",StringType,true)),true),
StructField("someName",StructType(StructField("someName",StringType,true),
StructField("someName",StringType,true)),true),
StructField("someName",
StructType(StructField("someName",StringType,true),
StructField("someName",StringType,true)),true),
StructField("someName",StructType(StructField("someName",StringType,true),
StructField("someName",StringType,  true)),true),
 StructField("someName",StructType(StructField("someName",StringType,true),
StructField("someName",StringType,true)),true),
StructField("someName",StructType(StructField("someName",StringType,true),
 StructField("someName",StringType,true)),true),
StructField("someName",StructType(StructField("someName",StringType,true),
StructField("someName",StringType,true)),true),
StructField("someName",
StructType(StructField("someName",StringType,true),
 StructField("someName",StringType,true)),true),
StructField("someName",StructType(StructField("someName",StringType,true),
StructField("someName",StringType,true)),true),
StructField("someName",
StructType(StructField("someName",StringType,true),
StructField("someName",StringType,true)),true),
StructField("someName",StructType(StructField("someName",StringType,true),
StructField("someName",StringType,  true)),true)),true),
 StructField("someName",BooleanType,true),
StructField("someName",LongType,true),
StructField("someName",StringType,true),
StructField("someName",StringType,true),
StructField("someName",StringType,true),
StructField("someName",StringType,true))


The catalogString looks something like the following, where SOME_TABLE_NAME is unique:

struct,

SOME_TABLE_NAME:struct,SOME_TABLE_NAME:struct,
SOME_TABLE_NAME:struct,SOME_TABLE_NAME:struct,SOME_TABLE_NAME:struct,SOME_TABLE_NAME:
struct,SOME_TABLE_NAME:struct,SOME_TABLE_NAME:struct,SOME_TABLE_NAME:struct,SOME_TABLE_NAME:struct,
 
SOME_TABLE_NAME:struct,SOME_TABLE_NAME:struct,SOME_TABLE_NAME:struct,SOME_TABLE_NAME:
struct,SOME_TABLE_NAME:struct,SOME_TABLE_NAME:struct,SOME_TABLE_NAME:struct,SOME_TABLE_NAME:struct,
 
SOME_TABLE_NAME:struct,SOME_TABLE_NAME:struct,SOME_TABLE_NAME:struct,SOME_TABLE_NAME:
struct>,SOME_TABLE_NAME:boolean,SOME_TABLE_NAME:bigint,

SOME_TABLE_NAME:string,SOME_TABLE_NAME:string,SOME_TABLE_NAME:string,SOME_TABLE_NAME:string>


On Wed, Mar 28, 2018 at 2:32 PM, Colin Williams
 

Apache Spark - Structured Streaming State Management With Watermark

2018-03-28 Thread M Singh
Hi:
I am using Apache Spark Structured Streaming (2.2.1) to implement custom
sessionization for events. The processing is in two steps:
1. flatMapGroupsWithState (based on user id), which stores the state of each user
and emits events every minute until an expire event is received.
2. The next step is an aggregation (group by count).

I am using outputMode - Update.

I have a few questions:
1. If I don't use a watermark at all:
   (a) is the state for flatMapGroupsWithState stored forever?
   (b) is the state for the groupBy count stored forever?
2. Is the watermark applicable for cleaning up groupBy aggregates only?
3. Can we use the watermark to manage state in flatMapGroupsWithState? If so, how?
   (See the sketch below.)
4. Can the watermark be used for other state clean-up? Are there any examples for
   those?
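
A hedged sketch for question 3 (the event/state/output types, column names, and
the "spark" session are illustrative assumptions, not the original job): combine
withWatermark on the event-time column with GroupStateTimeout.EventTimeTimeout,
and let the function clean up when state.hasTimedOut is set.

import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
import spark.implicits._

case class Event(userId: String, eventTime: java.sql.Timestamp, payload: String)
case class SessionState(count: Long)
case class SessionUpdate(userId: String, count: Long, expired: Boolean)

val sessionized = events                        // events: Dataset[Event], assumed
  .withWatermark("eventTime", "10 minutes")     // bounds how late data may arrive
  .groupByKey(_.userId)
  .flatMapGroupsWithState[SessionState, SessionUpdate](
      OutputMode.Update(), GroupStateTimeout.EventTimeTimeout) {
    (userId, userEvents, state: GroupState[SessionState]) =>
      if (state.hasTimedOut) {                  // the watermark passed the timeout timestamp
        val out = SessionUpdate(userId, state.get.count, expired = true)
        state.remove()                          // state for this key is dropped here
        Iterator(out)
      } else {
        val updated = SessionState(state.getOption.map(_.count).getOrElse(0L) + userEvents.size)
        state.update(updated)
        // expire this key one minute after the current watermark
        state.setTimeoutTimestamp(state.getCurrentWatermarkMs() + 60 * 1000)
        Iterator(SessionUpdate(userId, updated.count, expired = false))
      }
  }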
Thanks


spark-sql importing schemas from catalogString or schema.toString()

2018-03-28 Thread Colin Williams
I've been learning spark-sql and have been trying to export and import
some of the generated schemas to edit them. I've been writing the
schemas to strings like df1.schema.toString() and
df.schema.catalogString

But I've been having trouble loading the schemas created. Does anyone
know if it's possible to work with the catalogString? I couldn't find
too many resources working with it. Is it possible to create a schema
from this string and load from it using the SparkSession?

Similarly, I haven't yet successfully loaded the toString schema, after
some small edits...


There's a little tidbit about some of this here:
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-DataType.html
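
The route reported to work in the replies to this thread is the JSON form of
the schema rather than toString() or catalogString. A hedged sketch (df is an
existing DataFrame whose schema should be saved and reused, and spark is the
SparkSession):

import org.apache.spark.sql.types.{DataType, StructType}

// Export: the JSON form is what DataType.fromJson can read back.
val schemaJson: String = df.schema.json

// ... store or edit the JSON string as needed ...

// Import: rebuild the StructType and apply it when reading.
val restored = DataType.fromJson(schemaJson).asInstanceOf[StructType]
val df1 = spark.read
  .format("json")
  .schema(restored)
  .option("mode", "FAILFAST")
  .load("src/test/resources/*.gz")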

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: DataFrames :: Corrupted Data

2018-03-28 Thread Sergey Zhemzhitsky
I suppose it's hardly possible that this issue is connected with
the string encoding, because

- "pr^?files.10056.10040" should be "profiles.10056.10040" and is
defined as a constant in the source code
-
"profiles.total^@^@f2-a733-9304fda722ac^@^@^@^@profiles.10361.10005^@^@^@^@.total^@^@0075^@^@^@^@"
should not occur in the exception at all, because such strings are not
created within the job
- the strings being corrupted are defined within the job and there is
no such input data
- when yarn restarts the job for a second time after the first
failure, the job completes successfully




On Wed, Mar 28, 2018 at 10:31 PM, Jörn Franke  wrote:
> Encoding issue of the data? Eg spark uses utf-8 , but source encoding is 
> different?
>
>> On 28. Mar 2018, at 20:25, Sergey Zhemzhitsky  wrote:
>>
>> Hello guys,
>>
>> I'm using Spark 2.2.0 and from time to time my job fails printing into
>> the log the following errors
>>
>> scala.MatchError:
>> profiles.total^@^@f2-a733-9304fda722ac^@^@^@^@profiles.10361.10005^@^@^@^@.total^@^@0075^@^@^@^@
>> scala.MatchError: pr^?files.10056.10040 (of class java.lang.String)
>> scala.MatchError: pr^?files.10056.10040 (of class java.lang.String)
>> scala.MatchError: pr^?files.10056.10040 (of class java.lang.String)
>> scala.MatchError: pr^?files.10056.10040 (of class java.lang.String)
>>
>> The job itself looks like the following and contains a few shuffles and UDAFs
>>
>> val df = spark.read.avro(...).as[...]
>>  .groupBy(...)
>>  .agg(collect_list(...).as(...))
>>  .select(explode(...).as(...))
>>  .groupBy(...)
>>  .agg(sum(...).as(...))
>>  .groupBy(...)
>>  .agg(collectMetrics(...).as(...))
>>
>> The errors occur in the collectMetrics UDAF in the following snippet
>>
>> key match {
>>  case "profiles.total" => updateMetrics(...)
>>  case "profiles.biz" => updateMetrics(...)
>>  case ProfileAttrsRegex(...) => updateMetrics(...)
>> }
>>
>> ... and I'm absolutely ok with scala.MatchError because there is no
>> "catch all" case in the pattern matching expression, but the strings
>> containing corrupted characters seem to be very strange.
>>
>> I've found the following jira issues, but it's hardly difficult to say
>> whether they are related to my case:
>> - https://issues.apache.org/jira/browse/SPARK-22092
>> - https://issues.apache.org/jira/browse/SPARK-23512
>>
>> So I'm wondering, has anybody ever seen such kind of behaviour and
>> what could be the problem?
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: DataFrames :: Corrupted Data

2018-03-28 Thread Jörn Franke
An encoding issue with the data? E.g. Spark uses UTF-8, but the source encoding is
different?

> On 28. Mar 2018, at 20:25, Sergey Zhemzhitsky  wrote:
> 
> Hello guys,
> 
> I'm using Spark 2.2.0 and from time to time my job fails printing into
> the log the following errors
> 
> scala.MatchError:
> profiles.total^@^@f2-a733-9304fda722ac^@^@^@^@profiles.10361.10005^@^@^@^@.total^@^@0075^@^@^@^@
> scala.MatchError: pr^?files.10056.10040 (of class java.lang.String)
> scala.MatchError: pr^?files.10056.10040 (of class java.lang.String)
> scala.MatchError: pr^?files.10056.10040 (of class java.lang.String)
> scala.MatchError: pr^?files.10056.10040 (of class java.lang.String)
> 
> The job itself looks like the following and contains a few shuffles and UDAFs
> 
> val df = spark.read.avro(...).as[...]
>  .groupBy(...)
>  .agg(collect_list(...).as(...))
>  .select(explode(...).as(...))
>  .groupBy(...)
>  .agg(sum(...).as(...))
>  .groupBy(...)
>  .agg(collectMetrics(...).as(...))
> 
> The errors occur in the collectMetrics UDAF in the following snippet
> 
> key match {
>  case "profiles.total" => updateMetrics(...)
>  case "profiles.biz" => updateMetrics(...)
>  case ProfileAttrsRegex(...) => updateMetrics(...)
> }
> 
> ... and I'm absolutely ok with scala.MatchError because there is no
> "catch all" case in the pattern matching expression, but the strings
> containing corrupted characters seem to be very strange.
> 
> I've found the following jira issues, but it's hardly difficult to say
> whether they are related to my case:
> - https://issues.apache.org/jira/browse/SPARK-22092
> - https://issues.apache.org/jira/browse/SPARK-23512
> 
> So I'm wondering, has anybody ever seen such kind of behaviour and
> what could be the problem?
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



DataFrames :: Corrupted Data

2018-03-28 Thread Sergey Zhemzhitsky
Hello guys,

I'm using Spark 2.2.0 and from time to time my job fails, printing the
following errors into the log:

scala.MatchError:
profiles.total^@^@f2-a733-9304fda722ac^@^@^@^@profiles.10361.10005^@^@^@^@.total^@^@0075^@^@^@^@
scala.MatchError: pr^?files.10056.10040 (of class java.lang.String)
scala.MatchError: pr^?files.10056.10040 (of class java.lang.String)
scala.MatchError: pr^?files.10056.10040 (of class java.lang.String)
scala.MatchError: pr^?files.10056.10040 (of class java.lang.String)

The job itself looks like the following and contains a few shuffles and UDAFs

val df = spark.read.avro(...).as[...]
  .groupBy(...)
  .agg(collect_list(...).as(...))
  .select(explode(...).as(...))
  .groupBy(...)
  .agg(sum(...).as(...))
  .groupBy(...)
  .agg(collectMetrics(...).as(...))

The errors occur in the collectMetrics UDAF in the following snippet

key match {
  case "profiles.total" => updateMetrics(...)
  case "profiles.biz" => updateMetrics(...)
  case ProfileAttrsRegex(...) => updateMetrics(...)
}

... and I'm absolutely ok with scala.MatchError because there is no
"catch all" case in the pattern matching expression, but the strings
containing corrupted characters seem to be very strange.

I've found the following JIRA issues, but it's hard to say
whether they are related to my case:
- https://issues.apache.org/jira/browse/SPARK-22092
- https://issues.apache.org/jira/browse/SPARK-23512

So I'm wondering, has anybody ever seen this kind of behaviour, and
what could be the problem?

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Apache Spark - Structured Streaming StreamExecution Stats Description

2018-03-28 Thread M Singh
Hi:
I am using Spark Structured Streaming 2.2.1 with a flatMapGroupsWithState
operator and a groupBy count operator.
In the StreamExecution logs I see two entries for stateOperators:

"stateOperators" : [ {
    "numRowsTotal" : 1617339,
    "numRowsUpdated" : 9647
  }, {
    "numRowsTotal" : 1326355,
    "numRowsUpdated" : 1398672
  } ],

My questions are:
1. Is there a way to figure out which stats are for flatMapGroupsWithState and
which are for the groupBy count? In my case I can guess based on my data, but I
want to be definitive about it.
2. For the second entry, how can numRowsTotal (1326355) be less than
numRowsUpdated (1398672)?

If there is documentation I can use to understand this debug output, please let
me know.

Thanks


Re: SparkStraming job break with shuffle file not found

2018-03-28 Thread Lucas Kacher
I have been running into this as well, but I am using S3 for checkpointing
so I chalked it up to network partitioning with s3-isnt-hdfs as my storage
location. But it seems that you are indeed using hdfs, so I wonder if there
is another underlying issue.

On Wed, Mar 28, 2018 at 8:21 AM, Jone Zhang  wrote:

> The Spark Streaming job had been running for a few days, then failed as below.
> What could be the possible reason?
>
> *18/03/25 07:58:37 ERROR yarn.ApplicationMaster: User class threw
> exception: org.apache.spark.SparkException: Job aborted due to stage
> failure: Task 16 in stage 80018.0 failed 4 times, most recent failure: Lost
> task 16.3 in stage 80018.0 (TID 7318859, 10.196.155.153):
> java.io.FileNotFoundException:
> /data/hadoop_tmp/nm-local-dir/usercache/mqq/appcache/application_1521712903594_6152/blockmgr-7aa2fb13-25d8-4145-a704-7861adfae4ec/22/shuffle_40009_16_0.data.574b45e8-bafd-437d-8fbf-deb6e3a1d001
> (No such file or directory)*
>
> Thanks!
>
>


-- 

*Lucas Kacher*
Senior Engineer
-
vsco.co 
New York, NY
818.512.5239


SparkStraming job break with shuffle file not found

2018-03-28 Thread Jone Zhang
The Spark Streaming job had been running for a few days, then failed as below.
What could be the possible reason?

*18/03/25 07:58:37 ERROR yarn.ApplicationMaster: User class threw
exception: org.apache.spark.SparkException: Job aborted due to stage
failure: Task 16 in stage 80018.0 failed 4 times, most recent failure: Lost
task 16.3 in stage 80018.0 (TID 7318859, 10.196.155.153):
java.io.FileNotFoundException:
/data/hadoop_tmp/nm-local-dir/usercache/mqq/appcache/application_1521712903594_6152/blockmgr-7aa2fb13-25d8-4145-a704-7861adfae4ec/22/shuffle_40009_16_0.data.574b45e8-bafd-437d-8fbf-deb6e3a1d001
(No such file or directory)*

Thanks!


Re: java.lang.UnsupportedOperationException: CSV data source does not support struct/ERROR RetryingBlockFetcher

2018-03-28 Thread Jiří Syrový
Quick comment:

Excel CSV (a very special case though) supports arrays in CSV using "\n"
inside quotes, but you have to use "\r\n" (Windows EOL) as the EOL for the row.

Cheers,
Jiri

2018-03-28 14:14 GMT+02:00 Yong Zhang :

> Your dataframe has array data type, which is NOT supported by CSV. How csv
> file can include array or other nest structure?
>
>
> If you want your data to be human readable text, write out as json in your
> case then.
>
>
> Yong
>
>
> --
> *From:* Mina Aslani 
> *Sent:* Wednesday, March 28, 2018 12:22 AM
> *To:* naresh Goud
> *Cc:* user @spark
> *Subject:* Re: java.lang.UnsupportedOperationException: CSV data source
> does not support struct/ERROR RetryingBlockFetcher
>
> Hi Naresh,
>
> Thank you for the quick response, appreciate it.
> Removing the option("header","true") and trying
>
> df = spark.read.parquet("test.parquet"), reading the parquet now works.
> However, I would like to find a way to have the data in a csv/readable form.
> I still cannot save df as csv, as it throws
> java.lang.UnsupportedOperationException: CSV data source does not support
> struct data type.
>
> Any idea?
>
>
> Best regards,
>
> Mina
>
>
> On Tue, Mar 27, 2018 at 10:51 PM, naresh Goud 
> wrote:
>
> In case of storing as parquet file I don’t think it requires header.
> option("header","true")
>
> Give a try by removing header option and then try to read it.  I haven’t
> tried. Just a thought.
>
> Thank you,
> Naresh
>
>
> On Tue, Mar 27, 2018 at 9:47 PM Mina Aslani  wrote:
>
> Hi,
>
>
> I am using pyspark. To transform my sample data and create model, I use
> stringIndexer and OneHotEncoder.
>
>
> However, when I try to write data as csv using below command
>
> df.coalesce(1).write.option("header","true").mode("overwrite
> ").csv("output.csv")
>
>
> I get UnsupportedOperationException
>
> java.lang.UnsupportedOperationException: CSV data source does not support
> struct
> data type.
>
> Therefore, to save data and avoid getting the error I use
>
>
> df.coalesce(1).write.option("header","true").mode("overwrite
> ").save("output")
>
>
> The above command saves data but it's in parquet format.
> How can I read parquet file and convert to csv to observe the data?
>
> When I use
>
> df = spark.read.parquet("1.parquet"), it throws:
>
> ERROR RetryingBlockFetcher: Exception while beginning fetch of 1
> outstanding blocks
>
> Your input is appreciated.
>
>
> Best regards,
>
> Mina
>
>
>
> --
> Thanks,
> Naresh
> www.linkedin.com/in/naresh-dulam
> http://hadoopandspark.blogspot.com/
>
>
>


Re: java.lang.UnsupportedOperationException: CSV data source does not support struct/ERROR RetryingBlockFetcher

2018-03-28 Thread Yong Zhang
Your dataframe has an array data type, which is NOT supported by CSV. How can a
CSV file include an array or other nested structure?


If you want your data to be human-readable text, then write it out as JSON in
your case.
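
A hedged sketch in Scala (the pyspark calls are analogous; df and spark refer
to the dataframe and session from the question, "output" is the path from the
earlier .save("output"), and the other paths are illustrative):

// Write as JSON lines: nested/struct columns are supported, unlike CSV.
df.coalesce(1).write.mode("overwrite").json("output_json")

// To simply inspect the data that was already saved as parquet:
val back = spark.read.parquet("output")
back.show(20, truncate = false)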


Yong



From: Mina Aslani 
Sent: Wednesday, March 28, 2018 12:22 AM
To: naresh Goud
Cc: user @spark
Subject: Re: java.lang.UnsupportedOperationException: CSV data source does not 
support struct/ERROR RetryingBlockFetcher

Hi Naresh,

Thank you for the quick response, appreciate it.
Removing the option("header","true") and trying

df = spark.read.parquet("test.parquet"), reading the parquet now works.
However, I would like to find a way to have the data in a csv/readable form.
I still cannot save df as csv, as it throws
java.lang.UnsupportedOperationException: CSV data source does not support
struct data type.

Any idea?

Best regards,

Mina


On Tue, Mar 27, 2018 at 10:51 PM, naresh Goud 
> wrote:
In case of storing as parquet file I don’t think it requires header.
option("header","true")

Give a try by removing header option and then try to read it.  I haven’t tried. 
Just a thought.

Thank you,
Naresh


On Tue, Mar 27, 2018 at 9:47 PM Mina Aslani 
> wrote:

Hi,


I am using pyspark. To transform my sample data and create model, I use 
stringIndexer and OneHotEncoder.


However, when I try to write data as csv using below command

df.coalesce(1).write.option("header","true").mode("overwrite").csv("output.csv")


I get UnsupportedOperationException

java.lang.UnsupportedOperationException: CSV data source does not support 
struct data type.

Therefore, to save data and avoid getting the error I use


df.coalesce(1).write.option("header","true").mode("overwrite").save("output")


The above command saves data but it's in parquet format.
How can I read parquet file and convert to csv to observe the data?

When I use

df = spark.read.parquet("1.parquet"), it throws:

ERROR RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding 
blocks

Your input is appreciated.


Best regards,

Mina



--
Thanks,
Naresh
www.linkedin.com/in/naresh-dulam
http://hadoopandspark.blogspot.com/




Testing with spark-base-test

2018-03-28 Thread Guillermo Ortiz
I'm using spark-unit-test and I can't get the code to compile.

  test("Testing") {
val inputInsert = A("data2")
val inputDelete = A("data1")
val outputInsert = B(1)
val outputDelete = C(1)

val input = List(List(inputInsert), List(inputDelete))
val output = (List(List(outputInsert)), List(List(outputDelete)))

//Why doesn't it compile?? I have tried many things here.
testOperation[A,(B,C)](input, service.processing _, output)
  }

My method is:

def processing(avroDstream: DStream[A]) : (DStream[B],DStream[C]) ={...}

What does the "_" mean in this case?
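
On the last question: the trailing underscore is Scala's eta-expansion; it turns
the processing method into a function value so that it can be passed as an
argument. Roughly, using the types declared above:

// service.processing is a method; "service.processing _" is the corresponding
// function value of type DStream[A] => (DStream[B], DStream[C]).
val op: DStream[A] => (DStream[B], DStream[C]) = service.processing _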


Re: Running out of space on /tmp file system while running spark job on yarn because of size of blockmgr folder

2018-03-28 Thread Gourav Sengupta
Hi Michael,

I think that is what I am trying to show here as the documentation mentions
"NOTE: In Spark 1.0 and later this will be overridden by SPARK_LOCAL_DIRS
(Standalone, Mesos) or LOCAL_DIRS (YARN) environment variables set by the
cluster manager."

So, in a way I am supporting your statement :)

Regards,
Gourav

On Wed, Mar 28, 2018 at 10:00 AM, Michael Shtelma 
wrote:

> Hi,
>
> this property will be used in YARN mode only by the driver.
> Executors will use the properties coming from YARN for storing temporary
> files.
>
>
> Best,
> Michael
>
> On Wed, Mar 28, 2018 at 7:37 AM, Gourav Sengupta <
> gourav.sengu...@gmail.com> wrote:
>
>> Hi,
>>
>>
>> As per documentation in: https://spark.apache.org/d
>> ocs/latest/configuration.html
>>
>>
>> spark.local.dir /tmp Directory to use for "scratch" space in Spark,
>> including map output files and RDDs that get stored on disk. This should be
>> on a fast, local disk in your system. It can also be a comma-separated list
>> of multiple directories on different disks. NOTE: In Spark 1.0 and later
>> this will be overridden by SPARK_LOCAL_DIRS (Standalone, Mesos) or
>> LOCAL_DIRS (YARN) environment variables set by the cluster manager.
>>
>> Regards,
>> Gourav Sengupta
>>
>>
>>
>>
>>
>> On Mon, Mar 26, 2018 at 8:28 PM, Michael Shtelma 
>> wrote:
>>
>>> Hi Keith,
>>>
>>> Thanks  for the suggestion!
>>> I have solved this already.
>>> The problem was, that the yarn process was not responding to
>>> start/stop commands and has not applied my configuration changes.
>>> I have killed it and restarted my cluster, and after that yarn has
>>> started using yarn.nodemanager.local-dirs parameter defined in
>>> yarn-site.xml.
>>> After this change, -Djava.io.tmpdir for the spark executor was set
>>> correctly,  according to yarn.nodemanager.local-dirs parameter.
>>>
>>> Best,
>>> Michael
>>>
>>>
>>> On Mon, Mar 26, 2018 at 9:15 PM, Keith Chapman 
>>> wrote:
>>> > Hi Michael,
>>> >
>>> > sorry for the late reply. I guess you may have to set it through the
>>> hdfs
>>> > core-site.xml file. The property you need to set is "hadoop.tmp.dir"
>>> which
>>> > defaults to "/tmp/hadoop-${user.name}"
>>> >
>>> > Regards,
>>> > Keith.
>>> >
>>> > http://keith-chapman.com
>>> >
>>> > On Mon, Mar 19, 2018 at 1:05 PM, Michael Shtelma 
>>> wrote:
>>> >>
>>> >> Hi Keith,
>>> >>
>>> >> Thank you for the idea!
>>> >> I have tried it, so now the executor command is looking in the
>>> following
>>> >> way :
>>> >>
>>> >> /bin/bash -c /usr/java/latest//bin/java -server -Xmx51200m
>>> >> '-Djava.io.tmpdir=my_prefered_path'
>>> >>
>>> >> -Djava.io.tmpdir=/tmp/hadoop-msh/nm-local-dir/usercache/msh/
>>> appcache/application_1521110306769_0041/container_1521110306
>>> 769_0041_01_04/tmp
>>> >>
>>> >> JVM is using the second Djava.io.tmpdir parameter and writing
>>> >> everything to the same directory as before.
>>> >>
>>> >> Best,
>>> >> Michael
>>> >> Sincerely,
>>> >> Michael Shtelma
>>> >>
>>> >>
>>> >> On Mon, Mar 19, 2018 at 6:38 PM, Keith Chapman <
>>> keithgchap...@gmail.com>
>>> >> wrote:
>>> >> > Can you try setting spark.executor.extraJavaOptions to have
>>> >> > -Djava.io.tmpdir=someValue
>>> >> >
>>> >> > Regards,
>>> >> > Keith.
>>> >> >
>>> >> > http://keith-chapman.com
>>> >> >
>>> >> > On Mon, Mar 19, 2018 at 10:29 AM, Michael Shtelma <
>>> mshte...@gmail.com>
>>> >> > wrote:
>>> >> >>
>>> >> >> Hi Keith,
>>> >> >>
>>> >> >> Thank you for your answer!
>>> >> >> I have done this, and it is working for spark driver.
>>> >> >> I would like to make something like this for the executors as
>>> well, so
>>> >> >> that the setting will be used on all the nodes, where I have
>>> executors
>>> >> >> running.
>>> >> >>
>>> >> >> Best,
>>> >> >> Michael
>>> >> >>
>>> >> >>
>>> >> >> On Mon, Mar 19, 2018 at 6:07 PM, Keith Chapman
>>> >> >> 
>>> >> >> wrote:
>>> >> >> > Hi Michael,
>>> >> >> >
>>> >> >> > You could either set spark.local.dir through spark conf or
>>> >> >> > java.io.tmpdir
>>> >> >> > system property.
>>> >> >> >
>>> >> >> > Regards,
>>> >> >> > Keith.
>>> >> >> >
>>> >> >> > http://keith-chapman.com
>>> >> >> >
>>> >> >> > On Mon, Mar 19, 2018 at 9:59 AM, Michael Shtelma <
>>> mshte...@gmail.com>
>>> >> >> > wrote:
>>> >> >> >>
>>> >> >> >> Hi everybody,
>>> >> >> >>
>>> >> >> >> I am running spark job on yarn, and my problem is that the
>>> >> >> >> blockmgr-*
>>> >> >> >> folders are being created under
>>> >> >> >> /tmp/hadoop-msh/nm-local-dir/usercache/msh/appcache/applicat
>>> ion_id/*
>>> >> >> >> The size of this folder can grow to a significant size and does
>>> not
>>> >> >> >> really fit into /tmp file system for one job, which makes a real
>>> >> >> >> problem for my installation.
>>> >> >> >> I have redefined hadoop.tmp.dir in core-site.xml and
>>> >> >> >> yarn.nodemanager.local-dirs in yarn-site.xml pointing to other
>>> >> >> >> location and 

Re: Running out of space on /tmp file system while running spark job on yarn because of size of blockmgr folder

2018-03-28 Thread Michael Shtelma
Hi,

this property will be used in YARN mode only by the driver.
Executors will use the properties coming from YARN for storing temporary
files.


Best,
Michael

On Wed, Mar 28, 2018 at 7:37 AM, Gourav Sengupta 
wrote:

> Hi,
>
>
> As per documentation in: https://spark.apache.org/
> docs/latest/configuration.html
>
>
> spark.local.dir /tmp Directory to use for "scratch" space in Spark,
> including map output files and RDDs that get stored on disk. This should be
> on a fast, local disk in your system. It can also be a comma-separated list
> of multiple directories on different disks. NOTE: In Spark 1.0 and later
> this will be overridden by SPARK_LOCAL_DIRS (Standalone, Mesos) or
> LOCAL_DIRS (YARN) environment variables set by the cluster manager.
>
> Regards,
> Gourav Sengupta
>
>
>
>
>
> On Mon, Mar 26, 2018 at 8:28 PM, Michael Shtelma 
> wrote:
>
>> Hi Keith,
>>
>> Thanks  for the suggestion!
>> I have solved this already.
>> The problem was, that the yarn process was not responding to
>> start/stop commands and has not applied my configuration changes.
>> I have killed it and restarted my cluster, and after that yarn has
>> started using yarn.nodemanager.local-dirs parameter defined in
>> yarn-site.xml.
>> After this change, -Djava.io.tmpdir for the spark executor was set
>> correctly,  according to yarn.nodemanager.local-dirs parameter.
>>
>> Best,
>> Michael
>>
>>
>> On Mon, Mar 26, 2018 at 9:15 PM, Keith Chapman 
>> wrote:
>> > Hi Michael,
>> >
>> > sorry for the late reply. I guess you may have to set it through the
>> hdfs
>> > core-site.xml file. The property you need to set is "hadoop.tmp.dir"
>> which
>> > defaults to "/tmp/hadoop-${user.name}"
>> >
>> > Regards,
>> > Keith.
>> >
>> > http://keith-chapman.com
>> >
>> > On Mon, Mar 19, 2018 at 1:05 PM, Michael Shtelma 
>> wrote:
>> >>
>> >> Hi Keith,
>> >>
>> >> Thank you for the idea!
>> >> I have tried it, so now the executor command is looking in the
>> following
>> >> way :
>> >>
>> >> /bin/bash -c /usr/java/latest//bin/java -server -Xmx51200m
>> >> '-Djava.io.tmpdir=my_prefered_path'
>> >>
>> >> -Djava.io.tmpdir=/tmp/hadoop-msh/nm-local-dir/usercache/msh/
>> appcache/application_1521110306769_0041/container_1521110306
>> 769_0041_01_04/tmp
>> >>
>> >> JVM is using the second Djava.io.tmpdir parameter and writing
>> >> everything to the same directory as before.
>> >>
>> >> Best,
>> >> Michael
>> >> Sincerely,
>> >> Michael Shtelma
>> >>
>> >>
>> >> On Mon, Mar 19, 2018 at 6:38 PM, Keith Chapman <
>> keithgchap...@gmail.com>
>> >> wrote:
>> >> > Can you try setting spark.executor.extraJavaOptions to have
>> >> > -Djava.io.tmpdir=someValue
>> >> >
>> >> > Regards,
>> >> > Keith.
>> >> >
>> >> > http://keith-chapman.com
>> >> >
>> >> > On Mon, Mar 19, 2018 at 10:29 AM, Michael Shtelma <
>> mshte...@gmail.com>
>> >> > wrote:
>> >> >>
>> >> >> Hi Keith,
>> >> >>
>> >> >> Thank you for your answer!
>> >> >> I have done this, and it is working for spark driver.
>> >> >> I would like to make something like this for the executors as well,
>> so
>> >> >> that the setting will be used on all the nodes, where I have
>> executors
>> >> >> running.
>> >> >>
>> >> >> Best,
>> >> >> Michael
>> >> >>
>> >> >>
>> >> >> On Mon, Mar 19, 2018 at 6:07 PM, Keith Chapman
>> >> >> 
>> >> >> wrote:
>> >> >> > Hi Michael,
>> >> >> >
>> >> >> > You could either set spark.local.dir through spark conf or
>> >> >> > java.io.tmpdir
>> >> >> > system property.
>> >> >> >
>> >> >> > Regards,
>> >> >> > Keith.
>> >> >> >
>> >> >> > http://keith-chapman.com
>> >> >> >
>> >> >> > On Mon, Mar 19, 2018 at 9:59 AM, Michael Shtelma <
>> mshte...@gmail.com>
>> >> >> > wrote:
>> >> >> >>
>> >> >> >> Hi everybody,
>> >> >> >>
>> >> >> >> I am running spark job on yarn, and my problem is that the
>> >> >> >> blockmgr-*
>> >> >> >> folders are being created under
>> >> >> >> /tmp/hadoop-msh/nm-local-dir/usercache/msh/appcache/applicat
>> ion_id/*
>> >> >> >> The size of this folder can grow to a significant size and does
>> not
>> >> >> >> really fit into /tmp file system for one job, which makes a real
>> >> >> >> problem for my installation.
>> >> >> >> I have redefined hadoop.tmp.dir in core-site.xml and
>> >> >> >> yarn.nodemanager.local-dirs in yarn-site.xml pointing to other
>> >> >> >> location and expected that the block manager will create the
>> files
>> >> >> >> there and not under /tmp, but this is not the case. The files are
>> >> >> >> created under /tmp.
>> >> >> >>
>> >> >> >> I am wondering if there is a way to make spark not use /tmp at
>> all
>> >> >> >> and
>> >> >> >> configure it to create all the files somewhere else ?
>> >> >> >>
>> >> >> >> Any assistance would be greatly appreciated!
>> >> >> >>
>> >> >> >> Best,
>> >> >> >> Michael
>> >> >> >>
>> >> >> >>
>> >> >> >> 
>> 

Re: [Spark Java] Add new column in DataSet based on existed column

2018-03-28 Thread Divya Gehlot
Hi ,

Here is an example snippet in Scala:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, to_date}

// Convert to a Date type
val timestamp2datetype: (Column) => Column = (x) => to_date(x)
df = df.withColumn("date", timestamp2datetype(col("end_date")))
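
For the year/month/day/hour columns asked about in the question, a sketch along
the same lines (assuming the ISO string column is named "timestamp" and Spark
2.2+, where to_timestamp is available):

import org.apache.spark.sql.functions.{col, dayofmonth, hour, month, to_timestamp, year}

// Parse the ISO string once, then derive the parts as plain columns.
val withParts = df
  .withColumn("ts", to_timestamp(col("timestamp")))
  .withColumn("year", year(col("ts")))
  .withColumn("month", month(col("ts")))
  .withColumn("day", dayofmonth(col("ts")))
  .withColumn("hour", hour(col("ts")))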

Hope this helps !

Thanks,

Divya



On 28 March 2018 at 15:16, Junfeng Chen  wrote:

> I am working on adding a date-transformed field to an existing dataset.
>
> The current dataset contains a column named timestamp in ISO format. I
> want to parse this field to a Joda time type, and then extract the year,
> month, day, and hour info as new columns attached to the original dataset.
> I have tried the df.withColumn function, but it seems to only support simple
> expressions rather than customized functions like MapFunction.
> How can I solve it?
>
> Thanks!
>
>
>
> Regard,
> Junfeng Chen
>


[Spark Java] Add new column in DataSet based on existed column

2018-03-28 Thread Junfeng Chen
I am working on adding a date-transformed field to an existing dataset.

The current dataset contains a column named timestamp in ISO format. I want
to parse this field to a Joda time type, and then extract the year, month,
day, and hour info as new columns attached to the original dataset.
I have tried the df.withColumn function, but it seems to only support simple
expressions rather than customized functions like MapFunction.
How can I solve it?

Thanks!



Regard,
Junfeng Chen