[ANNOUNCE] .NET for Apache Spark™ 2.1 released

2022-02-02 Thread Terry Kim
Hi,

We are happy to announce that .NET for Apache Spark™ v2.1 has been released
<https://github.com/dotnet/spark/releases/tag/v2.1.0>! The release notes
<https://github.com/dotnet/spark/blob/main/docs/release-notes/2.1.0/release-2.1.0.md>
include the full list of features and improvements in this release.

Here are some of the highlights:

   - Support for Apache Spark 3.2
   - Exposing new SQL function APIs introduced in Spark 3.2

We would like to thank the community for the great feedback and all those
who contributed to this release.

Thanks,
Terry Kim on behalf of the .NET for Apache Spark™ team


Announcing Hyperspace v0.4.0 - an indexing subsystem for Apache Spark™

2021-02-08 Thread Terry Kim
Hi,

We are happy to announce that Hyperspace v0.4.0 - an indexing subsystem for
Apache Spark™ - has been released
<https://github.com/microsoft/hyperspace/releases/tag/v0.4.0>!

Here are some of the highlights:

   - Delta Lake support: Hyperspace v0.4.0 supports creating indexes on
   Delta Lake tables (a usage sketch follows this list). Please refer to the
   user guide
   <https://microsoft.github.io/hyperspace/docs/ug-supported-data-formats/#delta-lake>
   for more info.
   - Support for Databricks: A known issue with running Hyperspace on
   Databricks has been addressed. Hyperspace v0.4.0 can now run on Databricks
   Runtime 5.5 LTS & 6.4!
   - Globbing patterns for indexes: Globbing patterns can be used to
   specify a subset of the source data to create and maintain an index on.
   Please refer to the user guide
   <https://microsoft.github.io/hyperspace/docs/ug-quick-start-guide/#supporting-globbing-patterns-on-hyperspace-since-040>
   for usage details.
   - Hybrid Scan improvements: Hyperspace 0.4.0 brings several
   improvements to Hybrid Scan, such as a better mechanism
   <https://microsoft.github.io/hyperspace/docs/ug-mutable-dataset/#how-to-enable>
   to enable/disable the feature, ranking algorithm improvements, quick index
   refresh, etc.
   - Pluggable source provider: This release introduces an (evolving)
   pluggable source provider API set so that different source formats can be
   plugged in. This is what enabled the Delta Lake source, and there is an
   ongoing PR to support Iceberg tables.
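
As a quick illustration of the Delta Lake and index-creation highlights above,
here is a minimal sketch using the Hyperspace Scala API described in the user
guide (the table path, index name, and column names are only illustrative):

import com.microsoft.hyperspace._
import com.microsoft.hyperspace.index.IndexConfig

val hs = new Hyperspace(spark)
// Load the Delta Lake table the index should cover (illustrative path)
val df = spark.read.format("delta").load("/data/events")
// Index on "eventId" and materialize "payload" next to it
hs.createIndex(df, IndexConfig("eventIndex",
  indexedColumns = Seq("eventId"), includedColumns = Seq("payload")))
// Let the Spark optimizer start picking up Hyperspace indexes
spark.enableHyperspace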

We would like to thank the community for the great feedback and all those
who contributed to this release.

Thanks,
Terry Kim on behalf of the Hyperspace team


Re: [Spark SQL]HiveQL and Spark SQL producing different results

2021-01-12 Thread Terry Kim
Ying,
Can you share a query that produces different results?

Thanks,
Terry

On Sun, Jan 10, 2021 at 1:48 PM Ying Zhou  wrote:

> Hi,
>
> I run some SQL using both Hive and Spark. Usually we get the same results.
> However when a window function is in the script Hive and Spark can produce
> different results. Is this intended behavior or either Hive or Spark has a
> bug?
>
> Thanks,
> Ying
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Announcing Hyperspace v0.3.0 - an indexing subsystem for Apache Spark™

2020-11-17 Thread Terry Kim
Hi,

We are happy to announce that Hyperspace v0.3.0 - an indexing subsystem for
Apache Spark™ - has just been released
<https://github.com/microsoft/hyperspace/releases/tag/v0.3.0>!

Here are some of the highlights:

   - Mutable dataset support: Hyperspace v0.3.0 supports mutable datasets,
   where users can append to or delete from the source data.
      - Hybrid scan: Prior to v0.3.0, any change in the original dataset
      content required a full refresh to make the index usable again, which
      could be a costly operation. With Hybrid scan, the existing index can
      be utilized along with newly appended and/or deleted source files,
      without an explicit refresh operation. Please check out the Hybrid
      Scan doc
      <https://microsoft.github.io/hyperspace/docs/ug-mutable-dataset/#hybrid-scan>
      for more detail.
      - Incremental refresh: v0.3.0 introduces an "incremental" mode for
      refreshing indexes. In this mode, index files are created only for the
      newly appended source files; deleted source files are also handled by
      removing them from the existing index files. Please check out the
      Incremental Refresh doc
      <https://microsoft.github.io/hyperspace/docs/ug-mutable-dataset/#refresh-index---incremental-mode>
      for more detail.
   - Optimize index: The number of index files can grow due to incremental
   refreshes, possibly degrading performance. The new "optimizeIndex" API
   optimizes the existing indexes by merging index files into an optimal
   number of files. Please check out the Optimize Index doc
   <https://microsoft.github.io/hyperspace/docs/ug-optimize-index/> for more
   detail. A short sketch of the refresh and optimize APIs follows this list.
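
A short sketch of the two new APIs, assuming an existing index named
"myIndex" (the name is illustrative) and the refreshIndex/optimizeIndex
signatures described in the linked docs:

import com.microsoft.hyperspace.Hyperspace

val hs = new Hyperspace(spark)
// Index only the newly appended/deleted source files instead of a full rebuild
hs.refreshIndex("myIndex", "incremental")
// Merge the small index files produced by incremental refreshes
hs.optimizeIndex("myIndex")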

We would like to thank the community for the great feedback and all those
who contributed to this release.

Thanks,
Terry Kim on behalf of the Hyperspace team


Announcing .NET for Apache Spark™ 1.0

2020-11-06 Thread Terry Kim
Hi,

We are happy to announce that .NET for Apache Spark™ v1.0 has been released
<https://github.com/dotnet/spark/releases/tag/v1.0.0>! Please check out the
official blog
<https://cloudblogs.microsoft.com/opensource/2020/10/30/announcing-net-apache-spark-1/>.
The release notes
<https://github.com/dotnet/spark/blob/master/docs/release-notes/1.0.0/release-1.0.0.md>
include the full list of features and improvements in this release.

Here are some of the highlights:

   - Support for Apache Spark 3.0
   - Exposing new DataFrame / SQL function APIs introduced in Spark 3.0
   - Support for all the complex types in Spark SQL
   - Support for Delta Lake <https://github.com/delta-io/delta> v0.7 and
   Hyperspace <https://github.com/microsoft/hyperspace> v0.2

We would like to thank the community for the great feedback and all those
who contributed to this release.

Thanks,
Terry Kim on behalf of the .NET for Apache Spark™ team


Re: Renaming a DataFrame column makes Spark lose partitioning information

2020-08-04 Thread Terry Kim
This is fixed in Spark 3.0 by https://github.com/apache/spark/pull/26943:

scala> :paste
// Entering paste mode (ctrl-D to finish)

Seq((1, 2))
  .toDF("a", "b")
  .repartition($"b")
  .withColumnRenamed("b", "c")
  .repartition($"c")
  .explain()

// Exiting paste mode, now interpreting.

== Physical Plan ==
*(1) Project [a#7, b#8 AS c#11]
+- Exchange hashpartitioning(b#8, 200), false, [id=#12]
   +- LocalTableScan [a#7, b#8]

Thanks,
Terry

On Tue, Aug 4, 2020 at 6:26 AM Antoine Wendlinger 
wrote:

> Hi,
>
> When renaming a DataFrame column, it looks like Spark is forgetting the
> partition information:
>
> Seq((1, 2))
>   .toDF("a", "b")
>   .repartition($"b")
>   .withColumnRenamed("b", "c")
>   .repartition($"c")
>   .explain()
>
> Gives the following plan:
>
> == Physical Plan ==
> Exchange hashpartitioning(c#40, 10)
> +- *(1) Project [a#36, b#37 AS c#40]
>+- Exchange hashpartitioning(b#37, 10)
>   +- LocalTableScan [a#36, b#37]
>
> As you can see, two shuffles are done, but the second one is unnecessary.
> Is there a reason I don't know for this behavior ? Is there a way to work
> around it (other than not renaming my columns) ?
>
> I'm using Spark 2.4.3.
>
>
> Thanks for your help,
>
> Antoine
>


Re: Future timeout

2020-07-20 Thread Terry Kim
"spark.sql.broadcastTimeout" is the config you can use:
https://github.com/apache/spark/blob/fe07521c9efd9ce0913eee0d42b0ffd98b1225ec/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L863
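
For example (the 600-second value is only an illustration; the default is 300
seconds, which matches the timeout in your stack trace):

// Raise the broadcast join timeout from 300s (default) to 600s
spark.conf.set("spark.sql.broadcastTimeout", "600")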

Thanks,
Terry

On Mon, Jul 20, 2020 at 11:20 AM Amit Sharma  wrote:

> Please help on this.
>
>
> Thanks
> Amit
>
> On Fri, Jul 17, 2020 at 9:10 AM Amit Sharma  wrote:
>
>> Hi, sometimes my spark streaming job throw this exception  Futures timed
>> out after [300 seconds].
>> I am not sure where is the default timeout configuration. Can i increase
>> it. Please help.
>>
>>
>>
>> Thanks
>> Amit
>>
>>
>>
>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
>> [300 seconds]
>> at
>> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>> at
>> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>> at
>> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
>> at
>> org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:136)
>> at
>> org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:372)
>> at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144)
>> at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:140)
>> at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>> at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>> at
>> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>> at
>> org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140)
>> at
>> org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:116)
>> at
>> org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:257)
>> at
>> org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:101)
>> at
>> org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:186)
>> at
>> org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:35)
>> at
>> org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:65)
>> at
>> org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:186)
>> at
>> org.apache.spark.sql.execution.SerializeFromObjectExec.consume(objects.scala:101)
>> at
>> org.apache.spark.sql.execution.SerializeFromObjectExec.doConsume(objects.scala:121)
>> at
>> org.apache.spark.sql.execution.CodegenSupport$class.constructDoConsumeFunction(WholeStageCodegenExec.scala:213)
>> at
>> org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:184)
>> at
>> org.apache.spark.sql.execution.MapElementsExec.consume(objects.scala:200)
>> at
>> org.apache.spark.sql.execution.MapElementsExec.doConsume(objects.scala:224)
>> at
>> org.apache.spark.sql.execution.CodegenSupport$class.constructDoConsumeFunction(WholeStageCodegenExec.scala:213)
>> at
>> org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:184)
>> at
>> org.apache.spark.sql.execution.DeserializeToObjectExec.consume(objects.scala:68)
>>
>


Announcing .NET for Apache Spark™ 0.12

2020-07-02 Thread Terry Kim
We are happy to announce that .NET for Apache Spark™ v0.12 has been released
<https://github.com/dotnet/spark/releases>! Thanks to the community for the
great feedback. The release notes
<https://github.com/dotnet/spark/blob/master/docs/release-notes/0.12/release-0.12.md>
include the full list of features and improvements in this release.

Here are some of the highlights:

   - Ability to write UDFs using complex types such as Row, Array, Map,
   Date, Timestamp, etc.
   - Ability to write UDFs using .NET DataFrame
   <https://devblogs.microsoft.com/dotnet/an-introduction-to-dataframe/>
   (backed by Apache Arrow)
   - Enhanced structured streaming support with ForeachBatch/Foreach APIs
   - .NET binding for Delta Lake <https://github.com/delta-io/delta> v0.6
   and Hyperspace <https://github.com/microsoft/hyperspace> v0.1
   - Support for Apache Spark™ 2.4.6 (3.0 support is on the way!)
   - SparkSession.CreateDataFrame, Broadcast variable
   - Preliminary support for MLLib (TF-IDF, Word2Vec, Bucketizer, etc.)
   - Support for .NET Core 3.1

We would like to thank all those who contributed to this release.

Thanks,
Terry Kim on behalf of the .NET for Apache Spark™ team


Hyperspace v0.1 is now open-sourced!

2020-07-02 Thread Terry Kim
Hi all,

We are happy to announce the open-sourcing of Hyperspace v0.1, an indexing
subsystem for Apache Spark™:

   - Code: https://github.com/microsoft/hyperspace
   - Blog Article: https://aka.ms/hyperspace-blog
   - Spark Summit Talk:
   
https://databricks.com/session_na20/hyperspace-an-indexing-subsystem-for-apache-spark
   - Docs: https://aka.ms/hyperspace

This project would not have been possible without the outstanding work from
the Apache Spark™ community. Thank you everyone and we look forward to
collaborating with the community towards evolving Hyperspace.

Thanks,
Terry Kim on behalf of the Hyperspace team


Re: Using existing distribution for join when subset of keys

2020-05-31 Thread Terry Kim
Is the following what you are trying to do?

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "0")
val df1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("x", "y")
val df2 = (0 until 100).map(i => (i % 7, i % 11)).toDF("x", "y")
df1.write.format("parquet").bucketBy(8, "x", "y").saveAsTable("t1")
df2.write.format("parquet").bucketBy(8, "x", "y").saveAsTable("t2")
val t1 = spark.table("t1")
val t2 = spark.table("t2")
val joined = t1.join(t2, Seq("x", "y"))
joined.explain

I see no exchange:

== Physical Plan ==
*(3) Project [x#342, y#343]
+- *(3) SortMergeJoin [x#342, y#343], [x#346, y#347], Inner
   :- *(1) Sort [x#342 ASC NULLS FIRST, y#343 ASC NULLS FIRST], false, 0
   :  +- *(1) Project [x#342, y#343]
   : +- *(1) Filter (isnotnull(x#342) && isnotnull(y#343))
   :+- *(1) FileScan parquet default.t1[x#342,y#343] Batched: true,
Format: Parquet, Location: InMemoryFileIndex[file:/], PartitionFilters: [],
PushedFilters: [IsNotNull(x), IsNotNull(y)], ReadSchema:
struct, SelectedBucketsCount: 8 out of 8
   +- *(2) Sort [x#346 ASC NULLS FIRST, y#347 ASC NULLS FIRST], false, 0
  +- *(2) Project [x#346, y#347]
 +- *(2) Filter (isnotnull(x#346) && isnotnull(y#347))
+- *(2) FileScan parquet default.t2[x#346,y#347] Batched: true,
Format: Parquet, Location: InMemoryFileIndex[file:/], PartitionFilters: [],
PushedFilters: [IsNotNull(x), IsNotNull(y)], ReadSchema:
struct, SelectedBucketsCount: 8 out of 8

On Sun, May 31, 2020 at 2:38 PM Patrick Woody 
wrote:

> Hey Terry,
>
> Thanks for the response! I'm not sure that it ends up working though - the
> bucketing still seems to require the exchange before the join. Both tables
> below are saved bucketed by "x":
> *(5) Project [x#29, y#30, z#31, z#37]
> +- *(5) SortMergeJoin [x#29, y#30], [x#35, y#36], Inner
>:- *(2) Sort [x#29 ASC NULLS FIRST, y#30 ASC NULLS FIRST], false, 0
> *   :  +- Exchange hashpartitioning(x#29, y#30, 200)*
>: +- *(1) Project [x#29, y#30, z#31]
>:+- *(1) Filter (isnotnull(x#29) && isnotnull(y#30))
>:   +- *(1) FileScan parquet default.ax[x#29,y#30,z#31]
> Batched: true, Format: Parquet, Location:
> InMemoryFileIndex[file:/home/pwoody/tm/spark-2.4.5-bin-hadoop2.7/spark-warehouse/ax],
> PartitionFilters: [], PushedFilters: [IsNotNull(x), IsNotNull(y)],
> ReadSchema: struct, SelectedBucketsCount: 200 out of 200
>+- *(4) Sort [x#35 ASC NULLS FIRST, y#36 ASC NULLS FIRST], false, 0
> *  +- Exchange hashpartitioning(x#35, y#36, 200)*
>  +- *(3) Project [x#35, y#36, z#37]
> +- *(3) Filter (isnotnull(x#35) && isnotnull(y#36))
>+- *(3) FileScan parquet default.bx[x#35,y#36,z#37]
> Batched: true, Format: Parquet, Location:
> InMemoryFileIndex[file:/home/pwoody/tm/spark-2.4.5-bin-hadoop2.7/spark-warehouse/bx],
> PartitionFilters: [], PushedFilters: [IsNotNull(x), IsNotNull(y)],
> ReadSchema: struct, SelectedBucketsCount: 200 out of 200
>
> Best,
> Pat
>
>
>
> On Sun, May 31, 2020 at 3:15 PM Terry Kim  wrote:
>
>> You can use bucketBy to avoid shuffling in your scenario. This test suite
>> has some examples:
>> https://github.com/apache/spark/blob/45cf5e99503b00a6bd83ea94d6d92761db1a00ab/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala#L343
>>
>> Thanks,
>> Terry
>>
>> On Sun, May 31, 2020 at 7:43 AM Patrick Woody 
>> wrote:
>>
>>> Hey all,
>>>
>>> I have one large table, A, and two medium sized tables, B & C, that I'm
>>> trying to complete a join on efficiently. The result is multiplicative on A
>>> join B, so I'd like to avoid shuffling that result. For this example, let's
>>> just assume each table has three columns, x, y, z. The below is all being
>>> tested on Spark 2.4.5 locally.
>>>
>>> I'd like to perform the following join:
>>> A.join(B, Seq("x", "y")).join(C, Seq("x", "z"))
>>> This outputs the following physical plan:
>>> == Physical Plan ==
>>> *(6) Project [x#32, z#34, y#33, z#74, y#53]
>>> +- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
>>>:- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
>>>:  +- Exchange hashpartitioning(x#32, z#34, 200)
>>>: +- *(3) Project [x#32, y#33, z#34, z#74]
>>>:+- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
>>>:   :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS
>>> FIRST], false, 0
>>>  

Re: Using existing distribution for join when subset of keys

2020-05-31 Thread Terry Kim
You can use bucketBy to avoid shuffling in your scenario. This test suite
has some examples:
https://github.com/apache/spark/blob/45cf5e99503b00a6bd83ea94d6d92761db1a00ab/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala#L343

Thanks,
Terry

On Sun, May 31, 2020 at 7:43 AM Patrick Woody 
wrote:

> Hey all,
>
> I have one large table, A, and two medium sized tables, B & C, that I'm
> trying to complete a join on efficiently. The result is multiplicative on A
> join B, so I'd like to avoid shuffling that result. For this example, let's
> just assume each table has three columns, x, y, z. The below is all being
> tested on Spark 2.4.5 locally.
>
> I'd like to perform the following join:
> A.join(B, Seq("x", "y")).join(C, Seq("x", "z"))
> This outputs the following physical plan:
> == Physical Plan ==
> *(6) Project [x#32, z#34, y#33, z#74, y#53]
> +- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
>:- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(x#32, z#34, 200)
>: +- *(3) Project [x#32, y#33, z#34, z#74]
>:+- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
>:   :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS FIRST],
> false, 0
>:   :  +- Exchange hashpartitioning(x#32, y#33, 200)
>:   : +- LocalTableScan [x#32, y#33, z#34]
>:   +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS FIRST],
> false, 0
>:  +- Exchange hashpartitioning(x#72, y#73, 200)
>: +- LocalTableScan [x#72, y#73, z#74]
>+- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
>   +- Exchange hashpartitioning(x#52, z#54, 200)
>  +- LocalTableScan [x#52, y#53, z#54]
>
>
> I may be misremembering, but in the past I thought you had the ability to
> pre-partition each table by "x" and it would satisfy the requirements of
> the join since it is already clustered by the key on both sides using the
> same hash function (this assumes numPartitions lines up obviously). However
> it seems like it will insert another exchange:
>
> A.repartition($"x").join(B.repartition($"x"), Seq("x",
> "y")).join(C.repartition($"x"), Seq("x", "z"))
> *(6) Project [x#32, z#34, y#33, z#74, y#53]
> +- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
>:- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(x#32, z#34, 200)
>: +- *(3) Project [x#32, y#33, z#34, z#74]
>:+- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
>:   :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS FIRST],
> false, 0
>:   :  +- Exchange hashpartitioning(x#32, y#33, 200)
>:   : +- Exchange hashpartitioning(x#32, 200)
>:   :+- LocalTableScan [x#32, y#33, z#34]
>:   +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS FIRST],
> false, 0
>:  +- Exchange hashpartitioning(x#72, y#73, 200)
>: +- Exchange hashpartitioning(x#72, 200)
>:+- LocalTableScan [x#72, y#73, z#74]
>+- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
>   +- Exchange hashpartitioning(x#52, z#54, 200)
>  +- ReusedExchange [x#52, y#53, z#54], Exchange
> hashpartitioning(x#32, 200).
>
> Note, that using this "strategy" with groupBy("x", "y") works fine though
> I assume that is because it doesn't have to consider the other side of the
> join.
>
> Did this used to work or am I simply confusing it with groupBy? Either way
> - any thoughts on how I can avoid shuffling the bulk of the join result?
>
> Thanks,
> Pat
>
>
>
>
>


Re: [Spark SQL]: Does namespace name is always needed in a query for tables from a user defined catalog plugin

2019-12-01 Thread Terry Kim
Hi Xufei,
I also noticed the same behavior while looking into relation resolution (see
Appendix A in this doc
<https://docs.google.com/document/d/1hvLjGA8y_W_hhilpngXVub1Ebv8RsMap986nENCFnrg/edit?usp=sharing>).
I created SPARK-30094 <https://issues.apache.org/jira/browse/SPARK-30094> and
will follow up.
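
For reference, the behavior you describe can be reproduced with something like
the following (catalog, namespace, and table names taken from your example):

spark.sql("USE example_catalog.test")
spark.sql("SELECT * FROM example_catalog.test.t").show() // works
spark.sql("SELECT * FROM test.t").show()                 // works
spark.sql("SELECT * FROM t").show()                      // fails with table not found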

Thanks,
Terry

On Sun, Dec 1, 2019 at 7:12 PM xufei  wrote:

> Hi,
>
> I'm trying to write a catalog plugin based on spark-3.0-preview, and I
> found that even when I use 'use catalog.namespace' to set the current catalog
> and namespace, I still need to use a qualified name in the query.
>
> For example, I add a catalog named 'example_catalog', there is a database
> named 'test' in 'example_catalog', and a table 't' in
> 'example_catalog.test'. I can query the table using 'select * from
> example_catalog.test.t' under default catalog(which is spark_catalog).
> After I use 'use example_catalog.test' to change the current catalog to
> 'example_catalog', and the current namespace to 'test', I can query the
> table using 'select * from test.t', but 'select * from t' failed due to
> table_not_found exception.
>
> I want to know if this is an expected behavior?  If yes, it sounds a
> little weird since I think after 'use example_catalog.test', all the
> un-qualified identifiers should be interpreted as
> 'example_catalog.test.identifier'.
>
> Attachment is a test file that you can use to reproduce the problem I met.
>
> Thanks.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Announcing .NET for Apache Spark 0.5.0

2019-09-30 Thread Terry Kim
We are thrilled to announce that .NET for Apache Spark 0.5.0 has just been
released <https://github.com/dotnet/spark/releases/tag/v0.5.0>!



Some of the highlights of this release include:

   - Delta Lake <https://github.com/delta-io/delta>'s *DeltaTable *APIs
   - UDF improvements
   - Support for Spark 2.3.4/2.4.4

The release notes
<https://github.com/dotnet/spark/blob/master/docs/release-notes/0.5/release-0.5.md>
include the full list of features and improvements in this release.



We would like to thank all those who contributed to this release.



Thanks,

Terry


Re: Release Apache Spark 2.4.4

2019-08-13 Thread Terry Kim
Can the following be included?

[SPARK-27234][SS][PYTHON] Use InheritableThreadLocal for current epoch in
EpochTracker (to support Python UDFs)
<https://github.com/apache/spark/pull/24946>

Thanks,
Terry

On Tue, Aug 13, 2019 at 10:24 PM Wenchen Fan  wrote:

> +1
>
> On Wed, Aug 14, 2019 at 12:52 PM Holden Karau 
> wrote:
>
>> +1
>> Does anyone have any critical fixes they’d like to see in 2.4.4?
>>
>> On Tue, Aug 13, 2019 at 5:22 PM Sean Owen  wrote:
>>
>>> Seems fine to me if there are enough valuable fixes to justify another
>>> release. If there are any other important fixes imminent, it's fine to
>>> wait for those.
>>>
>>>
>>> On Tue, Aug 13, 2019 at 6:16 PM Dongjoon Hyun 
>>> wrote:
>>> >
>>> > Hi, All.
>>> >
>>> > Spark 2.4.3 was released three months ago (8th May).
>>> > As of today (13th August), there are 112 commits (75 JIRAs) in
>>> `branch-24` since 2.4.3.
>>> >
>>> > It would be great if we can have Spark 2.4.4.
>>> > Shall we start `2.4.4 RC1` next Monday (19th August)?
>>> >
>>> > Last time, there was a request for K8s issue and now I'm waiting for
>>> SPARK-27900.
>>> > Please let me know if there is another issue.
>>> >
>>> > Thanks,
>>> > Dongjoon.
>>>
>>> -
>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>
>>> --
>> Twitter: https://twitter.com/holdenkarau
>> Books (Learning Spark, High Performance Spark, etc.):
>> https://amzn.to/2MaRAG9  <https://amzn.to/2MaRAG9>
>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>
>


Announcing .NET for Apache Spark 0.4.0

2019-07-31 Thread Terry Kim
We are thrilled to announce that .NET for Apache Spark 0.4.0 has just been
released <https://github.com/dotnet/spark/releases/tag/v0.4.0>!



Some of the highlights of this release include:

   - Apache Arrow backed UDFs (Vector UDF, Grouped Map UDF)
   - Robust UDF-related assembly loading
   - Local UDF debugging



The release notes
<https://github.com/dotnet/spark/blob/master/docs/release-notes/0.4/release-0.4.md>
include the full list of features and improvements in this release.



We would like to thank all those who contributed to this release.



Thanks,

Terry


The last successful batch before stop re-execute after restart the DStreams with checkpoint

2018-03-11 Thread Terry Hoo
Experts,

I see that the last batch before a stop (graceful shutdown) always re-executes
after restarting the DStream from a checkpoint. Is this expected behavior?

I see a bug in JIRA: https://issues.apache.org/jira/browse/SPARK-20050,
which reports duplicates with Kafka; I also see this with HDFS files.

Regards
- Terry


Re: Getting memory error when starting spark shell but not often

2016-09-06 Thread Terry Hoo
Maybe there is not enough contiguous memory (~10 GB?) on your host.

Regards,
- Terry

On Wed, Sep 7, 2016 at 10:51 AM, Divya Gehlot <divya.htco...@gmail.com>
wrote:

> Hi,
> I am using EMR 4.7 with Spark 1.6
> Sometimes when I start the spark shell I get below error
>
> OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x0005662c,
>> 10632822784, 0) failed; error='Cannot allocate memory' (errno=12)
>> #
>> # There is insufficient memory for the Java Runtime Environment to
>> continue.
>> # Native memory allocation (malloc) failed to allocate 10632822784 bytes
>> for committing reserved memory.
>> # An error report file with more information is saved as:
>> # /tmp/jvm-6066/hs_error.log
>
>
>
> Has any body encountered this kind of issue .
> Would really appreciate the resolution.
>
>
> Thanks,
> Divya
>


Re: spark2.0 how to use sparksession and StreamingContext same time

2016-07-25 Thread Terry Hoo
Kevin,

Try creating the StreamingContext as follows:

val ssc = new StreamingContext(spark.sparkContext, Seconds(2))
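
A slightly fuller sketch of the idea (Spark 2.0; sparkConf is the one from
your code, and the Kafka and JDBC pieces are elided):

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

val spark = SparkSession.builder.config(sparkConf).getOrCreate()
// Reuse the SparkContext owned by the SparkSession instead of letting
// StreamingContext create a second one (which triggers SPARK-2243).
val ssc = new StreamingContext(spark.sparkContext, Seconds(2))
ssc.checkpoint("checkpoint")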



On Tue, Jul 26, 2016 at 11:25 AM, kevin  wrote:

> hi,all:
> I want to read data from kafka and regist as a table then join a jdbc
> table.
> My sample like this :
>
> val spark = SparkSession
>   .builder
>   .config(sparkConf)
>   .getOrCreate()
>
> val jdbcDF = spark.read.format("jdbc").options(Map("url" ->
> "jdbc:mysql://master1:3306/demo", "driver" -> "com.mysql.jdbc.Driver",
> "dbtable" -> "i_user", "user" -> "root", "password" -> "passok")).load()
> jdbcDF.cache().createOrReplaceTempView("black_book")
>   val df = spark.sql("select * from black_book")
>   df.show()
>
> val ssc = new StreamingContext(sparkConf, Seconds(2))
> ssc.checkpoint("checkpoint")
>
> val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
> val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
> topicMap).map(_._2)
> val words = lines.flatMap(_.split(" "))
>
> *I got error :*
>
> 16/07/26 11:18:07 WARN AbstractHandler: No Server set for
> org.spark_project.jetty.server.handler.ErrorHandler@6f0ca692
> ++++
> |  id|username|password|
> ++++
> |e6faca36-8766-4dc...|   a|   a|
> |699285a3-a108-457...|   admin| 123|
> |e734752d-ac98-483...|test|test|
> |c0245226-128d-487...|   test2|   test2|
> |4f1bbdb2-89d1-4cc...| 119| 911|
> |16a9a360-13ee-4b5...|1215|1215|
> |bf7d6a0d-2949-4c3...|   demo3|   demo3|
> |de30747c-c466-404...| why| why|
> |644741c9-8fd7-4a5...|   scala|   p|
> |cda1e44d-af4b-461...| 123| 231|
> |6e409ed9-c09b-4e7...| 798|  23|
> ++++
>
> Exception in thread "main" org.apache.spark.SparkException: Only one
> SparkContext may be running in this JVM (see SPARK-2243). To ignore this
> error, set spark.driver.allowMultipleContexts = true. The currently running
> SparkContext was created at:
>
> org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:749)
> main.POC$.main(POC.scala:43)
> main.POC.main(POC.scala)
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> java.lang.reflect.Method.invoke(Method.java:498)
>
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:724)
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
> org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> at
> org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2211)
> at
> org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2207)
> at scala.Option.foreach(Option.scala:257)
> at
> org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:2207)
> at
> org.apache.spark.SparkContext$.markPartiallyConstructed(SparkContext.scala:2277)
> at org.apache.spark.SparkContext.(SparkContext.scala:91)
> at
> org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:837)
> at
> org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:84)
> at main.POC$.main(POC.scala:50)
> at main.POC.main(POC.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:724)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>
>


Re: Another problem about parallel computing

2016-06-13 Thread Terry Hoo
hero,

Did you check whether there is any exception after the retries? If the port is
0, the Spark worker should bind to a random port. BTW, which Spark version are
you using?

Regards,

- Terry

On Mon, Jun 13, 2016 at 4:24 PM, hero <super_big_h...@sina.com> wrote:

> Hi, guys
>
> I have another problem about spark yarn.
> Today, i was running start-all.sh when i found only two worker in the Web
> Ui, and in fact, I have four nodes.
> The only display of the two nodes, one is master, one is slave2.
> the '/etc/hosts' file is show below:
>
> *127.0.0.1   localhost*
> *169.254.9.148   master*
> *169.254.142.119 s1*
> *169.254.180.3   s2*
> *169.254.250.67  s3*
>
>
> So, I'm going to see the spark log on slave1 and slave3.
> The slave2 log is show below:
>
> 16/06/13 03:38:00 INFO util.Utils: Successfully started service
> 'sparkWorker' on port 39887.
>
> And the slave1 or slave3 log is same:
>
> 16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind on
> port 0. Attempting port 1.
> 16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind on
> port 0. Attempting port 1.
> 16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind on
> port 0. Attempting port 1.
> 16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind on
> port 0. Attempting port 1.
> 16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind on
> port 0. Attempting port 1.
> 16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind on
> port 0. Attempting port 1.
> 16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind on
> port 0. Attempting port 1.
> 16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind on
> port 0. Attempting port 1.
> 16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind on
> port 0. Attempting port 1.
> 16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind on
> port 0. Attempting port 1.
> 16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind on
> port 0. Attempting port 1.
> 16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind on
> port 0. Attempting port 1.
> 16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind on
> port 0. Attempting port 1.
> 16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind on
> port 0. Attempting port 1.
> 16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind on
> port 0. Attempting port 1.
> 16/06/13 03:38:00 WARN util.Utils: Service 'sparkWorker' could not bind on
> port 0. Attempting port 1.
>
> So, I think the problem lies in this util.Utils.
> I search through Google, but there is no answer to this problem.
> Now how to bind the sparkWorker port?
>
> Thanks
> Jay
>


Re: StackOverflow in Spark

2016-06-13 Thread Terry Hoo
Maybe the same issue as SPARK-6847
<https://issues.apache.org/jira/browse/SPARK-6847>, which has been fixed in
Spark 2.0.
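
If upgrading is not an option, checkpointing the stateful stream periodically
(as suggested further down this thread) also truncates the lineage; a minimal
sketch, where updateFunc and the HDFS path are placeholders:

ssc.checkpoint("hdfs:///tmp/checkpoints")  // required for updateStateByKey
val state = dstream.updateStateByKey(updateFunc)
// Persist the state RDDs to the checkpoint dir periodically to cut the lineage
state.checkpoint(Seconds(60))
state.foreachRDD(rdd => rdd.take(1))       // make sure this state is materialized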

Regards
- Terry

On Mon, Jun 13, 2016 at 3:15 PM, Michel Hubert <mich...@phact.nl> wrote:

>
>
> I’ve found my problem.
>
>
>
> I’ve got a DAG with two consecutive “updateStateByKey” functions .
>
> When I only process (map/foreachRDD/JavaEsSpark) the state of the last
> “updateStateByKey” function, I get an stackoverflow after a while (too long
> linage).
>
>
>
> But when I also do some processing (foreachRDD/rdd.take) on the first
> “updatestatebykey”, then there is no problem.
>
>
>
> Does this make sense? Probably the “long linage” problem.
>
>
>
> But why should I have such a “linage problem” when Sparks claims to be a
> “abstract/high level” architecture? Why should I be worried about “long
> linage”? Its seems a contradiction with the abstract/high level (functional
> programming) approach when I have to know/consider how Spark doest it.
>
>
>
>
>
>
>
> *Van:* Rishabh Wadhawan [mailto:rishabh...@gmail.com]
> *Verzonden:* donderdag 2 juni 2016 06:06
> *Aan:* Yash Sharma <yash...@gmail.com>
> *CC:* Ted Yu <yuzhih...@gmail.com>; Matthew Young <taige...@gmail.com>;
> Michel Hubert <mich...@phact.nl>; user@spark.apache.org
> *Onderwerp:* Re: StackOverflow in Spark
>
>
>
> Stackoverflow is generated when the DAG is too long, as there are many
> transformations over a lot of iterations. Please use checkpointing to store
> the DAG and break the lineage to get away from this stack overflow error.
> Look into the checkpoint function.
>
> Thanks
>
> Hope it helps. Let me know if you need anymore help.
>
> On Jun 1, 2016, at 8:18 PM, Yash Sharma <yash...@gmail.com> wrote:
>
>
>
> Not sure if its related, But I got a similar stack overflow error some
> time back while reading files and converting them to parquet.
>
>
>
>
>
>
> Stack trace-
> 16/06/02 02:23:54 INFO YarnAllocator: Driver requested a total number of
> 32769 executor(s).
> 16/06/02 02:23:54 INFO ExecutorAllocationManager: Requesting 16384 new
> executors because tasks are backlogged (new desired total will be 32769)
> 16/06/02 02:23:54 INFO YarnAllocator: Will request 24576 executor
> containers, each with 5 cores and 22528 MB memory including 2048 MB overhead
> 16/06/02 02:23:55 WARN ApplicationMaster: Reporter thread fails 2 time(s)
> in a row.
> at
> scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:28)
> at
> scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:24)
> at
> scala.collection.TraversableLike$$anonfun$filter$1.apply(TraversableLike.scala:264)
> at
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
> at
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
> at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
> at
> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>
> java.lang.StackOverflowError
>
>
>
>
>
> On Thu, Jun 2, 2016 at 12:58 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
> Looking at Michel's stack trace, it seems to be different issue.
>
>
> On Jun 1, 2016, at 7:45 PM, Matthew Young <taige...@gmail.com> wrote:
>
> Hi,
>
>
>
> It's related to the one fixed bug in Spark, jira ticket SPARK-6847
> <https://issues.apache.org/jira/browse/SPARK-6847>
>
>
>
> Matthew Yang
>
>
>
> On Wed, May 25, 2016 at 7:48 PM, Michel Hubert <mich...@phact.nl> wrote:
>
>
>
> Hi,
>
>
>
>
>
> I have an Spark application which generates StackOverflowError exceptions
> after 30+ min.
>
>
>
> Anyone any ideas?
>
>
>
>
>
>
>
>
>
>
>
>
>
> 16/05/25 10:48:51 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
> 55449.0 (TID 5584, host81440-cld.opentsp.com):
> java.lang.StackOverflowError
>
> ·at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382)
>
> ·at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>
> ·at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>
> ·at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>
> ·at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>
> ·at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>
> ·at java.io.ObjectInput

ArrayIndexOutOfBoundsException in model selection via cross-validation sample with spark 1.6.1

2016-05-04 Thread Terry Hoo
$spark$repl$SparkILoop$$interpretAllFrom$1$$anonfun$apply$mcV$sp$1.apply(SparkILoop.scala:677)
at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$interpretAllFrom$1$$anonfun$apply$mcV$sp$1.apply(SparkILoop.scala:677)
at org.apache.spark.repl.SparkILoop.org
$apache$spark$repl$SparkILoop$$savingReplayStack(SparkILoop.scala:162)
at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$interpretAllFrom$1.apply$mcV$sp(SparkILoop.scala:676)
at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$interpretAllFrom$1.apply(SparkILoop.scala:676)
at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$interpretAllFrom$1.apply(SparkILoop.scala:676)
at
org.apache.spark.repl.SparkILoop.savingReader(SparkILoop.scala:167)
at org.apache.spark.repl.SparkILoop.org
$apache$spark$repl$SparkILoop$$interpretAllFrom(SparkILoop.scala:675)
at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$loadCommand$1.apply(SparkILoop.scala:740)
at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$loadCommand$1.apply(SparkILoop.scala:739)
at org.apache.spark.repl.SparkILoop.withFile(SparkILoop.scala:733)
at org.apache.spark.repl.SparkILoop.org
$apache$spark$repl$SparkILoop$$loadCommand(SparkILoop.scala:739)
at
org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$7.apply(SparkILoop.scala:344)
at
org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$7.apply(SparkILoop.scala:344)
at
scala.tools.nsc.interpreter.LoopCommands$LineCmd.apply(LoopCommands.scala:81)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:809)
at
org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at
org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org
$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org
$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


Regards

- Terry


Re: Number of batches in the Streaming Statics visualization screen

2016-01-29 Thread Terry Hoo
Yes, the data is stored in driver memory.

On Fri, Jan 29, 2016 at 18:13, Mehdi Ben Haj Abbes <mehdi.ab...@gmail.com> wrote:

> Thanks Terry for the quick answer.
>
> I did not try it.  Let's say I increase the value to 2, what
> side effect should I expect? In fact, the explanation of the property is "How
> many finished batches the Spark UI and status APIs remember before garbage
> collecting." So the data is stored in memory, but in the memory of which
> component ... I imagine the driver?
>
> regards,
>
> On Fri, Jan 29, 2016 at 10:52 AM, Terry Hoo <hujie.ea...@gmail.com> wrote:
>
>> Hi Mehdi,
>>
>> Do you try a larger value of "spark.streaming.ui.retainedBatches"(default
>> is 1000)?
>>
>> Regards,
>> - Terry
>>
>> On Fri, Jan 29, 2016 at 5:45 PM, Mehdi Ben Haj Abbes <
>> mehdi.ab...@gmail.com> wrote:
>>
>>> Hi folks,
>>>
>>> I have a streaming job running for more than 24 hours. It seems that
>>> there is a limit on the number of the batches displayed in the Streaming
>>> Statics visualization screen. For example if I would launch a job Friday I
>>> will not be able to have the statistics for what happened during Saturday.
>>> I will have the batches that have run the previous 24 hours and today it
>>> was like only the previous 3 hours.
>>>
>>> Any help will be very appreciated.
>>> --
>>> Mehdi BEN HAJ ABBES
>>>
>>>
>>
>
>
> --
> Mehdi BEN HAJ ABBES
>
>


Re: Number of batches in the Streaming Statics visualization screen

2016-01-29 Thread Terry Hoo
Hi Mehdi,

Did you try a larger value of "spark.streaming.ui.retainedBatches" (the
default is 1000)?
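
A minimal sketch of setting it when the application starts (the 5000 value and
app name are only illustrations; larger values keep more batch metadata in
driver memory):

import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setAppName("my-streaming-app")
  .set("spark.streaming.ui.retainedBatches", "5000")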

Regards,
- Terry

On Fri, Jan 29, 2016 at 5:45 PM, Mehdi Ben Haj Abbes <mehdi.ab...@gmail.com>
wrote:

> Hi folks,
>
> I have a streaming job running for more than 24 hours. It seems that there
> is a limit on the number of the batches displayed in the Streaming Statics
> visualization screen. For example if I would launch a job Friday I will not
> be able to have the statistics for what happened during Saturday. I will
> have the batches that have run the previous 24 hours and today it was like
> only the previous 3 hours.
>
> Any help will be very appreciated.
> --
> Mehdi BEN HAJ ABBES
>
>


Re: [Spark 1.6][Streaming] About the behavior of mapWithState

2016-01-17 Thread Terry Hoo
Hi Ryan,

Thanks for your comments!

Using reduceByKey() before mapWithState produces the expected result.

Have we ever considered having mapWithState output each changed key only once
per batch interval, just like updateStateByKey? In some cases, users may only
care about the final state.
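
For reference, the workaround applied to the example from my original message
(spec and mappingFunction as defined there):

// Pre-aggregate each batch so every key reaches mapWithState only once
val result = dstream.reduceByKey(_ + _).mapWithState(spec)
// The first batch now yields (1,3) and (2,1) instead of (1,1), (1,3), (2,1)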

Regards,
-Terry

On Sat, Jan 16, 2016 at 6:20 AM, Shixiong(Ryan) Zhu <shixi...@databricks.com
> wrote:

> Hey Terry,
>
> That's expected. If you want to only output (1, 3), you can use
> "reduceByKey" before "mapWithState" like this:
>
> dstream.reduceByKey(_ + _).mapWithState(spec)
>
> On Fri, Jan 15, 2016 at 1:21 AM, Terry Hoo <hujie.ea...@gmail.com> wrote:
>
>> Hi,
>> I am doing a simple test with mapWithState, and get some events
>> unexpected, is this correct?
>>
>> The test is very simple: sum the value of each key
>>
>> val mappingFunction = (key: Int, value: Option[Int], state: State[Int]) => {
>>   state.update(state.getOption().getOrElse(0) + value.getOrElse(0))
>>   (key, state.get())
>> }
>> val spec = StateSpec.function(mappingFunction)
>> dstream.mapWithState(spec)
>>
>> I create two RDDs and insert into dstream:
>> RDD((1,1), (1,2), (2,1))
>> RDD((1,3))
>>
>> Get result like this:
>> RDD(*(1,1)*, *(1,3)*, (2,1))
>> RDD((1,6))
>>
>> You can see that the first batch will generate two items with the same
>> key "1": (1,1) and (1,3), is this expected behavior? I would expect (1,3)
>> only.
>>
>> Regards
>> - Terry
>>
>
>


[Spark 1.6][Streaming] About the behavior of mapWithState

2016-01-15 Thread Terry Hoo
Hi,
I am doing a simple test with mapWithState and am getting some unexpected
events. Is this correct?

The test is very simple: sum the value of each key

val mappingFunction = (key: Int, value: Option[Int], state: State[Int]) => {
  state.update(state.getOption().getOrElse(0) + value.getOrElse(0))
  (key, state.get())
}
val spec = StateSpec.function(mappingFunction)
dstream.mapWithState(spec)

I create two RDDs and insert into dstream:
RDD((1,1), (1,2), (2,1))
RDD((1,3))

Get result like this:
RDD(*(1,1)*, *(1,3)*, (2,1))
RDD((1,6))

You can see that the first batch will generate two items with the same key
"1": (1,1) and (1,3), is this expected behavior? I would expect (1,3) only.

Regards
- Terry


[Streaming] Long time to catch up when streaming application restarts from checkpoint

2015-11-06 Thread Terry Hoo
All,

I have a streaming application that monitors an HDFS folder and computes some
metrics based on this data; the data in this folder is updated by another
uploader application.

The streaming application's batch interval is 1 minute, batch processing
time of streaming is about 30 seconds, its skeleton is like this:

streamingContext.checkpoint(...)
val fileDStream = streamingContext.textFileStream(hdfs_folder_path)
fileDStream.map(...).reduceByKey().updateStateByKey(...).foreachRDD(rdd => {
 /// send the rdd to outside
})

The streaming application and the uploader application were stopped last
Friday and restarted on Monday. We found that the streaming application took
about half a day to catch up (about 3600 batches in total); since we did not
have any new files in this period, nor any window operations, these batches
did nothing useful.

*Is there any way to skip these batches or to speed up the catch up
processing?*

Thanks!
Terry


[SQL] Memory leak with spark streaming and spark sql in spark 1.5.1

2015-10-14 Thread Terry Hoo
All,

Has anyone hit a memory leak issue with Spark Streaming and Spark SQL in
Spark 1.5.1? I can see the memory increasing all the time when running this
simple sample:

val sc = new SparkContext(conf)
val sqlContext = new HiveContext(sc)
import sqlContext.implicits._
val ssc = new StreamingContext(sc, Seconds(1))
val s1 = ssc.socketTextStream("localhost", ).map(x =>
(x,1)).reduceByKey((x : Int, y : Int) => x + y)
s1.print
s1.foreachRDD(rdd => {
  rdd.foreach(_ => Unit)
  sqlContext.createDataFrame(rdd).registerTempTable("A")
  sqlContext.sql("""select * from A""").show(1)
})

After dumping the Java heap, I can see about 22K entries in
SQLListener._stageIdToStageMetrics after 2 hours of running (the other maps in
this SQLListener have about 1K entries). Is this a leak in SQLListener?

Thanks!
Terry


Re: Streaming Application Unable to get Stream from Kafka

2015-10-09 Thread Terry Hoo
Hi Prateek,

How many cores (threads) do you assign to Spark in local mode? It is very
likely that the local Spark does not have enough resources to proceed. You can
check http://yourip:4040 for the details.
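
For example, a receiver-based Kafka stream occupies one thread by itself, so
local mode needs at least two; a minimal sketch (the app name is illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("kafka-streaming-app")
  .setMaster("local[2]")  // at least 2: one thread for the receiver, one for processing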

Thanks!
Terry

On Fri, Oct 9, 2015 at 10:34 PM, Prateek . <prat...@aricent.com> wrote:

> Hi All,
>
>
>
> In my application I have a  serializable class which is taking
> InputDStream from Kafka. The inputDStream contains JSON which is stored in
> serializable case class. Transformations are applied and saveToCassandra()
> is executed.
>
> I was getting task not serializable exception , so I made the class
> serializable.
>
>
>
> Now ,the application is working fine in standalone mode, but not able to
> receive data in local mode with the below mentioned log.
>
>
>
> What is internally happening?, if anyone have some insights Please share!
>
>
>
> Thank You in advance
>
> Regards,
>
> Prateek
>
>
>
>
>
> *From:* Prateek .
> *Sent:* Friday, October 09, 2015 6:55 PM
> *To:* user@spark.apache.org
> *Subject:* Streaming Application Unable to get Stream from Kafka
>
>
>
> Hi,
>
>
>
> I have Spark Streaming application running with the following log on
> console, I don’t get any exception but I am not able to receive the data
> from Kafka Stream.
>
>
>
> Can anyone please provide any insight what is happening with Spark
> Streaming. Is Receiver is not able to read the stream? How shall I debug
> it?
>
>
>
> JobScheduler: Added jobs for time 1444396043000 ms
>
> 15/10/09 18:37:23 INFO MemoryStore: ensureFreeSpace(922) called with
> curMem=1009312, maxMem=278302556
>
> 15/10/09 18:37:23 INFO MemoryStore: Block input-0-1444396043000 stored as
> bytes in memory (estimated size 922.0 B, free 264.4 MB)
>
> 15/10/09 18:37:23 INFO BlockManagerInfo: Added input-0-1444396043000 in
> memory on webanalytics03:51843 (size: 922.0 B, free: 264.4 MB)
>
> 15/10/09 18:37:23 INFO BlockManagerMaster: Updated info of block
> input-0-1444396043000
>
> 15/10/09 18:37:23 WARN BlockManager: Block input-0-1444396043000 already
> exists on this machine; not re-adding it
>
> 15/10/09 18:37:23 INFO BlockGenerator: Pushed block input-0-1444396043000
>
> 15/10/09 18:37:23 INFO MemoryStore: ensureFreeSpace(926) called with
> curMem=1010234, maxMem=278302556
>
> 15/10/09 18:37:23 INFO MemoryStore: Block input-0-1444396043200 stored as
> bytes in memory (estimated size 926.0 B, free 264.4 MB)
>
> 15/10/09 18:37:23 INFO BlockManagerInfo: Added input-0-1444396043200 in
> memory on webanalytics03:51843 (size: 926.0 B, free: 264.4 MB)
>
> 15/10/09 18:37:23 INFO BlockManagerMaster: Updated info of block
> input-0-1444396043200
>
> 15/10/09 18:37:23 WARN BlockManager: Block input-0-1444396043200 already
> exists on this machine; not re-adding it
>
> 15/10/09 18:37:23 INFO BlockGenerator: Pushed block input-0-1444396043200
>
> 15/10/09 18:37:23 INFO MemoryStore: ensureFreeSpace(923) called with
> curMem=1011160, maxMem=278302556
>
> 15/10/09 18:37:23 INFO MemoryStore: Block input-0-1444396043400 stored as
> bytes in memory (estimated size 923.0 B, free 264.4 MB)
>
> 15/10/09 18:37:23 INFO BlockManagerInfo: Added input-0-1444396043400 in
> memory on webanalytics03:51843 (size: 923.0 B, free: 264.4 MB)
>
> 15/10/09 18:37:23 INFO BlockManagerMaster: Updated info of block
> input-0-1444396043400
>
> 15/10/09 18:37:23 WARN BlockManager: Block input-0-1444396043400 already
> exists on this machine; not re-adding it
>
> 15/10/09 18:37:23 INFO BlockGenerator: Pushed block input-0-1444396043400
>
> 15/10/09 18:37:23 INFO MemoryStore: ensureFreeSpace(917) called with
> curMem=1012083, maxMem=278302556
>
> 15/10/09 18:37:23 INFO MemoryStore: Block input-0-1444396043600 stored as
> bytes in memory (estimated size 917.0 B, free 264.4 MB)
>
> 15/10/09 18:37:23 INFO BlockManagerInfo: Added input-0-1444396043600 in
> memory on webanalytics03:51843 (size: 917.0 B, free: 264.4 MB)
>
> 15/10/09 18:37:23 INFO BlockManagerMaster: Updated info of block
> input-0-1444396043600
>
> 15/10/09 18:37:23 WARN BlockManager: Block input-0-1444396043600 already
> exists on this machine; not re-adding it
>
> 15/10/09 18:37:23 INFO BlockGenerator: Pushed block input-0-1444396043600
>
> 15/10/09 18:37:24 INFO ReceiverTracker: Stream 0 received 5 blocks
>
> 15/10/09 18:37:24 INFO MemoryStore: ensureFreeSpace(922) called with
> curMem=1013000, maxMem=278302556
>
> 15/10/09 18:37:24 INFO MemoryStore: Block input-0-1444396043800 stored as
> bytes in memory (estimated size 922.0 B, free 264.4 MB)
>
> 15/10/09 18:37:24 INFO BlockManagerInfo: Added input-0-1444396043800 in
> memory 

Re: Cant perform full outer join

2015-09-29 Thread Terry Hoo
Saif,

Maybe you can rename the columns of one of the dataframes first, then do an
outer join and a select, like this:

val cur_d = cur_data.toDF("Date_1", "Value_1")
val r = data.join(cur_d, data("DATE") === cur_d("Date_1"), "outer")
  .select($"DATE", $"VALUE", $"Value_1")

Thanks,
Terry

On Tue, Sep 29, 2015 at 9:56 PM, <saif.a.ell...@wellsfargo.com> wrote:

> Hi all,
>
> So I Have two dataframes, with two columns: DATE and VALUE.
>
> Performing this:
> data = data.join(cur_data, data(“DATE”) === cur_data("DATE"), "outer")
>
> returns
> Exception in thread "main" org.apache.spark.sql.AnalysisException:
> Reference 'DATE' is ambiguous, could be: DATE#0, DATE#3.;
>
> But if I change one of the column names, I will get two columns and won’t
> really merge “DATE” column as I wish. Any ideas without going to non
> trivial procedures?
>
> Thanks,
> Saif
>
>


Re: Why Checkpoint is throwing "actor.OneForOneStrategy: NullPointerException"

2015-09-24 Thread Terry Hoo
I have met this before: in my program, some DStreams were not initialized
since they were not on the path to any output.

You can check whether you are in the same case.


Thanks!
- Terry

On Fri, Sep 25, 2015 at 10:22 AM, Tathagata Das <t...@databricks.com> wrote:

> Are you by any chance setting DStream.remember() with null?
>
> On Thu, Sep 24, 2015 at 5:02 PM, Uthayan Suthakar <
> uthayan.sutha...@gmail.com> wrote:
>
>> Hello all,
>>
>> My Stream job is throwing below exception at every interval. It is first
>> deleting the the checkpoint file and then it's trying to checkpoint, is
>> this normal behaviour? I'm using Spark 1.3.0. Do you know what may cause
>> this issue?
>>
>> 15/09/24 16:35:55 INFO scheduler.TaskSetManager: Finished task 1.0 in
>> stage 84.0 (TID 799) in 12 ms on itrac1511.cern.ch (1/8)
>> *15/09/24 16:35:55 INFO streaming.CheckpointWriter:
>> Deleting 
>> hdfs://p01001532067275/user/wdtmon/wdt-dstream-6/checkpoint-144310422*
>> *15/09/24 16:35:55 INFO streaming.CheckpointWriter: Checkpoint for time
>> 144310422 ms saved to file
>> 'hdfs://p01001532067275/user/wdtmon/wdt-dstream-6/*
>> checkpoint-144310422', took 10696 bytes and 108 ms
>> 15/09/24 16:35:55 INFO streaming.DStreamGraph: Clearing checkpoint data
>> for time 144310422 ms
>> 15/09/24 16:35:55 INFO streaming.DStreamGraph: Cleared checkpoint data
>> for time 144310422 ms
>> 15/09/24 16:35:55 ERROR actor.OneForOneStrategy:
>> java.lang.NullPointerException
>> at
>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
>> at
>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
>> at
>> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>> at
>> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>> at
>> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>> at
>> scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>> at
>> scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>> at
>> scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>> at
>> org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:168)
>> at
>> org.apache.spark.streaming.scheduler.JobGenerator.clearCheckpointData(JobGenerator.scala:279)
>> at org.apache.spark.streaming.scheduler.JobGenerator.org
>> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
>> at
>> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:86)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>> at
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>>
>> Cheers,
>>
>> Uthay
>>
>>
>


Re: How to convert dataframe to a nested StructType schema

2015-09-15 Thread Terry Hole
Hao,

For spark 1.4.1, you can try this:
val rowrdd = df.rdd.map(r => Row(Row(r(3)), Row(r(0), r(1), r(2))))
val newDF = sqlContext.createDataFrame(rowrdd, yourNewSchema)
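For reference, the nested schema itself (the "yourNewSchema" above) could be
built roughly like this; the field names are taken from the printSchema in
your message, and the nullability flags are an assumption:

import org.apache.spark.sql.types._

val yourNewSchema = StructType(Seq(
  StructField("ZipCode", StructType(Seq(
    StructField("zip", StringType, true))), true),
  StructField("Address", StructType(Seq(
    StructField("city", StringType, true),
    StructField("state", StringType, true),
    StructField("country", StringType, true))), true)
))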

Thanks!

- Terry

On Wed, Sep 16, 2015 at 2:10 AM, Hao Wang <billhao.l...@gmail.com> wrote:

> Hi,
>
> I created a dataframe with 4 string columns (city, state, country,
> zipcode).
> I then applied the following nested schema to it by creating a custom
> StructType. When I run df.take(5), it gives the exception below as
> expected.
> The question is how I can convert the Rows in the dataframe to conform to
> this nested schema? Thanks!
>
> root
>  |-- ZipCode: struct (nullable = true)
>  ||-- zip: string (nullable = true)
>  |-- Address: struct (nullable = true)
>  ||-- city: string (nullable = true)
>  ||-- state: string (nullable = true)
>  ||-- country: string (nullable = true)
>
> [info]   org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in
> stage 6.0 (TID 6, localhost): scala.MatchError: 95123 (of class
> java.lang.String)
> [info] at
>
> org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$4.apply(CatalystTypeConverters.scala:178)
> [info] at
>
> org.apache.spark.sql.catalyst.CatalystTypeConverters$.convertRowWithConverters(CatalystTypeConverters.scala:348)
> [info] at
>
> org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$4.apply(CatalystTypeConverters.scala:180)
> [info] at
> org.apache.spark.sql.SQLContext$$anonfun$9.apply(SQLContext.scala:488)
> [info] at
> org.apache.spark.sql.SQLContext$$anonfun$9.apply(SQLContext.scala:488)
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-convert-dataframe-to-a-nested-StructType-schema-tp24703.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Meets "java.lang.IllegalArgumentException" when test spark ml pipe with DecisionTreeClassifier

2015-09-09 Thread Terry Hole
Sean,

Thank you!

Finally, I got this to work, although it is a bit ugly: manually setting the
metadata of the dataframe.

import org.apache.spark.ml.attribute._
import org.apache.spark.sql.types._
val df = training.toDF()
val schema = df.schema
val rowRDD = df.rdd
def enrich(m : Metadata) : Metadata = {
  val na = NominalAttribute.defaultAttr.withValues("0", "1")
  na.toMetadata(m)
}
val newSchema = StructType(schema.map(f => if (f.name == "label")
f.copy(metadata=enrich(f.metadata)) else f))
val model = pipeline.fit(sqlContext.createDataFrame(rowRDD, newSchema))
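To sanity-check that the metadata actually landed on the label column, a quick
probe like this (names taken from the code above) should print a nominal
attribute carrying the values "0" and "1":

import org.apache.spark.ml.attribute.Attribute

val checkedDf = sqlContext.createDataFrame(rowRDD, newSchema)
println(Attribute.fromStructField(checkedDf.schema("label")))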

Thanks!
- Terry

On Mon, Sep 7, 2015 at 4:24 PM, Sean Owen <so...@cloudera.com> wrote:

> Hm, off the top of my head I don't know. I haven't looked at this
> aspect in a while, strangely. It's an attribute in the metadata of the
> field. I assume there's a method for setting this metadata when you
> construct the input data.
>
> On Sun, Sep 6, 2015 at 10:41 AM, Terry Hole <hujie.ea...@gmail.com> wrote:
> > Sean
> >
> > Do you know how to tell decision tree that the "label" is a binary or set
> > some attributes to dataframe to carry number of classes?
> >
> > Thanks!
> > - Terry
> >
> > On Sun, Sep 6, 2015 at 5:23 PM, Sean Owen <so...@cloudera.com> wrote:
> >>
> >> (Sean)
> >> The error suggests that the type is not a binary or nominal attribute
> >> though. I think that's the missing step. A double-valued column need
> >> not be one of these attribute types.
> >>
> >> On Sun, Sep 6, 2015 at 10:14 AM, Terry Hole <hujie.ea...@gmail.com>
> wrote:
> >> > Hi, Owen,
> >> >
> >> > The dataframe "training" is from a RDD of case class:
> >> > RDD[LabeledDocument],
> >> > while the case class is defined as this:
> >> > case class LabeledDocument(id: Long, text: String, label: Double)
> >> >
> >> > So there is already has the default "label" column with "double" type.
> >> >
> >> > I already tried to set the label column for decision tree as this:
> >> > val lr = new
> >> >
> >> >
> DecisionTreeClassifier().setMaxDepth(5).setMaxBins(32).setImpurity("gini").setLabelCol("label")
> >> > It raised the same error.
> >> >
> >> > I also tried to change the "label" to "int" type, it also reported
> error
> >> > like following stack, I have no idea how to make this work.
> >> >
> >> > java.lang.IllegalArgumentException: requirement failed: Column label
> >> > must be
> >> > of type DoubleType but was actually IntegerType.
> >> > at scala.Predef$.require(Predef.scala:233)
> >> > at
> >> >
> >> >
> org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:37)
> >> > at
> >> >
> >> >
> org.apache.spark.ml.PredictorParams$class.validateAndTransformSchema(Predictor.scala:53)
> >> > at
> >> >
> >> >
> org.apache.spark.ml.Predictor.validateAndTransformSchema(Predictor.scala:71)
> >> > at
> >> > org.apache.spark.ml.Predictor.transformSchema(Predictor.scala:116)
> >> > at
> >> >
> >> >
> org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:162)
> >> > at
> >> >
> >> >
> org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:162)
> >> > at
> >> >
> >> >
> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
> >> > at
> >> >
> >> >
> scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
> >> > at
> >> > scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108)
> >> > at
> >> > org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:162)
> >> > at
> >> > org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:59)
> >> > at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:116)
> >> > at
> >> >
> >> >
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:51)
> >> > at
> >> >
> >> >
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:56)
> >> > at
> >> > $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iw

Re: Meets "java.lang.IllegalArgumentException" when test spark ml pipe with DecisionTreeClassifier

2015-09-07 Thread Terry Hole
Xiangrui,

Do you have any idea how to make this work?

Thanks
- Terry

Terry Hole <hujie.ea...@gmail.com> wrote on Sunday, September 6, 2015 at 17:41:

> Sean
>
> Do you know how to tell decision tree that the "label" is a binary or set
> some attributes to dataframe to carry number of classes?
>
> Thanks!
> - Terry
>
> On Sun, Sep 6, 2015 at 5:23 PM, Sean Owen <so...@cloudera.com> wrote:
>
>> (Sean)
>> The error suggests that the type is not a binary or nominal attribute
>> though. I think that's the missing step. A double-valued column need
>> not be one of these attribute types.
>>
>> On Sun, Sep 6, 2015 at 10:14 AM, Terry Hole <hujie.ea...@gmail.com>
>> wrote:
>> > Hi, Owen,
>> >
>> > The dataframe "training" is from a RDD of case class:
>> RDD[LabeledDocument],
>> > while the case class is defined as this:
>> > case class LabeledDocument(id: Long, text: String, label: Double)
>> >
>> > So there is already has the default "label" column with "double" type.
>> >
>> > I already tried to set the label column for decision tree as this:
>> > val lr = new
>> >
>> DecisionTreeClassifier().setMaxDepth(5).setMaxBins(32).setImpurity("gini").setLabelCol("label")
>> > It raised the same error.
>> >
>> > I also tried to change the "label" to "int" type, it also reported error
>> > like following stack, I have no idea how to make this work.
>> >
>> > java.lang.IllegalArgumentException: requirement failed: Column label
>> must be
>> > of type DoubleType but was actually IntegerType.
>> > at scala.Predef$.require(Predef.scala:233)
>> > at
>> >
>> org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:37)
>> > at
>> >
>> org.apache.spark.ml.PredictorParams$class.validateAndTransformSchema(Predictor.scala:53)
>> > at
>> >
>> org.apache.spark.ml.Predictor.validateAndTransformSchema(Predictor.scala:71)
>> > at
>> > org.apache.spark.ml.Predictor.transformSchema(Predictor.scala:116)
>> > at
>> >
>> org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:162)
>> > at
>> >
>> org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:162)
>> > at
>> >
>> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>> > at
>> >
>> scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
>> > at
>> > scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108)
>> > at
>> org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:162)
>> > at
>> > org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:59)
>> > at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:116)
>> > at
>> >
>> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:51)
>> > at
>> >
>> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:56)
>> > at
>> > $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:58)
>> > at
>> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:60)
>> > at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:62)
>> > at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:64)
>> > at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:66)
>> > at $iwC$$iwC$$iwC$$iwC$$iwC.(:68)
>> > at $iwC$$iwC$$iwC$$iwC.(:70)
>> > at $iwC$$iwC$$iwC.(:72)
>> > at $iwC$$iwC.(:74)
>> > at $iwC.(:76)
>> > at (:78)
>> > at .(:82)
>> > at .()
>> > at .(:7)
>> > at .()
>> > at $print()
>> >
>> > Thanks!
>> > - Terry
>> >
>> > On Sun, Sep 6, 2015 at 4:53 PM, Sean Owen <so...@cloudera.com> wrote:
>> >>
>> >> I think somewhere alone the line you've not specified your label
>> >> column -- it's defaulting to "label" and it does not recognize it, or
>> >> at least not as a binary or nominal attribute.
>> >>
>> >> On Sun, Sep 6, 2015 at 5:47 AM, Terry Hole <hujie.ea...@gmail.com>
>> wrote:
>> >> > Hi, Experts,
>> >> >
>> >> > I followed the guide of spark ml pipe to test DecisionTr

Re: Meets "java.lang.IllegalArgumentException" when test spark ml pipe with DecisionTreeClassifier

2015-09-06 Thread Terry Hole
Hi, Owen,

The dataframe "training" is from a RDD of case class: RDD[LabeledDocument],
while the case class is defined as this:
case class LabeledDocument(id: Long, text: String, *label: Double*)

So there is already a default "label" column with "double" type.

I already tried to set the label column for decision tree as this:
val lr = new
DecisionTreeClassifier().setMaxDepth(5).setMaxBins(32).setImpurity("gini").setLabelCol("label")
It raised the same error.

I also tried to change the "label" to "int" type; it also reported an error
like the following stack, and I have no idea how to make this work.

java.lang.IllegalArgumentException: requirement failed: *Column label must
be of type DoubleType but was actually IntegerType*.
at scala.Predef$.require(Predef.scala:233)
at
org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:37)
at
org.apache.spark.ml.PredictorParams$class.validateAndTransformSchema(Predictor.scala:53)
at
org.apache.spark.ml.Predictor.validateAndTransformSchema(Predictor.scala:71)
at
org.apache.spark.ml.Predictor.transformSchema(Predictor.scala:116)
at
org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:162)
at
org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:162)
at
scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
at
scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
at
scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108)
at org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:162)
at
org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:59)
at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:116)
at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:51)
at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:56)
at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:58)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:60)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:62)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:64)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:66)
at $iwC$$iwC$$iwC$$iwC$$iwC.(:68)
at $iwC$$iwC$$iwC$$iwC.(:70)
at $iwC$$iwC$$iwC.(:72)
at $iwC$$iwC.(:74)
at $iwC.(:76)
at (:78)
at .(:82)
at .()
at .(:7)
at .()
at $print()

Thanks!
- Terry

On Sun, Sep 6, 2015 at 4:53 PM, Sean Owen <so...@cloudera.com> wrote:

> I think somewhere alone the line you've not specified your label
> column -- it's defaulting to "label" and it does not recognize it, or
> at least not as a binary or nominal attribute.
>
> On Sun, Sep 6, 2015 at 5:47 AM, Terry Hole <hujie.ea...@gmail.com> wrote:
> > Hi, Experts,
> >
> > I followed the guide of spark ml pipe to test DecisionTreeClassifier on
> > spark shell with spark 1.4.1, but always meets error like following, do
> you
> > have any idea how to fix this?
> >
> > The error stack:
> > java.lang.IllegalArgumentException: DecisionTreeClassifier was given
> input
> > with invalid label column label, without the number of classes specified.
> > See StringIndexer.
> > at
> >
> org.apache.spark.ml.classification.DecisionTreeClassifier.train(DecisionTreeClassifier.scala:71)
> > at
> >
> org.apache.spark.ml.classification.DecisionTreeClassifier.train(DecisionTreeClassifier.scala:41)
> > at org.apache.spark.ml.Predictor.fit(Predictor.scala:90)
> > at org.apache.spark.ml.Predictor.fit(Predictor.scala:71)
> > at
> > org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:133)
> > at
> > org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:129)
> > at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> > at
> >
> scala.collection.IterableViewLike$Transformed$class.foreach(IterableViewLike.scala:42)
> > at
> >
> scala.collection.SeqViewLike$AbstractTransformed.foreach(SeqViewLike.scala:43)
> > at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:129)
> > at
> > $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:42)
> > at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:47)
> > at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:49)
> > at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:51)
> > at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:53)
> > at $iwC$$iwC$$iwC$$iwC$$iwC.(:55)
> > at $iwC$$iwC$$iwC$$iwC.(:57

Re: Meets "java.lang.IllegalArgumentException" when test spark ml pipe with DecisionTreeClassifier

2015-09-06 Thread Terry Hole
Sean

Do you know how to tell decision tree that the "label" is a binary or set
some attributes to dataframe to carry number of classes?

Thanks!
- Terry

On Sun, Sep 6, 2015 at 5:23 PM, Sean Owen <so...@cloudera.com> wrote:

> (Sean)
> The error suggests that the type is not a binary or nominal attribute
> though. I think that's the missing step. A double-valued column need
> not be one of these attribute types.
>
> On Sun, Sep 6, 2015 at 10:14 AM, Terry Hole <hujie.ea...@gmail.com> wrote:
> > Hi, Owen,
> >
> > The dataframe "training" is from a RDD of case class:
> RDD[LabeledDocument],
> > while the case class is defined as this:
> > case class LabeledDocument(id: Long, text: String, label: Double)
> >
> > So there is already has the default "label" column with "double" type.
> >
> > I already tried to set the label column for decision tree as this:
> > val lr = new
> >
> DecisionTreeClassifier().setMaxDepth(5).setMaxBins(32).setImpurity("gini").setLabelCol("label")
> > It raised the same error.
> >
> > I also tried to change the "label" to "int" type, it also reported error
> > like following stack, I have no idea how to make this work.
> >
> > java.lang.IllegalArgumentException: requirement failed: Column label
> must be
> > of type DoubleType but was actually IntegerType.
> > at scala.Predef$.require(Predef.scala:233)
> > at
> >
> org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:37)
> > at
> >
> org.apache.spark.ml.PredictorParams$class.validateAndTransformSchema(Predictor.scala:53)
> > at
> >
> org.apache.spark.ml.Predictor.validateAndTransformSchema(Predictor.scala:71)
> > at
> > org.apache.spark.ml.Predictor.transformSchema(Predictor.scala:116)
> > at
> >
> org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:162)
> > at
> >
> org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:162)
> > at
> >
> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
> > at
> >
> scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
> > at
> > scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108)
> > at
> org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:162)
> > at
> > org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:59)
> > at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:116)
> > at
> >
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:51)
> > at
> >
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:56)
> > at
> > $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:58)
> > at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:60)
> > at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:62)
> > at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:64)
> > at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:66)
> > at $iwC$$iwC$$iwC$$iwC$$iwC.(:68)
> > at $iwC$$iwC$$iwC$$iwC.(:70)
> > at $iwC$$iwC$$iwC.(:72)
> > at $iwC$$iwC.(:74)
> > at $iwC.(:76)
> > at (:78)
> > at .(:82)
> > at .()
> > at .(:7)
> > at .()
> > at $print()
> >
> > Thanks!
> > - Terry
> >
> > On Sun, Sep 6, 2015 at 4:53 PM, Sean Owen <so...@cloudera.com> wrote:
> >>
> >> I think somewhere alone the line you've not specified your label
> >> column -- it's defaulting to "label" and it does not recognize it, or
> >> at least not as a binary or nominal attribute.
> >>
> >> On Sun, Sep 6, 2015 at 5:47 AM, Terry Hole <hujie.ea...@gmail.com>
> wrote:
> >> > Hi, Experts,
> >> >
> >> > I followed the guide of spark ml pipe to test DecisionTreeClassifier
> on
> >> > spark shell with spark 1.4.1, but always meets error like following,
> do
> >> > you
> >> > have any idea how to fix this?
> >> >
> >> > The error stack:
> >> > java.lang.IllegalArgumentException: DecisionTreeClassifier was given
> >> > input
> >> > with invalid label column label, without the number of classes
> >> > specified.
> >> > See StringIndexer.
> >> > at
> >> >
> >&g

SparkSQL without access to arrays?

2015-09-03 Thread Terry
Hi, 
I'm using Spark 1.4.1.
Here is the printSchema after loading my json file:

root
 |-- result: struct (nullable = true)
 |    |-- negative_votes: long (nullable = true)
 |    |-- players: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- account_id: long (nullable = true)
 |    |    |    |-- assists: long (nullable = true)
 |    |    |    |-- deaths: long (nullable = true)
 |    |    |    |-- denies: long (nullable = true)
 |    |-- positive_votes: long (nullable = true)
 |    |-- radiant_captain: long (nullable = true)
 |    |-- radiant_win: boolean (nullable = true)

Why is it not possible to do:


Is there an alternative to make this work?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-without-access-to-arrays-tp24572.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Job aborted due to stage failure: java.lang.StringIndexOutOfBoundsException: String index out of range: 18

2015-08-28 Thread Terry Hole
Ricky,


You may need to use map instead of flatMap in your case: flatMap flattens the
Array[String] produced by split, so each p becomes a single String and p(18)
indexes characters of that String, which is what throws the exception.

val rowRDD = sc.textFile("/user/spark/short_model").map(_.split("\\t")).map(p => Row(...))
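The difference, in a small self-contained illustration (3 columns instead of
72, run in spark-shell where sc is available):

import org.apache.spark.sql.Row

val lines = sc.parallelize(Seq("a\tb\tc"))

val flattened = lines.flatMap(_.split("\\t"))  // RDD[String]: "a", "b", "c"
// flattened.map(p => Row(p(0), p(1), p(2)))   // p is a String here, so p(2) is a
//                                             // character lookup and can go out of range

val splitCols = lines.map(_.split("\\t"))      // RDD[Array[String]]
val rows = splitCols.map(p => Row(p(0), p(1), p(2)))  // p is an Array[String], as intended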


Thanks!

-Terry


On Fri, Aug 28, 2015 at 5:08 PM, our...@cnsuning.com our...@cnsuning.com
wrote:

 hi all,

 when using  spark sql ,A problem bothering me.

 the codeing as following:

  *val schemaString =
 visitor_id,cust_num,gds_id,l1_gds_group_cd,l4_gds_group_cd,pc_gds_addcart,pc_gds_collect_num,pc_gds_four_page_pv,pc_gds_four_page_time,pc_gds_four_page_fromsearch_pv,pc_gds_four_page_fromlist_pv,pc_gds_four_page_fromrec_pv,pc_gds_four_page_fromcuxiao_pv,pc_four_page_num,pc_group_gds_addcart,pc_group_gds_collect,pc_group_fourpage_pv,pc_group_fourpage_time,pc_visitor_pv,pc_search_pv,pc_list_pv,pc_is_view1,pc_is_view,pc_view_cycle_days,pc_view_days,wap_gds_addcart,wap_gds_collect_num,wap_gds_four_page_pv,wap_gds_four_page_time,wap_gds_four_page_fromsearch_pv,wap_gds_four_page_fromlist_pv,wap_gds_four_page_fromrec_pv,wap_gds_four_page_fromcuxiao_pv,wap_four_page_num,wap_group_gds_addcart,wap_group_gds_collect,wap_group_fourpage_pv,wap_group_fourpage_time,wap_visitor_pv,wap_search_pv,wap_list_pv,wap_is_view1,wap_is_view,wap_view_cycle_days,wap_view_days,app_gds_addcart,app_gds_collect_num,app_gds_four_page_pv,app_gds_four_page_time,app_gds_four_page_fromsearch_pv,app_gds_four_page_fromlist_pv,app_gds_four_page_fromrec_pv,app_gds_four_page_fromcuxiao_pv,app_four_page_num,app_group_gds_addcart,app_group_gds_collect,app_group_fourpage_pv,app_group_fourpage_time,app_visitor_pv,app_search_pv,app_list_pv,app_is_view1,app_is_view,app_view_cycle_days,app_view_days,gds_score_desc,l4_gds_group_rate_n,decision_cycle_days,decision_days,decision_pv,is_order,statis_date*
 //*schemaString.length=72 *

 *import org.apache.spark.sql.Row;*

 *import org.apache.spark.sql.types.{StructType,StructField,StringType};*

 *val schema =StructType( schemaString.split(,).map(fieldName = 
 StructField(fieldName, StringType, true)))*

 *val 
 rowRDD=sc.textFile(/user/spark/short_model).flatMap(_.split(\\t)).map(p 
 = 
 Row(p(0),p(1),p(2),p(3),p(4),p(5),p(6),p(7),p(8),p(9),p(10),p(11),p(12),p(13),p(14),p(15),p(16),p(17),p(18),p(19),p(20),p(21),p(22),p(23),p(24),p(25),p(26),p(27),p(28),p(29),p(30),p(31),p(32),p(33),p(34),p(35),p(36),p(37),p(38),p(39),p(40),p(41),p(42),p(43),p(44),p(45),p(46),p(47),p(48),p(49),p(50),p(51),p(52),p(53),p(54),p(55),p(56),p(57),p(58),p(59),p(60),p(61),p(62),p(63),p(64),p(65),p(66),p(67),p(68),p(69),p(70),p(71)))*

 *val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)*

 *peopleDataFrame.registerTempTable(alg)*

 *val results = sqlContext.sql(SELECT count(*) FROM alg)*

 *results.collect()*


 the error log as following:

   5/08/28 17:00:54 WARN TaskSetManager: Lost task 56.0 in
 stage 9.0 (TID 71, 10.104.74.8): java.lang.StringIndexOutOfBoundsException:
 String index out of range: 18
 at java.lang.String.charAt(String.java:658)
 at
 scala.collection.immutable.StringOps$.apply$extension(StringOps.scala:39)
 at
 $line415.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:26)
 at
 $line415.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(console:26)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at
 org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:130)
 at
 org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:126)
 at
 org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
 at
 org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:70)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

 15/08/28 17:00:54 INFO TaskSetManager: Starting task 56.1 in stage 9.0
 (TID 72, 10.104.74.8, NODE_LOCAL, 1415 bytes)
 15/08/28 17:00:54 INFO TaskSetManager: Lost task 56.1 in stage 9.0 (TID
 72) on executor 10.104.74.8: java.lang.StringIndexOutOfBoundsException
 (String index out

Re: standalone to connect mysql

2015-07-21 Thread Terry Hole
Jack,

You can refer to the Hive SQL syntax if you use HiveContext:
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML
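For example (a sketch based on what you reported below): the INSERT forms in
that manual are table/query based, so

sqlContext.sql("INSERT INTO TABLE newStu SELECT * FROM otherStu")

works, while a row-literal form like INSERT INTO newStu VALUES ('10', 'aa', 1)
does not appear to be part of that grammar, which matches the parse error you
saw.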

Thanks!
-Terry

 That works! Thanks.

Can I ask you one further question?

 How did spark sql support insertion?



 That is say, if I did:

 sqlContext.sql(insert into newStu values (“10”,”a”,1)



 the error is:

 failure: ``table'' expected but identifier newStu found

 insert into newStu values ('10', aa, 1)



 but if I did:

 sqlContext.sql(sinsert into Table newStu select * from otherStu)

 that works.



 Is there any document addressing that?





 Best regards,

 Jack





 *From:* Terry Hole [mailto:hujie.ea...@gmail.com]
 *Sent:* Tuesday, 21 July 2015 4:17 PM
 *To:* Jack Yang; user@spark.apache.org
 *Subject:* Re: standalone to connect mysql



 Maybe you can try: spark-submit --class sparkwithscala.SqlApp
  --jars /home/lib/mysql-connector-java-5.1.34.jar --master 
 spark://hadoop1:7077
 /home/myjar.jar



 Thanks!

 -Terry

  Hi there,



 I would like to use spark to access the data in mysql. So firstly  I tried
 to run the program using:

 spark-submit --class sparkwithscala.SqlApp --driver-class-path
 /home/lib/mysql-connector-java-5.1.34.jar --master local[4] /home/myjar.jar



 that returns me the correct results. Then I tried the standalone version
 using:

 spark-submit --class sparkwithscala.SqlApp --driver-class-path
 /home/lib/mysql-connector-java-5.1.34.jar --master spark://hadoop1:7077
 /home/myjar.jar

 (the mysql-connector-java-5.1.34.jar i have them on all worker nodes.)

 and the error is:



 Exception in thread main org.apache.spark.SparkException: Job aborted
 due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent
 failure: Lost task 0.3 in stage 0.0 (TID 3, 192.168.157.129):
 java.sql.SQLException: No suitable driver found for
 jdbc:mysql://hadoop1:3306/sparkMysqlDB?user=rootpassword=root



 I also found the similar problem before in
 https://jira.talendforge.org/browse/TBD-2244.



 Is this a bug to be fixed later? Or do I miss anything?







 Best regards,

 Jack






Re: standalone to connect mysql

2015-07-21 Thread Terry Hole
Maybe you can try: spark-submit --class sparkwithscala.SqlApp  --jars
/home/lib/mysql-connector-java-5.1.34.jar --master spark://hadoop1:7077
/home/myjar.jar

Thanks!
-Terry

  Hi there,



 I would like to use spark to access the data in mysql. So firstly  I tried
 to run the program using:

 spark-submit --class sparkwithscala.SqlApp --driver-class-path
 /home/lib/mysql-connector-java-5.1.34.jar --master local[4] /home/myjar.jar



 that returns me the correct results. Then I tried the standalone version
 using:

 spark-submit --class sparkwithscala.SqlApp --driver-class-path
 /home/lib/mysql-connector-java-5.1.34.jar --master spark://hadoop1:7077
 /home/myjar.jar

 (the mysql-connector-java-5.1.34.jar i have them on all worker nodes.)

 and the error is:



 Exception in thread main org.apache.spark.SparkException: Job aborted
 due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent
 failure: Lost task 0.3 in stage 0.0 (TID 3, 192.168.157.129):
 java.sql.SQLException: No suitable driver found for
 jdbc:mysql://hadoop1:3306/sparkMysqlDB?user=rootpassword=root



 I also found the similar problem before in
 https://jira.talendforge.org/browse/TBD-2244.



 Is this a bug to be fixed later? Or do I miss anything?







 Best regards,

 Jack





Re: [Spark Shell] Could the spark shell be reset to the original status?

2015-07-16 Thread Terry Hole
Hi Ted,

Thanks for the information. The post seems a little different from my
requirement: suppose we defined different functions to do different
streaming work (e.g. 50 functions), and I want to test these 50 functions in
the spark shell; the shell will always throw OOM in the middle of the test
(yes, it could be solved by increasing the jvm memory size, but if we had
more functions, the issue would still happen). The main issue is that the
shell keeps track of all the information (classes, objects...) from startup,
so the java memory keeps increasing over time as the functions are
defined/invoked.

Thanks!
- Terry

Ted Yu yuzhih...@gmail.com wrote on Friday, July 17, 2015 at 12:02 PM:

 See this recent thread:


 http://search-hadoop.com/m/q3RTtFW7iMDkrj61/Spark+shell+oom+subj=java+lang+OutOfMemoryError+PermGen+space



 On Jul 16, 2015, at 8:51 PM, Terry Hole hujie.ea...@gmail.com wrote:

 Hi,

 Background: The spark shell will get out of memory error after dealing
 lots of spark work.

 Is there any method which can reset the spark shell to the startup status?
 I tried *:reset*, but it seems not working: i can not create spark
 context anymore (some compile error as below) after the *:reset*. (I
 have to restart the shell after OOM to workaround)

 == Expanded type of tree ==
 TypeRef(TypeSymbol(class $read extends Serializable))
 uncaught exception during compilation: java.lang.AssertionError
 java.lang.AssertionError: assertion failed: Tried to find '$line16' in
 'C:\Users\jhu\AppData\Local\Temp\spark-2ad09490-c0c6-41e2-addb-63087ce0ae63'
 but it is not a directory
 That entry seems to have slain the compiler.  Shall I replayyour session?
 I can re-run each line except the last one.[y/n]
 Abandoning crashed session.

 Thanks!
 -Terry




[Spark Shell] Could the spark shell be reset to the original status?

2015-07-16 Thread Terry Hole
Hi,

Background: The spark shell will get an out of memory error after doing lots
of spark work.

Is there any method which can reset the spark shell to its startup status?
I tried *:reset*, but it does not seem to work: I can not create a spark
context anymore (some compile error as below) after the *:reset*. (I have
to restart the shell after an OOM as a workaround)

== Expanded type of tree ==
TypeRef(TypeSymbol(class $read extends Serializable))
uncaught exception during compilation: java.lang.AssertionError
java.lang.AssertionError: assertion failed: Tried to find '$line16' in
'C:\Users\jhu\AppData\Local\Temp\spark-2ad09490-c0c6-41e2-addb-63087ce0ae63'
but it is not a directory
That entry seems to have slain the compiler.  Shall I replayyour session? I
can re-run each line except the last one.[y/n]
Abandoning crashed session.

Thanks!
-Terry


Re: fileStream with old files

2015-07-15 Thread Terry Hole
Hi, Hunter,

What behavior do you see with HDFS? The local file system and HDFS
should have the same behavior.

Thanks!
- Terry

Hunter Morgan hunter.mor...@rackspace.com wrote on Thursday, July 16, 2015 at 2:04 AM:

  After moving the setting of the parameter to SparkConf initialization
 instead of after the context is already initialized, I have it operating
 reliably on local filesystem, but not on hdfs. Are there any differences in
 behavior between these two cases I should be aware of?



 I don’t usually mailinglist or exchange, so forgive me for my ignorance of
 whether this message will go horribly wrong due to formatting.



 I plan to port the following code to Hadoop FS API to generalize testing
 to understand actual behavior and ensure desired behavior.

 public static JavaDStreamString 
 textFileStreamIncludingExisting(JavaStreamingContext context, String path)
 {
 return context.fileStream(path, LongWritable
 .class, Text.class, TextInputFormat.class, v1 - true, 
 false).map(v1 - v1._2.toString());
 }



 @Test
 public void testTextFileStreamIncludingExistingReadsOldFiles() throws
 Exception
 {
 final Path testDir = Files.createTempDirectory(sparkTest);
 final ArrayListPath tempFiles = new ArrayList();

 // create 20 old files
 final int testFileNumberLimit = 20;
 for (int testFileNumber = 0; testFileNumber  testFileNumberLimit;
 testFileNumber++)
 {
 final Path testFile = Files.createTempFile(testDir, testFile, 
 );
 tempFiles.add(testFile);
 final FileWriter fileWriter = new FileWriter(testFile.toFile());
 fileWriter.write(asdf);
 fileWriter.flush();
 fileWriter.close();
 for (String eachAttribute : new String[]{basic:lastAccessTime,
 basic:lastModifiedTime,
 basic:creationTime})
 { // set file dates 0 to 20 days ago
 Files.setAttribute(testFile, eachAttribute, FileTime.from(
 Instant.now().minus(Duration.ofDays
 (testFileNumber;
 }
 }

 final SparkConf sparkConf = new SparkConf().setMaster(local[1]).
 setAppName(test);
 sparkConf.set(spark.streaming.minRememberDuration, String.valueOf(
 Integer.MAX_VALUE));
 final JavaStreamingContext context = new JavaStreamingContext(
 sparkConf, Durations.seconds(1));
 final JavaDStreamString input = SparkUtil.
 textFileStreamIncludingExisting(context, String.valueOf(testDir
 .toUri()));
 // count files read
 final AccumulatorInteger accumulator = context.sparkContext().
 accumulator(0);

 // setup async wait
 Semaphore done = new Semaphore(1);
 done.acquire();
 input.foreachRDD(new FunctionJavaRDDString, Void()
 {
 @Override
 public Void call(JavaRDDString v1) throws Exception
 {
 if (v1.count() == 0)
 {
 done.release();
 }
 accumulator.add((int) v1.count());
 return null;
 }
 });
 context.start();
 // wait for completion or 20 sec
 done.tryAcquire(20, TimeUnit.SECONDS);
 context.stop();

 assertThat(accumulator.value(), is(testFileNumberLimit));

 for (Path eachTempFile : tempFiles)
 {
 Files.deleteIfExists(eachTempFile);
 }
 Files.deleteIfExists(testDir);
 }





 *From:* Tathagata Das [mailto:t...@databricks.com]
 *Sent:* Wednesday, July 15, 2015 00:01
 *To:* Terry Hole
 *Cc:* Hunter Morgan; user@spark.apache.org


 *Subject:* Re: fileStream with old files



 It was added, but its not documented publicly. I am planning to change the
 name of the conf to spark.streaming.fileStream.minRememberDuration to make
 it easier to understand



 On Mon, Jul 13, 2015 at 9:43 PM, Terry Hole hujie.ea...@gmail.com wrote:

  A new configuration named *spark.streaming.minRememberDuration* was
 added since 1.2.1 to control the file stream input, the default value is *60
 seconds*, you can change this value to a large value to include older
 files (older than 1 minute)



 You can get the detail from this jira:
 https://issues.apache.org/jira/browse/SPARK-3276



 -Terry



 On Tue, Jul 14, 2015 at 4:44 AM, automaticgiant 
 hunter.mor...@rackspace.com wrote:

 It's not as odd as it sounds. I want to ensure that long streaming job
 outages can recover all the files that went into a directory while the job
 was down.
 I've looked at

 http://apache-spark-user-list.1001560.n3.nabble.com/Generating-a-DStream-by-existing-textfiles-td20030.html#a20039
 and

 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-td14306.html#a16435
 and

 https://stackoverflow.com/questions/29022379/spark-streaming-hdfs/29036469#29036469?newreg=e7e25469132d4fbc8350be8f876cf81e
 , but all seem unhelpful.
 I've tested combinations of the following:
  * fileStreams created with dumb accept-all filters
  * newFilesOnly true and false,
  * tweaking minRememberDuration to high and low values,
  * on hdfs or local

Re: fileStream with old files

2015-07-13 Thread Terry Hole
A new configuration named *spark.streaming.minRememberDuration* was added
in 1.2.1 to control the file stream input. The default value is *60
seconds*; you can change it to a larger value to include older files
(older than 1 minute).

You can get the detail from this jira:
https://issues.apache.org/jira/browse/SPARK-3276
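A sketch of the usage (note that the setting needs to go into the SparkConf /
spark-defaults before the StreamingContext is created; the value and paths
below are just examples):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("old-files-stream")
conf.set("spark.streaming.minRememberDuration", "86400")  // example: remember files up to ~1 day old (seconds)

val ssc = new StreamingContext(conf, Seconds(30))
val oldAndNew = ssc.fileStream[LongWritable, Text, TextInputFormat](
    "hdfs:///some/dir", (p: Path) => true, false  // newFilesOnly = false
  ).map(_._2.toString)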

-Terry

On Tue, Jul 14, 2015 at 4:44 AM, automaticgiant hunter.mor...@rackspace.com
 wrote:

 It's not as odd as it sounds. I want to ensure that long streaming job
 outages can recover all the files that went into a directory while the job
 was down.
 I've looked at

 http://apache-spark-user-list.1001560.n3.nabble.com/Generating-a-DStream-by-existing-textfiles-td20030.html#a20039
 and

 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-td14306.html#a16435
 and

 https://stackoverflow.com/questions/29022379/spark-streaming-hdfs/29036469#29036469?newreg=e7e25469132d4fbc8350be8f876cf81e
 , but all seem unhelpful.
 I've tested combinations of the following:
  * fileStreams created with dumb accept-all filters
  * newFilesOnly true and false,
  * tweaking minRememberDuration to high and low values,
  * on hdfs or local directory.
 The problem is that it will not read files in the directory from more than
 a
 minute ago.
 JavaPairInputDStreamLongWritable, Text input = context.fileStream(indir,
 LongWritable.class, Text.class, TextInputFormat.class, v - true, false);
 Also tried with having set:
 context.sparkContext().getConf().set(spark.streaming.minRememberDuration,
 1654564); to big/small.

 Are there known limitations of the onlyNewFiles=false? Am I doing something
 wrong?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/fileStream-with-old-files-tp23802.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: [Spark Hive SQL] Set the hive connection in hive context is broken in spark 1.4.1-rc1?

2015-07-10 Thread Terry Hole
Michael,

Thanks

- Terry

Michael Armbrust mich...@databricks.com于2015年7月11日星期六 04:02写道:

 Metastore configuration should be set in hive-site.xml.

 On Thu, Jul 9, 2015 at 8:59 PM, Terry Hole hujie.ea...@gmail.com wrote:

 Hi,

 I am trying to set the hive metadata destination to a mysql database in
 hive context, it works fine in spark 1.3.1, but it seems broken in spark
 1.4.1-rc1, where it always connect to the default metadata: local), is this
 a regression or we must set the connection in hive-site.xml?

 The code is very simple in spark shell:
* import org.apache.spark.sql.hive._*
 *val hiveContext = new HiveContext(sc)*
 *hiveContext.setConf(javax.jdo.option.ConnectionDriveName,
 com.mysql.jdbc.Driver)*
 *hiveContext.setConf(javax.jdo.option.ConnectionUserName, hive)*
 *hiveContext.setConf(javax.jdo.option.ConnectionPassword, hive)*
 *hiveContext.setConf(javax.jdo.option.ConnectionURL,
 jdbc:mysql://10.111.3.186:3306/hive http://10.111.3.186:3306/hive)*
 *hiveContext.setConf(hive.metastore.warehouse.dir,
 /user/hive/warehouse)*
 *hiveContext.sql(select * from mysqltable).show()*

 *Thanks!*
 *-Terry*





[Spark Hive SQL] Set the hive connection in hive context is broken in spark 1.4.1-rc1?

2015-07-09 Thread Terry Hole
Hi,

I am trying to set the hive metadata destination to a mysql database in
hive context. It works fine in spark 1.3.1, but it seems broken in spark
1.4.1-rc1, where it always connects to the default metadata (local). Is this
a regression, or must we set the connection in hive-site.xml?

The code is very simple in spark shell:
    import org.apache.spark.sql.hive._
    val hiveContext = new HiveContext(sc)
    hiveContext.setConf("javax.jdo.option.ConnectionDriveName", "com.mysql.jdbc.Driver")
    hiveContext.setConf("javax.jdo.option.ConnectionUserName", "hive")
    hiveContext.setConf("javax.jdo.option.ConnectionPassword", "hive")
    hiveContext.setConf("javax.jdo.option.ConnectionURL", "jdbc:mysql://10.111.3.186:3306/hive")
    hiveContext.setConf("hive.metastore.warehouse.dir", "/user/hive/warehouse")
    hiveContext.sql("select * from mysqltable").show()

*Thanks!*
*-Terry*


Re: Is there a way to shutdown the derby in hive context in spark shell?

2015-07-09 Thread Terry Hole
)
at
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.init(HiveMetaStoreClient.java:171)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at
org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1410)
at
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.init(RetryingMetaStoreClient.java:62)
at
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:72)
at
org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:2453)
at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2465)
at
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:340)
at
org.apache.spark.sql.hive.client.ClientWrapper.init(ClientWrapper.scala:116)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at
org.apache.spark.sql.hive.client.IsolatedClientLoader.liftedTree1$1(IsolatedClientLoader.scala:172)
at
org.apache.spark.sql.hive.client.IsolatedClientLoader.init(IsolatedClientLoader.scala:168)
at
org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:213)
at org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:176)
at org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:359)

On Thu, Jul 9, 2015 at 2:32 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Did you try sc.shutdown and creating a new one?

 Thanks
 Best Regards

 On Wed, Jul 8, 2015 at 8:12 PM, Terry Hole hujie.ea...@gmail.com wrote:

 I am using spark 1.4.1rc1 with default hive settings

 Thanks
 - Terry

 Hi All,

 I'd like to use the hive context in spark shell, i need to recreate the
 hive meta database in the same location, so i want to close the derby
 connection previous created in the spark shell, is there any way to do this?

 I try this, but it does not work:

 DriverManager.getConnection(jdbc:derby:;shutdown=true);

 Thanks!

 - Terry






Is there a way to shutdown the derby in hive context in spark shell?

2015-07-08 Thread Terry Hole
Hi All,

I'd like to use the hive context in spark shell. I need to recreate the
hive meta database in the same location, so I want to close the derby
connection previously created in the spark shell. Is there any way to do this?

I tried this, but it does not work:
DriverManager.getConnection("jdbc:derby:;shutdown=true");

Thanks!
- Terry


Re: Is there a way to shutdown the derby in hive context in spark shell?

2015-07-08 Thread Terry Hole
I am using spark 1.4.1rc1 with default hive settings

Thanks
- Terry

Hi All,

I'd like to use the hive context in spark shell, i need to recreate the
hive meta database in the same location, so i want to close the derby
connection previous created in the spark shell, is there any way to do this?

I try this, but it does not work:

DriverManager.getConnection(jdbc:derby:;shutdown=true);

Thanks!

- Terry


Re: Meets class not found error in spark console with newly hive context

2015-07-02 Thread Terry Hole
Found that this is a bug in spark 1.4.0: SPARK-8368
https://issues.apache.org/jira/browse/SPARK-8368

Thanks!
Terry

On Thu, Jul 2, 2015 at 1:20 PM, Terry Hole hujie.ea...@gmail.com wrote:

 All,

 I am using spark console 1.4.0 to do some tests, when a create a newly
 HiveContext (Line 18 in the code) in my test function, it always throw
 exception like below (It works in spark console 1.3.0), but if i removed
 the HiveContext (The line 18 in the code) in my function, it works fine.
 Any idea what's wrong with this?

 java.lang.ClassNotFoundException:
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$
 iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$streamingTest$1$$anonfun$apply$1
 at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at
 java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:348)
 at
 org.apache.spark.util.InnerClosureFinder$$anon$4.visitMethodInsn(Clos
 ureCleaner.scala:455)
 at
 com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown
 Source)
 at
 com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown
 Source)
 at
 org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:101)
 at
 org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:197)
 at
 org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
 at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply$mcV$sp(DStream.scala:630)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:629)
 at
 org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:629)
 at
 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
 at
 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
 at org.apache.spark.SparkContext.withScope(SparkContext.scala:681)
 at
 org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:258)
 at
 org.apache.spark.streaming.dstream.DStream.foreachRDD(DStream.scala:629)
 at
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC
 $$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.streamingTest(console:98)
 at
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC
 $$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:93)
 at
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC
 $$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:98)



  1 import org.apache.spark._
  2 import org.apache.spark.SparkContext._
  3 import org.apache.spark.streaming.{ StreamingContext, Seconds, Minutes, Time }
  4 import org.apache.spark.streaming.StreamingContext._
  5 import org.apache.spark.rdd.RDD
  6 import org.apache.spark.streaming.dstream.DStream
  7 import org.apache.spark.HashPartitioner
  8 import org.apache.spark.storage.StorageLevel
  9 import org.apache.spark.sql._
 10 import org.apache.spark.sql.hive._
 11 import scala.collection.mutable.{Queue}
 12 import scala.concurrent.Future
 13 import scala.concurrent.ExecutionContext.Implicits.global
 14
 15 def streamingTest(args: Array[String]) {
 16   println(" create streamingContext.")
 17   val ssc = new StreamingContext(sc, Seconds(1))
 18   val sqlContext2 = new HiveContext(sc)
 19
 20   val accum = sc.accumulator(0, "End Accumulator")
 21   val queue = scala.collection.mutable.Queue(sc.textFile("G:/pipe/source"))
 22   val textSource = ssc.queueStream(queue, true)
 23   textSource.foreachRDD(rdd => { rdd.foreach( item => {accum += 1} ) })
 24   textSource.foreachRDD(rdd => {
 25     var sample = rdd.take(10)
 26     if (sample.length > 0) {
 27       sample.foreach(item => println("#= " + item))
 28     }
 29   })
 30   println(" Start streaming context.")
 31   ssc.start()
 32   val stopFunc = Future { var isRun = true; var duration = 0; while (isRun) { Thread.sleep(1000); duration += 1; if (accum.value > 0 || duration >= 120) { println("### STOP SSC ###"); ssc.stop(false, true); duration = 0; isRun = false } } }
 33   ssc.awaitTermination()
 34   println(" Streaming context terminated.")
 35 }
 36
 37 streamingTest(null)
 38

 Thanks
 Terry



Meets class not found error in spark console with newly hive context

2015-07-01 Thread Terry Hole
All,

I am using spark console 1.4.0 to do some tests. When I create a new
HiveContext (line 18 in the code) in my test function, it always throws an
exception like the one below (it works in spark console 1.3.0), but if I
remove the HiveContext (line 18 in the code) from my function, it works fine.
Any idea what's wrong with this?

java.lang.ClassNotFoundException:
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$
iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$streamingTest$1$$anonfun$apply$1
at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at
java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at
org.apache.spark.util.InnerClosureFinder$$anon$4.visitMethodInsn(Clos
ureCleaner.scala:455)
at
com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown
Source)
at
com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown
Source)
at
org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:101)
at
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:197)
at
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1891)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply$mcV$sp(DStream.scala:630)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:629)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:629)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
at org.apache.spark.SparkContext.withScope(SparkContext.scala:681)
at
org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:258)
at
org.apache.spark.streaming.dstream.DStream.foreachRDD(DStream.scala:629)
at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC
$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.streamingTest(console:98)
at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC
$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:93)
at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC
$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:98)



 1 import org.apache.spark._
 2 import org.apache.spark.SparkContext._
 3 import org.apache.spark.streaming.{ StreamingContext, Seconds, Minutes, Time }
 4 import org.apache.spark.streaming.StreamingContext._
 5 import org.apache.spark.rdd.RDD
 6 import org.apache.spark.streaming.dstream.DStream
 7 import org.apache.spark.HashPartitioner
 8 import org.apache.spark.storage.StorageLevel
 9 import org.apache.spark.sql._
10 import org.apache.spark.sql.hive._
11 import scala.collection.mutable.{Queue}
12 import scala.concurrent.Future
13 import scala.concurrent.ExecutionContext.Implicits.global
14
15 def streamingTest(args: Array[String]) {
16   println(" create streamingContext.")
17   val ssc = new StreamingContext(sc, Seconds(1))
18   val sqlContext2 = new HiveContext(sc)
19
20   val accum = sc.accumulator(0, "End Accumulator")
21   val queue = scala.collection.mutable.Queue(sc.textFile("G:/pipe/source"))
22   val textSource = ssc.queueStream(queue, true)
23   textSource.foreachRDD(rdd => { rdd.foreach( item => {accum += 1} ) })
24   textSource.foreachRDD(rdd => {
25     var sample = rdd.take(10)
26     if (sample.length > 0) {
27       sample.foreach(item => println("#= " + item))
28     }
29   })
30   println(" Start streaming context.")
31   ssc.start()
32   val stopFunc = Future { var isRun = true; var duration = 0; while (isRun) { Thread.sleep(1000); duration += 1; if (accum.value > 0 || duration >= 120) { println("### STOP SSC ###"); ssc.stop(false, true); duration = 0; isRun = false } } }
33   ssc.awaitTermination()
34   println(" Streaming context terminated.")
35 }
36
37 streamingTest(null)
38

Thanks
Terry


Re: Is it possible to set the akka specify properties (akka.extensions) in spark

2015-05-11 Thread Terry Hole
Hi, Akhil,

I tried this. It did not work. I also tried
SparkConf.set("akka.extensions", "[\"kamon.system.SystemMetrics\", \"kamon.statsd.StatsD\"]"),
and it also did not work.

Thanks

On Mon, May 11, 2015 at 2:56 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Try SparkConf.set(spark.akka.extensions,Whatever), underneath i think
 spark won't ship properties which don't start with spark.* to the executors.

 Thanks
 Best Regards

 On Mon, May 11, 2015 at 8:33 AM, Terry Hole hujie.ea...@gmail.com wrote:

 Hi all,

 I'd like to monitor the akka using kamon, which need to set the
 akka.extension to a list like this in typesafe config format:
   akka {
 extensions = [kamon.system.SystemMetrics, kamon.statsd.StatsD]
   }

 But i can not find a way to do this, i have tried these:
 1. SparkConf.set(akka.extensions, [kamon.system.SystemMetrics,
 kamon.statsd.StatsD])
 2. use application.conf and set it use java option
 -Dconfig.resource=/path/to/conf
 3. Set akka.extensions [kamon.system.SystemMetrics,
 kamon.statsd.StatsD] in spark conf file

 None of these work.

 Do we have others ways to set this?

 Thanks!





Is it possible to set the akka specify properties (akka.extensions) in spark

2015-05-10 Thread Terry Hole
Hi all,

I'd like to monitor the akka using kamon, which need to set the
akka.extension to a list like this in typesafe config format:
  akka {
extensions = [kamon.system.SystemMetrics, kamon.statsd.StatsD]
  }

But i can not find a way to do this, i have tried these:
1. SparkConf.set(akka.extensions, [kamon.system.SystemMetrics,
kamon.statsd.StatsD])
2. use application.conf and set it use java option
-Dconfig.resource=/path/to/conf
3. Set akka.extensions [kamon.system.SystemMetrics,
kamon.statsd.StatsD] in spark conf file

None of these work.

Do we have others ways to set this?

Thanks!


Is it possible to set the akka specify properties (akka.extensions) in spark

2015-05-10 Thread Terry Hole
Hi all,

I'd like to monitor the akka using kamon, which need to set the
akka.extension to a list like this in typesafe config format:

  akka {

extensions = [kamon.system.SystemMetrics, kamon.statsd.StatsD]

  }

But i can not find a way to do this, i have tried these:

1. SparkConf.set(akka.extensions, [kamon.system.SystemMetrics,
kamon.statsd.StatsD])

2. use application.conf and set it use java option
-Dconfig.resource=/path/to/conf

3. Set akka.extensions [kamon.system.SystemMetrics,
kamon.statsd.StatsD] in spark conf file

None of these works.

Do we have others ways to set this?

Thanks!


Is it possible to set the akka specify properties (akka.extensions) in spark

2015-05-07 Thread Terry Hole
Hi all,

I'd like to monitor the akka using kamon, which need to set the
akka.extension to a list like this in typesafe config format:
  akka {
extensions = [kamon.system.SystemMetrics, kamon.statsd.StatsD]
  }

But i can not find a way to do this, i have tried these:
1. SparkConf.set(akka.extensions, [kamon.system.SystemMetrics,
kamon.statsd.StatsD])
2. use application.conf and set it use java option
-Dconfig.resource=/path/to/conf
3. Set akka.extensions [kamon.system.SystemMetrics,
kamon.statsd.StatsD] in spark conf file

None of these works.

Do we have others ways to set this?

Thanks!


Re: spark 1.3.0 strange log message

2015-04-23 Thread Terry Hole
Use this in spark conf: spark.ui.showConsoleProgress=false
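For example, in conf/spark-defaults.conf:

  spark.ui.showConsoleProgress   false

or, as a sketch, programmatically before the SparkContext is created (the app
name here is just a placeholder):

  import org.apache.spark.SparkConf
  val conf = new SparkConf().setAppName("myApp").set("spark.ui.showConsoleProgress", "false")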

Best Regards,

On Fri, Apr 24, 2015 at 11:23 AM, Henry Hung ythu...@winbond.com wrote:

  Dear All,



 When using spark 1.3.0 spark-submit with directing out and err to a log
 file, I saw some strange lines inside that looks like this:

 [Stage 0:(0 + 2)
 / 120]

 [Stage 0:(2 + 2)
 / 120]

 [Stage 0:==  (6 + 2)
 / 120]

 [Stage 0:=  (12 + 2)
 / 120]

 [Stage 0:=  (20 + 2)
 / 120]

 [Stage 0:===(24 + 2)
 / 120]

 [Stage 0:== (32 + 2)
 / 120]

 [Stage 0:===(42 + 2)
 / 120]

 [Stage 0:   (52 + 2)
 / 120]

 [Stage 0:===(59 + 2)
 / 120]

 [Stage 0:===(68 + 2)
 / 120]

 [Stage 0:   (78 + 3)
 / 120]

 [Stage 0:=  (88 + 4)
 / 120]

 [Stage 0:= (100 + 2)
 / 120]

 [Stage 0:==(110 + 2)
 / 120]





 Here is my log4j property:



 # Set everything to be logged to the console

 log4j.rootCategory=WARN, console

 log4j.appender.console=org.apache.log4j.ConsoleAppender

 log4j.appender.console.target=System.err

 log4j.appender.console.layout=org.apache.log4j.PatternLayout

 log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p
 %c{1}: %m%n



 # Settings to quiet third party logs that are too verbose

 log4j.logger.org.eclipse.jetty=WARN

 log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR

 log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO

 log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO





 How can I disable this kind of stage progress message?



 Best regards,

 Henry
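
(A minimal sketch of applying the setting Terry mentions above; it assumes either a programmatic SparkConf or an entry in conf/spark-defaults.conf is acceptable.)

import org.apache.spark.SparkConf

// Disable the console stage-progress bar introduced in Spark 1.3.
val conf = new SparkConf().set("spark.ui.showConsoleProgress", "false")

// Equivalent line in conf/spark-defaults.conf:
//   spark.ui.showConsoleProgress   false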




Fwd: [Spark Streaming] The FileInputDStream newFilesOnly=false does not work in 1.2 since

2015-01-20 Thread Terry Hole
Hi,

I am trying to move from 1.1 to 1.2 and found that newFilesOnly=false
(intended to include old files) does not work anymore. It works great in 1.1;
the problem seems to have been introduced by the last change to this class.

Is this change in the flag's behavior intentional, or is it a regression?

The issue should be caused by this code, from line 157 in FileInputDStream.scala:

val modTimeIgnoreThreshold = math.max(
  initialModTimeIgnoreThreshold,                 // initial threshold based on newFilesOnly setting
  currentTime - durationToRemember.milliseconds  // trailing end of the remember window
)


Regards

- Terry
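
(To make the reported behavior concrete, a small illustrative sketch of the threshold arithmetic quoted above; the timestamps are made up.)

// With newFilesOnly = false the initial threshold is effectively 0, but
// math.max lets the remember-window term dominate, so files modified before
// (currentTime - durationToRemember) are still ignored.
val initialModTimeIgnoreThreshold = 0L   // newFilesOnly = false
val currentTime = 100000L                // hypothetical clock value (ms)
val durationToRememberMs = 60000L        // hypothetical remember window (ms)
val modTimeIgnoreThreshold =
  math.max(initialModTimeIgnoreThreshold, currentTime - durationToRememberMs)
// modTimeIgnoreThreshold == 40000, so older files fall below it and are skipped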


Re: Unable to use HiveContext in spark-shell

2014-11-06 Thread Terry Siu
What version of Spark are you using? Did you compile your Spark version
and if so, what compile options did you use?

On 11/6/14, 9:22 AM, tridib tridib.sama...@live.com wrote:

Help please!



--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-use-HiveCont
ext-in-spark-shell-tp18261p18280.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Unable to use HiveContext in spark-shell

2014-11-06 Thread Terry Siu
Those are the same options I used, except I had --tgz to package it and I built
off of the master branch. Unfortunately, my only guess is that these errors
stem from your build environment. In your Spark assembly, do you have any
classes which belong to the org.apache.hadoop.hive package?
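
(A hedged way to answer that question from a Scala REPL; the assembly jar path below is a placeholder.)

import java.util.jar.JarFile
import scala.collection.JavaConverters._

// Check whether any org.apache.hadoop.hive classes made it into the assembly.
val assembly = new JarFile("/path/to/spark-assembly-1.1.0-hadoop2.4.0.jar")
val hasHive = assembly.entries().asScala
  .exists(_.getName.startsWith("org/apache/hadoop/hive/"))
println("Hive classes present: " + hasHive)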


From: Tridib Samanta tridib.sama...@live.com
Date: Thursday, November 6, 2014 at 9:49 AM
To: Terry Siu terry@smartfocus.com, u...@spark.incubator.apache.org
Subject: RE: Unable to use HiveContext in spark-shell

I am using spark 1.1.0.
I built it using:
./make-distribution.sh -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive 
-DskipTests

My ultimate goal is to execute a query on a parquet file with a nested structure
and cast a date string to Date. This is required to calculate the age of the Person
entity, but I am even unable to get past this line:
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
I made sure that org.apache.hadoop package is in the spark assembly jar.

Re-attaching the stack trace for quick reference.

scala> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

error: bad symbolic reference. A signature in HiveContext.class refers to term 
hive
in package org.apache.hadoop which is not available.
It may be completely missing from the current classpath, or the version on
the classpath might be incompatible with the version used when compiling 
HiveContext.class.
error:
 while compiling: console
during phase: erasure
 library version: version 2.10.4
compiler version: version 2.10.4
  reconstructed args:

  last tree to typer: Apply(value $outer)
  symbol: value $outer (flags: method synthetic stable 
expandedname triedcooking)
   symbol definition: val $outer(): $iwC.$iwC.type
 tpe: $iwC.$iwC.type
   symbol owners: value $outer - class $iwC - class $iwC - class $iwC - 
class $read - package $line5
  context owners: class $iwC - class $iwC - class $iwC - class $iwC - 
class $read - package $line5

== Enclosing template or block ==

ClassDef( // class $iwC extends Serializable
  0
  $iwC
  []
  Template( // val local $iwC: notype, tree.tpe=$iwC
java.lang.Object, scala.Serializable // parents
ValDef(
  private
  _
  tpt
  empty
)
// 5 statements
DefDef( // def init(arg$outer: $iwC.$iwC.$iwC.type): $iwC
  method triedcooking
  init
  []
  // 1 parameter list
  ValDef( // $outer: $iwC.$iwC.$iwC.type

$outer
tpt // tree.tpe=$iwC.$iwC.$iwC.type
empty
  )
  tpt // tree.tpe=$iwC
  Block( // tree.tpe=Unit
Apply( // def init(): Object in class Object, tree.tpe=Object
  $iwC.super.init // def init(): Object in class Object, 
tree.tpe=()Object
  Nil
)
()
  )
)
ValDef( // private[this] val sqlContext: 
org.apache.spark.sql.hive.HiveContext
  private local triedcooking
  sqlContext 
  tpt // tree.tpe=org.apache.spark.sql.hive.HiveContext
  Apply( // def init(sc: org.apache.spark.SparkContext): 
org.apache.spark.sql.hive.HiveContext in class HiveContext, 
tree.tpe=org.apache.spark.sql.hive.HiveContext
new org.apache.spark.sql.hive.HiveContext.init // def init(sc: 
org.apache.spark.SparkContext): org.apache.spark.sql.hive.HiveContext in class 
HiveContext, tree.tpe=(sc: 
org.apache.spark.SparkContext)org.apache.spark.sql.hive.HiveContext
Apply( // val sc(): org.apache.spark.SparkContext, 
tree.tpe=org.apache.spark.SparkContext
  
$iwC.this.$line5$$read$$iwC$$iwC$$iwC$$iwC$$$outer().$line5$$read$$iwC$$iwC$$iwC$$$outer().$line5$$read$$iwC$$iwC$$$outer().$VAL1().$iw().$iw().sc
 // val sc(): org.apache.spark.SparkContext, 
tree.tpe=()org.apache.spark.SparkContext
  Nil
)
  )
)
DefDef( // val sqlContext(): org.apache.spark.sql.hive.HiveContext
  method stable accessor
  sqlContext
  []
  List(Nil)
  tpt // tree.tpe=org.apache.spark.sql.hive.HiveContext
  $iwC.this.sqlContext  // private[this] val sqlContext: 
org.apache.spark.sql.hive.HiveContext, 
tree.tpe=org.apache.spark.sql.hive.HiveContext
)
ValDef( // protected val $outer: $iwC.$iwC.$iwC.type
  protected synthetic paramaccessor triedcooking
  $outer 
  tpt // tree.tpe=$iwC.$iwC.$iwC.type
  empty
)
DefDef( // val $outer(): $iwC.$iwC.$iwC.type
  method synthetic stable expandedname triedcooking
  $line5$$read$$iwC$$iwC$$iwC$$iwC$$$outer
  []
  List(Nil)
  tpt // tree.tpe=Any
  $iwC.this.$outer  // protected val $outer: $iwC.$iwC.$iwC.type, 
tree.tpe=$iwC.$iwC.$iwC.type
)
  )
)

== Expanded type of tree ==

ThisType(class $iwC)

uncaught exception during compilation: scala.reflect.internal.Types$TypeError
scala.reflect.internal.Types

NoClassDefFoundError encountered in Spark 1.2-snapshot build with hive-0.13.1 profile

2014-11-03 Thread Terry Siu
)

at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Caused by: java.lang.ClassNotFoundException: 
com.esotericsoftware.shaded.org.objenesis.strategy.InstantiatorStrategy

at java.net.URLClassLoader$1.run(URLClassLoader.java:366)

at java.net.URLClassLoader$1.run(URLClassLoader.java:355)

at java.security.AccessController.doPrivileged(Native Method)

at java.net.URLClassLoader.findClass(URLClassLoader.java:354)

at java.lang.ClassLoader.loadClass(ClassLoader.java:425)

at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)

at java.lang.ClassLoader.loadClass(ClassLoader.java:358)

... 63 more


This missing class does exist, but it resides in the package 
org.objenesis.strategy in the assembly jar. I was hoping to see if the 
-Phive-0.13.1 profile would result in better compatibility as we are using 
CDH5.2 with Hive 0.13.

I generated another build with only -Phive this time and when I issued the same 
command above, it completed without any errors.

I’m wondering what sort of benefit there is to include the -Phive-0.13.1 
profile into the build as it looks like there’s some shaded jar action going on.

Thanks,
-Terry
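
(A hedged diagnostic sketch for the question above: from spark-shell, check which variant of the Objenesis strategy class is actually loadable. The class names come from the stack trace and the assembly inspection described in this thread.)

// Returns true if the named class can be loaded on the driver classpath.
def present(name: String): Boolean =
  try { Class.forName(name); true }
  catch { case _: Throwable => false }

println(present("com.esotericsoftware.shaded.org.objenesis.strategy.InstantiatorStrategy"))
println(present("org.objenesis.strategy.InstantiatorStrategy"))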




ParquetFilters and StringType support for GT, GTE, LT, LTE

2014-11-03 Thread Terry Siu
Is there any reason why StringType is not a supported type for the GT, GTE, LT, LTE 
operations? I was able to previously have a predicate where my column type was 
a string and execute a filter with one of the above operators in SparkSQL w/o 
any problems. However, I synced up to the latest code this morning and now the 
same query will give me a MatchError for this column of string type.

Thanks,
-Terry
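
(A hedged sketch of the kind of query described above that now fails with a MatchError; the table and column names are made up, and it assumes a SQLContext named sqlContext with the Parquet data registered as a table.)

// A range predicate on a string column; this used to be pushed down by
// ParquetFilters without problems.
val rows = sqlContext.sql(
  "SELECT name FROM some_parquet_table WHERE name >= 'A' AND name < 'N'")
rows.collect().foreach(println)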




Re: NoClassDefFoundError encountered in Spark 1.2-snapshot build with hive-0.13.1 profile

2014-11-03 Thread Terry Siu
Thanks, Kousuke. I’ll wait till this pull request makes it into the master 
branch.

-Terry

From: Kousuke Saruta saru...@oss.nttdata.co.jp
Date: Monday, November 3, 2014 at 11:11 AM
To: Terry Siu terry@smartfocus.com, user@spark.apache.org
Subject: Re: NoClassDefFoundError encountered in Spark 1.2-snapshot build with 
hive-0.13.1 profile

Hi Terry

I think the issue you mentioned will be resolved by following PR.
https://github.com/apache/spark/pull/3072

- Kousuke

(2014/11/03 10:42), Terry Siu wrote:
I just built the 1.2 snapshot current as of commit 76386e1a23c using:

$ ./make-distribution.sh --tgz --name my-spark --skip-java-test -DskipTests -Phadoop-2.4 -Phive -Phive-0.13.1 -Pyarn

I drop my Hive configuration files into the conf directory, launch
spark-shell, and then create my HiveContext, hc. I then issue a "use db"
command:

scala> hc.hql("use db")

and receive the following class-not-found error:


java.lang.NoClassDefFoundError: 
com/esotericsoftware/shaded/org/objenesis/strategy/InstantiatorStrategy

at org.apache.hadoop.hive.ql.exec.Utilities.clinit(Utilities.java:925)

at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1224)

at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1088)

at org.apache.hadoop.hive.ql.Driver.run(Driver.java:911)

at org.apache.hadoop.hive.ql.Driver.run(Driver.java:901)

at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:315)

at 
org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:286)

at 
org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult$lzycompute(NativeCommand.scala:35)

at 
org.apache.spark.sql.hive.execution.NativeCommand.sideEffectResult(NativeCommand.scala:35)

at 
org.apache.spark.sql.execution.Command$class.execute(commands.scala:46)

at 
org.apache.spark.sql.hive.execution.NativeCommand.execute(NativeCommand.scala:30)

at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:424)

at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:424)

at 
org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)

at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:103)

at org.apache.spark.sql.hive.HiveContext.hiveql(HiveContext.scala:111)

at org.apache.spark.sql.hive.HiveContext.hql(HiveContext.scala:115)

at 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:31)

at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:36)

at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:38)

at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:40)

at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:42)

at $iwC$$iwC$$iwC$$iwC$$iwC.init(console:44)

at $iwC$$iwC$$iwC$$iwC.init(console:46)

at $iwC$$iwC$$iwC.init(console:48)

at $iwC$$iwC.init(console:50)

at $iwC.init(console:52)

at init(console:54)

at .init(console:58)

at .clinit(console)

at .init(console:7)

at .clinit(console)

at $print(console)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)

at 
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)

at 
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125

at 
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)

at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)

at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)

at 
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828)

at 
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:8

at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785)

at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628)

at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636)

at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)

at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968)

at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scal

at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scal

at 
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)

at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916

Re: ParquetFilters and StringType support for GT, GTE, LT, LTE

2014-11-03 Thread Terry Siu
Done.

https://issues.apache.org/jira/browse/SPARK-4213

Thanks,
-Terry

From: Michael Armbrust mich...@databricks.com
Date: Monday, November 3, 2014 at 1:37 PM
To: Terry Siu terry@smartfocus.com
Cc: user@spark.apache.org
Subject: Re: ParquetFilters and StringType support for GT, GTE, LT, LTE

That sounds like a regression.  Could you open a JIRA with steps to reproduce 
(https://issues.apache.org/jira/browse/SPARK)?  We'll want to fix this before 
the 1.2 release.

On Mon, Nov 3, 2014 at 11:04 AM, Terry Siu terry@smartfocus.com wrote:
Is there any reason why StringType is not a supported type for the GT, GTE, LT, LTE 
operations? I was able to previously have a predicate where my column type was 
a string and execute a filter with one of the above operators in SparkSQL w/o 
any problems. However, I synced up to the latest code this morning and now the 
same query will give me a MatchError for this column of string type.

Thanks,
-Terry





Spark Build

2014-10-31 Thread Terry Siu
I am synced up to the Spark master branch as of commit 23468e7e96. I have Maven 
3.0.5, Scala 2.10.3, and SBT 0.13.1. I’ve built the master branch successfully 
previously and am trying to rebuild again to take advantage of the new Hive 
0.13.1 profile. I execute the following command:

$ mvn -DskipTests -Phive-0.13-1 -Phadoop-2.4 -Pyarn clean package

The build fails at the following stage:


INFO] Using incremental compilation

[INFO] compiler plugin: 
BasicArtifact(org.scalamacros,paradise_2.10.4,2.0.1,null)

[INFO] Compiling 5 Scala sources to 
/home/terrys/Applications/spark/yarn/stable/target/scala-2.10/test-classes...

[ERROR] 
/home/terrys/Applications/spark/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala:20:
 object MemLimitLogger is not a member of package org.apache.spark.deploy.yarn

[ERROR] import org.apache.spark.deploy.yarn.MemLimitLogger._

[ERROR] ^

[ERROR] 
/home/terrys/Applications/spark/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala:29:
 not found: value memLimitExceededLogMessage

[ERROR] val vmemMsg = memLimitExceededLogMessage(diagnostics, 
VMEM_EXCEEDED_PATTERN)

[ERROR]   ^

[ERROR] 
/home/terrys/Applications/spark/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala:30:
 not found: value memLimitExceededLogMessage

[ERROR] val pmemMsg = memLimitExceededLogMessage(diagnostics, 
PMEM_EXCEEDED_PATTERN)

[ERROR]   ^

[ERROR] three errors found

[INFO] 

[INFO] Reactor Summary:

[INFO]

[INFO] Spark Project Parent POM .. SUCCESS [2.758s]

[INFO] Spark Project Common Network Code . SUCCESS [6.716s]

[INFO] Spark Project Core  SUCCESS [2:46.610s]

[INFO] Spark Project Bagel ... SUCCESS [16.776s]

[INFO] Spark Project GraphX .. SUCCESS [52.159s]

[INFO] Spark Project Streaming ... SUCCESS [1:09.883s]

[INFO] Spark Project ML Library .. SUCCESS [1:18.932s]

[INFO] Spark Project Tools ... SUCCESS [10.210s]

[INFO] Spark Project Catalyst  SUCCESS [1:12.499s]

[INFO] Spark Project SQL . SUCCESS [1:10.561s]

[INFO] Spark Project Hive  SUCCESS [1:08.571s]

[INFO] Spark Project REPL  SUCCESS [32.377s]

[INFO] Spark Project YARN Parent POM . SUCCESS [1.317s]

[INFO] Spark Project YARN Stable API . FAILURE [25.918s]

[INFO] Spark Project Assembly  SKIPPED

[INFO] Spark Project External Twitter  SKIPPED

[INFO] Spark Project External Kafka .. SKIPPED

[INFO] Spark Project External Flume Sink . SKIPPED

[INFO] Spark Project External Flume .. SKIPPED

[INFO] Spark Project External ZeroMQ . SKIPPED

[INFO] Spark Project External MQTT ... SKIPPED

[INFO] Spark Project Examples  SKIPPED

[INFO] 

[INFO] BUILD FAILURE

[INFO] 

[INFO] Total time: 11:15.889s

[INFO] Finished at: Fri Oct 31 12:08:55 PDT 2014

[INFO] Final Memory: 73M/829M

[INFO] 

[ERROR] Failed to execute goal 
net.alchim31.maven:scala-maven-plugin:3.2.0:testCompile 
(scala-test-compile-first) on project spark-yarn_2.10: Execution 
scala-test-compile-first of goal 
net.alchim31.maven:scala-maven-plugin:3.2.0:testCompile failed. CompileFailed 
- [Help 1]

[ERROR]

[ERROR] To see the full stack trace of the errors, re-run Maven with the -e 
switch.

[ERROR] Re-run Maven using the -X switch to enable full debug logging.

[ERROR]

[ERROR] For more information about the errors and possible solutions, please 
read the following articles:

[ERROR] [Help 1] 
http://cwiki.apache.org/confluence/display/MAVEN/PluginExecutionException

[ERROR]

[ERROR] After correcting the problems, you can resume the build with the command

[ERROR]   mvn goals -rf :spark-yarn_2.10


I could not find MemLimitLogger anywhere in the Spark code. Has anybody else
seen/encountered this?


Thanks,

-Terry






Re: Spark Build

2014-10-31 Thread Terry Siu
Thanks for the update, Shivaram.

-Terry

On 10/31/14, 12:37 PM, Shivaram Venkataraman
shiva...@eecs.berkeley.edu wrote:

Yeah looks like https://github.com/apache/spark/pull/2744 broke the
build. We will fix it soon

On Fri, Oct 31, 2014 at 12:21 PM, Terry Siu terry@smartfocus.com
wrote:
 I am synced up to the Spark master branch as of commit 23468e7e96. I
have
 Maven 3.0.5, Scala 2.10.3, and SBT 0.13.1. I've built the master branch
 successfully previously and am trying to rebuild again to take
advantage of
 the new Hive 0.13.1 profile. I execute the following command:

 $ mvn -DskipTests -Phive-0.13-1 -Phadoop-2.4 -Pyarn clean package

 The build fails at the following stage:

 INFO] Using incremental compilation

 [INFO] compiler plugin:
 BasicArtifact(org.scalamacros,paradise_2.10.4,2.0.1,null)

 [INFO] Compiling 5 Scala sources to
 
/home/terrys/Applications/spark/yarn/stable/target/scala-2.10/test-classe
s...

 [ERROR]
 
/home/terrys/Applications/spark/yarn/common/src/test/scala/org/apache/spa
rk/deploy/yarn/YarnAllocatorSuite.scala:20:
 object MemLimitLogger is not a member of package
 org.apache.spark.deploy.yarn

 [ERROR] import org.apache.spark.deploy.yarn.MemLimitLogger._

 [ERROR] ^

 [ERROR]
 
/home/terrys/Applications/spark/yarn/common/src/test/scala/org/apache/spa
rk/deploy/yarn/YarnAllocatorSuite.scala:29:
 not found: value memLimitExceededLogMessage

 [ERROR] val vmemMsg = memLimitExceededLogMessage(diagnostics,
 VMEM_EXCEEDED_PATTERN)

 [ERROR]   ^

 [ERROR]
 
/home/terrys/Applications/spark/yarn/common/src/test/scala/org/apache/spa
rk/deploy/yarn/YarnAllocatorSuite.scala:30:
 not found: value memLimitExceededLogMessage

 [ERROR] val pmemMsg = memLimitExceededLogMessage(diagnostics,
 PMEM_EXCEEDED_PATTERN)

 [ERROR]   ^

 [ERROR] three errors found

 [INFO]
 

 [INFO] Reactor Summary:

 [INFO]

 [INFO] Spark Project Parent POM .. SUCCESS
[2.758s]

 [INFO] Spark Project Common Network Code . SUCCESS
[6.716s]

 [INFO] Spark Project Core  SUCCESS
 [2:46.610s]

 [INFO] Spark Project Bagel ... SUCCESS
[16.776s]

 [INFO] Spark Project GraphX .. SUCCESS
[52.159s]

 [INFO] Spark Project Streaming ... SUCCESS
 [1:09.883s]

 [INFO] Spark Project ML Library .. SUCCESS
 [1:18.932s]

 [INFO] Spark Project Tools ... SUCCESS
[10.210s]

 [INFO] Spark Project Catalyst  SUCCESS
 [1:12.499s]

 [INFO] Spark Project SQL . SUCCESS
 [1:10.561s]

 [INFO] Spark Project Hive  SUCCESS
 [1:08.571s]

 [INFO] Spark Project REPL  SUCCESS
[32.377s]

 [INFO] Spark Project YARN Parent POM . SUCCESS
[1.317s]

 [INFO] Spark Project YARN Stable API . FAILURE
[25.918s]

 [INFO] Spark Project Assembly  SKIPPED

 [INFO] Spark Project External Twitter  SKIPPED

 [INFO] Spark Project External Kafka .. SKIPPED

 [INFO] Spark Project External Flume Sink . SKIPPED

 [INFO] Spark Project External Flume .. SKIPPED

 [INFO] Spark Project External ZeroMQ . SKIPPED

 [INFO] Spark Project External MQTT ... SKIPPED

 [INFO] Spark Project Examples  SKIPPED

 [INFO]
 

 [INFO] BUILD FAILURE

 [INFO]
 

 [INFO] Total time: 11:15.889s

 [INFO] Finished at: Fri Oct 31 12:08:55 PDT 2014

 [INFO] Final Memory: 73M/829M

 [INFO]
 

 [ERROR] Failed to execute goal
 net.alchim31.maven:scala-maven-plugin:3.2.0:testCompile
 (scala-test-compile-first) on project spark-yarn_2.10: Execution
 scala-test-compile-first of goal
 net.alchim31.maven:scala-maven-plugin:3.2.0:testCompile failed.
 CompileFailed - [Help 1]

 [ERROR]

 [ERROR] To see the full stack trace of the errors, re-run Maven with
the -e
 switch.

 [ERROR] Re-run Maven using the -X switch to enable full debug logging.

 [ERROR]

 [ERROR] For more information about the errors and possible solutions,
please
 read the following articles:

 [ERROR] [Help 1]
 
http://cwiki.apache.org/confluence/display/MAVEN/PluginExecutionException

 [ERROR]

 [ERROR] After correcting the problems, you can resume the build with the
 command

 [ERROR]   mvn goals -rf :spark-yarn_2.10


 I could not find MemLimitLogger anywhere in the Spark code. Anybody else
 seen/encounter this?


 Thanks,

 -Terry

Re: Ambiguous references to id : what does it mean ?

2014-10-30 Thread Terry Siu
Found this as I am having the same issue. I have exactly the same usage as
shown in Michael's join example. I tried executing a SQL statement against
the join data set with two columns that have the same name and tried to
disambiguate the column name with the table alias, but I would still get an
Unresolved attributes error back. Is there any way around this short of
renaming the columns in the join sources?

Thanks
-Terry


Michael Armbrust wrote
Yes, but if both tagCollection and selectedVideos have a column named id
then Spark SQL does not know which one you are referring to in the where
clause.  Here's an example with aliases:
 val x = testData2.as('x)
 val y = testData2.as('y)
 val join = x.join(y, Inner, Some(x.a.attr === y.a.attr))
On Wed, Jul 16, 2014 at 2:47 AM, Jaonary Rabarisoa jaonary@ wrote:
My query is just a simple query that uses the Spark SQL DSL:

tagCollection.join(selectedVideos).where('videoId === 'id)




On Tue, Jul 15, 2014 at 6:03 PM, Yin Huai huaiyin.thu@ wrote:

Hi Jao,

Seems the SQL analyzer cannot resolve the references in the Join
condition. What is your query? Did you use the Hive Parser (your query
was
submitted through hql(...)) or the basic SQL Parser (your query was
submitted through sql(...)).

Thanks,

Yin


On Tue, Jul 15, 2014 at 8:52 AM, Jaonary Rabarisoa jaonary@ wrote:

Hi all,

When running a join operation with Spark SQL I got the following error
:


Exception in thread main
org.apache.spark.sql.catalyst.errors.package$TreeNodeException:
Ambiguous
references to id: (id#303,List()),(id#0,List()), tree:
Filter ('videoId = 'id)
  Join Inner, None
   ParquetRelation data/tags.parquet
   Filter (name#1 = P1/cam1)
ParquetRelation data/videos.parquet


What does it mean ?


Cheers,


jao
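
(A hedged sketch applying the alias pattern quoted above to the tables from this thread; it assumes the 1.x catalyst DSL where a qualified "table.column" string can be turned into an attribute with .attr.)

// Alias both sides so the two id columns can be told apart in the join condition.
val tags   = tagCollection.as('t)
val videos = selectedVideos.as('v)
val joined = tags.join(videos, Inner, Some("t.videoId".attr === "v.id".attr))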








Re: SparkSQL - TreeNodeException for unresolved attributes

2014-10-21 Thread Terry Siu
Just to follow up, the queries worked against master and I got my whole flow
rolling. Thanks for the suggestion! Now if only Spark 1.2 would come out with
the next release of CDH5 :P

-Terry

From: Terry Siu terry@smartfocus.com
Date: Monday, October 20, 2014 at 12:22 PM
To: Michael Armbrust mich...@databricks.com
Cc: user@spark.apache.org
Subject: Re: SparkSQL - TreeNodeException for unresolved attributes

Hi Michael,

Thanks again for the reply. Was hoping it was something I was doing wrong in 
1.1.0, but I’ll try master.

Thanks,
-Terry

From: Michael Armbrust mich...@databricks.com
Date: Monday, October 20, 2014 at 12:11 PM
To: Terry Siu terry@smartfocus.com
Cc: user@spark.apache.org
Subject: Re: SparkSQL - TreeNodeException for unresolved attributes

Have you tried this on master?  There were several problems with resolution of 
complex queries that were registered as tables in the 1.1.0 release.

On Mon, Oct 20, 2014 at 10:33 AM, Terry Siu terry@smartfocus.com wrote:
Hi all,

I’m getting a TreeNodeException for unresolved attributes when I do a simple 
select from a schemaRDD generated by a join in Spark 1.1.0. A little background 
first. I am using a HiveContext (against Hive 0.12) to grab two tables, join 
them, and then perform multiple INSERT-SELECT with GROUP BY to write back out 
to a Hive rollup table that has two partitions. This task is an effort to 
simulate the unsupported GROUPING SETS functionality in SparkSQL.
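
(Purely illustrative of the "simulate GROUPING SETS" idea described above: one INSERT ... SELECT ... GROUP BY per grouping set into a partitioned rollup table. All table, column, and partition names here are placeholders, not the ones from the actual job.)

// One statement per grouping set; each writes its own partition.
hc.sql("""
  INSERT OVERWRITE TABLE rollup_tbl PARTITION (grp = 'by_store')
  SELECT storeid, NULL, SUM(sales) FROM rupbrand GROUP BY storeid""")

hc.sql("""
  INSERT OVERWRITE TABLE rollup_tbl PARTITION (grp = 'by_product')
  SELECT NULL, productid, SUM(sales) FROM rupbrand GROUP BY productid""")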

In my first attempt, I got really close using SchemaRDD.groupBy until I
realized that the SchemaRDD.insertInto API does not support partitioned tables yet.
This prompted my second attempt to pass SQL to the HiveContext.sql API
instead.

Here’s a rundown of the commands I executed on the spark-shell:


val hc = new HiveContext(sc)

hc.setConf("spark.sql.hive.convertMetastoreParquet", "true")

hc.setConf("spark.sql.parquet.compression.codec", "snappy")


// For implicit conversions to Expression

val sqlContext = new SQLContext(sc)

import sqlContext._


val segCusts = hc.hql(“select …”)

val segTxns = hc.hql(“select …”)


val sc = segCusts.as('sc)

val st = segTxns.as('st)


// Join the segCusts and segTxns tables

val rup = sc.join(st, Inner,
  Some("sc.segcustomerid".attr === "st.customerid".attr))

rup.registerAsTable(“rupbrand”)



If I do a printSchema on the rup, I get:

root

 |-- segcustomerid: string (nullable = true)

 |-- sales: double (nullable = false)

 |-- tx_count: long (nullable = false)

 |-- storeid: string (nullable = true)

 |-- transdate: long (nullable = true)

 |-- transdate_ts: string (nullable = true)

 |-- transdate_dt: string (nullable = true)

 |-- unitprice: double (nullable = true)

 |-- translineitem: string (nullable = true)

 |-- offerid: string (nullable = true)

 |-- customerid: string (nullable = true)

 |-- customerkey: string (nullable = true)

 |-- sku: string (nullable = true)

 |-- quantity: double (nullable = true)

 |-- returnquantity: double (nullable = true)

 |-- channel: string (nullable = true)

 |-- unitcost: double (nullable = true)

 |-- transid: string (nullable = true)

 |-- productid: string (nullable = true)

 |-- id: string (nullable = true)

 |-- campaign_campaigncost: double (nullable = true)

 |-- campaign_begindate: long (nullable = true)

 |-- campaign_begindate_ts: string (nullable = true)

 |-- campaign_begindate_dt: string (nullable = true)

 |-- campaign_enddate: long (nullable = true)

 |-- campaign_enddate_ts: string (nullable = true)

 |-- campaign_enddate_dt: string (nullable = true)

 |-- campaign_campaigntitle: string (nullable = true)

 |-- campaign_campaignname: string (nullable = true)

 |-- campaign_id: string (nullable = true)

 |-- product_categoryid: string (nullable = true)

 |-- product_company: string (nullable = true)

 |-- product_brandname: string (nullable = true)

 |-- product_vendorid: string (nullable = true)

 |-- product_color: string (nullable = true)

 |-- product_brandid: string (nullable = true)

 |-- product_description: string (nullable = true)

 |-- product_size: string (nullable = true)

 |-- product_subcategoryid: string (nullable = true)

 |-- product_departmentid: string (nullable = true)

 |-- product_productname: string (nullable = true)

 |-- product_categoryname: string (nullable = true)

 |-- product_vendorname: string (nullable = true)

 |-- product_sku: string (nullable = true)

 |-- product_subcategoryname: string (nullable = true)

 |-- product_status: string (nullable = true)

 |-- product_departmentname: string (nullable = true)

 |-- product_style: string (nullable = true)

 |-- product_id: string (nullable = true)

 |-- customer_lastname: string (nullable

Re: SparkSQL IndexOutOfBoundsException when reading from Parquet

2014-10-20 Thread Terry Siu
Hi Yin,

Sorry for the delay. I'll try the code change when I get a chance, but
Michael's initial response did solve my problem. In the meantime, I'm hitting
another issue with SparkSQL, about which I will probably post another message
if I can't figure out a workaround.

Thanks,
-Terry

From: Yin Huai huaiyin@gmail.com
Date: Thursday, October 16, 2014 at 7:08 AM
To: Terry Siu terry@smartfocus.com
Cc: Michael Armbrust mich...@databricks.com, user@spark.apache.org
Subject: Re: SparkSQL IndexOutOfBoundsException when reading from Parquet

Hello Terry,

I guess you hit this bug: https://issues.apache.org/jira/browse/SPARK-3559. The
list of needed column ids was messed up. Can you try the master branch, or apply
the code change
https://github.com/apache/spark/commit/e10d71e7e58bf2ec0f1942cb2f0602396ab866b4
to your 1.1, and see if the problem is resolved?

Thanks,

Yin

On Wed, Oct 15, 2014 at 12:08 PM, Terry Siu 
terry@smartfocus.commailto:terry@smartfocus.com wrote:
Hi Yin,

pqt_rdt_snappy has 76 columns. These two parquet tables were created via Hive 
0.12 from existing Avro data using CREATE TABLE followed by an INSERT 
OVERWRITE. These are partitioned tables - pqt_rdt_snappy has one partition 
while pqt_segcust_snappy has two partitions. For pqt_segcust_snappy, I noticed 
that when I populated it with a single INSERT OVERWRITE over all the partitions 
and then executed the Spark code, it would report an illegal index value of 29. 
 However, if I manually did INSERT OVERWRITE for every single partition, I 
would get an illegal index value of 21. I don’t know if this will help in 
debugging, but here’s the DESCRIBE output for pqt_segcust_snappy:


OK

col_namedata_type   comment

customer_id string  from deserializer

age_range   string  from deserializer

gender  string  from deserializer

last_tx_datebigint  from deserializer

last_tx_date_ts string  from deserializer

last_tx_date_dt string  from deserializer

first_tx_date   bigint  from deserializer

first_tx_date_tsstring  from deserializer

first_tx_date_dtstring  from deserializer

second_tx_date  bigint  from deserializer

second_tx_date_ts   string  from deserializer

second_tx_date_dt   string  from deserializer

third_tx_date   bigint  from deserializer

third_tx_date_tsstring  from deserializer

third_tx_date_dtstring  from deserializer

frequency   double  from deserializer

tx_size double  from deserializer

recency double  from deserializer

rfm double  from deserializer

tx_countbigint  from deserializer

sales   double  from deserializer

coll_def_id string  None

seg_def_id  string  None



# Partition Information

# col_name  data_type   comment



coll_def_id string  None

seg_def_id  string  None

Time taken: 0.788 seconds, Fetched: 29 row(s)


As you can see, I have 21 data columns, followed by the 2 partition columns, 
coll_def_id and seg_def_id. Output shows 29 rows, but that looks like it’s just 
counting the rows in the console output. Let me know if you need more 
information.


Thanks

-Terry


From: Yin Huai huaiyin@gmail.com
Date: Tuesday, October 14, 2014 at 6:29 PM
To: Terry Siu terry@smartfocus.com
Cc: Michael Armbrust mich...@databricks.com, user@spark.apache.org

Subject: Re: SparkSQL IndexOutOfBoundsException when reading from Parquet

Hello Terry,

How many columns does pqt_rdt_snappy have?

Thanks,

Yin

On Tue, Oct 14, 2014 at 11:52 AM, Terry Siu terry@smartfocus.com wrote:
Hi Michael,

That worked for me. At least I’m now further than I was. Thanks for the tip!

-Terry

From: Michael Armbrust mich...@databricks.com
Date: Monday, October 13, 2014 at 5:05 PM
To: Terry Siu terry@smartfocus.com
Cc: user@spark.apache.org
Subject: Re: SparkSQL IndexOutOfBoundsException when reading from Parquet

SparkSQL - TreeNodeException for unresolved attributes

2014-10-20 Thread Terry Siu
:


hc.sql(“select transid from rupbrand”)


and I get:


scala> hc.sql("select transid from rupbrand")

14/10/20 10:01:44 INFO ParseDriver: Parsing command: select transid from 
rupbrand

14/10/20 10:01:44 INFO ParseDriver: Parse Completed

res18: org.apache.spark.sql.SchemaRDD =

SchemaRDD[121] at RDD at SchemaRDD.scala:103

== Query Plan ==

== Physical Plan ==

org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved 
attributes: 'transid, tree:

Project ['transid]

 LowerCaseSchema

  Subquery rupbrand

   Join Inner, Some(('sc.segcustomerid = 'st.customerid))

Subquery sc

 Filter CAST((COUNT(DISTINCT 't.transid)  0), BooleanType)

  Aggregate ['c.customerid], ['c.customerId AS 
segcustomerid#5,SUM('t.sales) AS sales#6,COUNT(DISTINCT 't.transid) AS 
tx_count#7]

   Filter 'c.gender IN (Male)

Join Inner, Some(('c.customerid = 't.customerid))

 Subquery t

  Aggregate [customerid#3259,transid#3266], ['d.customerId AS 
customerid#1,transid#3266 AS transid#2,SUM((quantity#3262 * …


I'm wondering if the query for the column in my join table is somehow
conflicting with the columns of the two tables from which the join table is
constructed, since in the plan I see a breakdown of various pieces from the
queries on my two source tables.


Help?


Thanks,

-Terry




Re: SparkSQL - TreeNodeException for unresolved attributes

2014-10-20 Thread Terry Siu
Hi Michael,

Thanks again for the reply. Was hoping it was something I was doing wrong in 
1.1.0, but I’ll try master.

Thanks,
-Terry

From: Michael Armbrust mich...@databricks.com
Date: Monday, October 20, 2014 at 12:11 PM
To: Terry Siu terry@smartfocus.com
Cc: user@spark.apache.org
Subject: Re: SparkSQL - TreeNodeException for unresolved attributes

Have you tried this on master?  There were several problems with resolution of 
complex queries that were registered as tables in the 1.1.0 release.

On Mon, Oct 20, 2014 at 10:33 AM, Terry Siu terry@smartfocus.com wrote:
Hi all,

I’m getting a TreeNodeException for unresolved attributes when I do a simple 
select from a schemaRDD generated by a join in Spark 1.1.0. A little background 
first. I am using a HiveContext (against Hive 0.12) to grab two tables, join 
them, and then perform multiple INSERT-SELECT with GROUP BY to write back out 
to a Hive rollup table that has two partitions. This task is an effort to 
simulate the unsupported GROUPING SETS functionality in SparkSQL.

In my first attempt, I got really close using SchemaRDD.groupBy until I
realized that the SchemaRDD.insertInto API does not support partitioned tables yet.
This prompted my second attempt to pass SQL to the HiveContext.sql API
instead.

Here’s a rundown of the commands I executed on the spark-shell:


val hc = new HiveContext(sc)

hc.setConf("spark.sql.hive.convertMetastoreParquet", "true")

hc.setConf("spark.sql.parquet.compression.codec", "snappy")


// For implicit conversions to Expression

val sqlContext = new SQLContext(sc)

import sqlContext._


val segCusts = hc.hql(“select …”)

val segTxns = hc.hql(“select …”)


val sc = segCusts.as('sc)

val st = segTxns.as('st)


// Join the segCusts and segTxns tables

val rup = sc.join(st, Inner,
  Some("sc.segcustomerid".attr === "st.customerid".attr))

rup.registerAsTable(“rupbrand”)



If I do a printSchema on the rup, I get:

root

 |-- segcustomerid: string (nullable = true)

 |-- sales: double (nullable = false)

 |-- tx_count: long (nullable = false)

 |-- storeid: string (nullable = true)

 |-- transdate: long (nullable = true)

 |-- transdate_ts: string (nullable = true)

 |-- transdate_dt: string (nullable = true)

 |-- unitprice: double (nullable = true)

 |-- translineitem: string (nullable = true)

 |-- offerid: string (nullable = true)

 |-- customerid: string (nullable = true)

 |-- customerkey: string (nullable = true)

 |-- sku: string (nullable = true)

 |-- quantity: double (nullable = true)

 |-- returnquantity: double (nullable = true)

 |-- channel: string (nullable = true)

 |-- unitcost: double (nullable = true)

 |-- transid: string (nullable = true)

 |-- productid: string (nullable = true)

 |-- id: string (nullable = true)

 |-- campaign_campaigncost: double (nullable = true)

 |-- campaign_begindate: long (nullable = true)

 |-- campaign_begindate_ts: string (nullable = true)

 |-- campaign_begindate_dt: string (nullable = true)

 |-- campaign_enddate: long (nullable = true)

 |-- campaign_enddate_ts: string (nullable = true)

 |-- campaign_enddate_dt: string (nullable = true)

 |-- campaign_campaigntitle: string (nullable = true)

 |-- campaign_campaignname: string (nullable = true)

 |-- campaign_id: string (nullable = true)

 |-- product_categoryid: string (nullable = true)

 |-- product_company: string (nullable = true)

 |-- product_brandname: string (nullable = true)

 |-- product_vendorid: string (nullable = true)

 |-- product_color: string (nullable = true)

 |-- product_brandid: string (nullable = true)

 |-- product_description: string (nullable = true)

 |-- product_size: string (nullable = true)

 |-- product_subcategoryid: string (nullable = true)

 |-- product_departmentid: string (nullable = true)

 |-- product_productname: string (nullable = true)

 |-- product_categoryname: string (nullable = true)

 |-- product_vendorname: string (nullable = true)

 |-- product_sku: string (nullable = true)

 |-- product_subcategoryname: string (nullable = true)

 |-- product_status: string (nullable = true)

 |-- product_departmentname: string (nullable = true)

 |-- product_style: string (nullable = true)

 |-- product_id: string (nullable = true)

 |-- customer_lastname: string (nullable = true)

 |-- customer_familystatus: string (nullable = true)

 |-- customer_customertype: string (nullable = true)

 |-- customer_city: string (nullable = true)

 |-- customer_country: string (nullable = true)

 |-- customer_state: string (nullable = true)

 |-- customer_region: string (nullable = true)

 |-- customer_customergroup: string (nullable = true)

 |-- customer_maritalstatus: string (nullable = true)

 |-- customer_agerange: string (nullable = true)

 |-- customer_zip: string (nullable = true)

 |-- customer_age: double (nullable = true

Re: SparkSQL IndexOutOfBoundsException when reading from Parquet

2014-10-15 Thread Terry Siu
Hi Yin,

pqt_rdt_snappy has 76 columns. These two parquet tables were created via Hive 
0.12 from existing Avro data using CREATE TABLE followed by an INSERT 
OVERWRITE. These are partitioned tables - pqt_rdt_snappy has one partition 
while pqt_segcust_snappy has two partitions. For pqt_segcust_snappy, I noticed 
that when I populated it with a single INSERT OVERWRITE over all the partitions 
and then executed the Spark code, it would report an illegal index value of 29. 
 However, if I manually did INSERT OVERWRITE for every single partition, I 
would get an illegal index value of 21. I don’t know if this will help in 
debugging, but here’s the DESCRIBE output for pqt_segcust_snappy:


OK

col_namedata_type   comment

customer_id string  from deserializer

age_range   string  from deserializer

gender  string  from deserializer

last_tx_datebigint  from deserializer

last_tx_date_ts string  from deserializer

last_tx_date_dt string  from deserializer

first_tx_date   bigint  from deserializer

first_tx_date_tsstring  from deserializer

first_tx_date_dtstring  from deserializer

second_tx_date  bigint  from deserializer

second_tx_date_ts   string  from deserializer

second_tx_date_dt   string  from deserializer

third_tx_date   bigint  from deserializer

third_tx_date_tsstring  from deserializer

third_tx_date_dtstring  from deserializer

frequency   double  from deserializer

tx_size double  from deserializer

recency double  from deserializer

rfm double  from deserializer

tx_countbigint  from deserializer

sales   double  from deserializer

coll_def_id string  None

seg_def_id  string  None



# Partition Information

# col_name  data_type   comment



coll_def_id string  None

seg_def_id  string  None

Time taken: 0.788 seconds, Fetched: 29 row(s)


As you can see, I have 21 data columns, followed by the 2 partition columns, 
coll_def_id and seg_def_id. Output shows 29 rows, but that looks like it’s just 
counting the rows in the console output. Let me know if you need more 
information.


Thanks

-Terry


From: Yin Huai huaiyin@gmail.com
Date: Tuesday, October 14, 2014 at 6:29 PM
To: Terry Siu terry@smartfocus.com
Cc: Michael Armbrust mich...@databricks.com, user@spark.apache.org
Subject: Re: SparkSQL IndexOutOfBoundsException when reading from Parquet

Hello Terry,

How many columns does pqt_rdt_snappy have?

Thanks,

Yin

On Tue, Oct 14, 2014 at 11:52 AM, Terry Siu terry@smartfocus.com wrote:
Hi Michael,

That worked for me. At least I’m now further than I was. Thanks for the tip!

-Terry

From: Michael Armbrust mich...@databricks.com
Date: Monday, October 13, 2014 at 5:05 PM
To: Terry Siu terry@smartfocus.com
Cc: user@spark.apache.org
Subject: Re: SparkSQL IndexOutOfBoundsException when reading from Parquet

There are some known bugs with the parquet serde and Spark 1.1.

You can try setting spark.sql.hive.convertMetastoreParquet=true to cause Spark
SQL to use the built-in parquet support when the serde looks like parquet.

On Mon, Oct 13, 2014 at 2:57 PM, Terry Siu terry@smartfocus.com wrote:
I am currently using Spark 1.1.0 that has been compiled against Hadoop 2.3. Our 
cluster is CDH5.1.2, which runs Hive 0.12. I have two external Hive tables 
that point to Parquet (compressed with Snappy), which were converted over from 
Avro if that matters.

I am trying to perform a join with these two Hive tables, but am encountering 
an exception. In a nutshell, I launch a spark shell, create my HiveContext 
(pointing to the correct metastore on our cluster), and then proceed to do the 
following:

scala> val hc = new HiveContext(sc)

scala> val txn = hc.sql("select * from pqt_rdt_snappy where transdate >= 132537600 and transdate <= 134006399")

scala> val segcust = hc.sql("select * from pqt_segcust_snappy where coll_def_id = 'abcd'")

scala> txn.registerAsTable("segTxns")

scala> segcust.registerAsTable("segCusts

Re: SparkSQL IndexOutOfBoundsException when reading from Parquet

2014-10-14 Thread Terry Siu
Hi Michael,

That worked for me. At least I’m now further than I was. Thanks for the tip!

-Terry

From: Michael Armbrust mich...@databricks.com
Date: Monday, October 13, 2014 at 5:05 PM
To: Terry Siu terry@smartfocus.com
Cc: user@spark.apache.org
Subject: Re: SparkSQL IndexOutOfBoundsException when reading from Parquet

There are some known bugs with the parquet serde and Spark 1.1.

You can try setting spark.sql.hive.convertMetastoreParquet=true to cause Spark
SQL to use the built-in parquet support when the serde looks like parquet.

On Mon, Oct 13, 2014 at 2:57 PM, Terry Siu terry@smartfocus.com wrote:
I am currently using Spark 1.1.0 that has been compiled against Hadoop 2.3. Our 
cluster is CDH5.1.2, which runs Hive 0.12. I have two external Hive tables 
that point to Parquet (compressed with Snappy), which were converted over from 
Avro if that matters.

I am trying to perform a join with these two Hive tables, but am encountering 
an exception. In a nutshell, I launch a spark shell, create my HiveContext 
(pointing to the correct metastore on our cluster), and then proceed to do the 
following:

scala> val hc = new HiveContext(sc)

scala> val txn = hc.sql("select * from pqt_rdt_snappy where transdate >= 132537600 and transdate <= 134006399")

scala> val segcust = hc.sql("select * from pqt_segcust_snappy where coll_def_id = 'abcd'")

scala> txn.registerAsTable("segTxns")

scala> segcust.registerAsTable("segCusts")

scala> val joined = hc.sql("select t.transid, c.customer_id from segTxns t join segCusts c on t.customerid = c.customer_id")

Straightforward enough, but I get the following exception:


14/10/13 14:37:12 ERROR Executor: Exception in task 1.0 in stage 18.0 (TID 51)

java.lang.IndexOutOfBoundsException: Index: 21, Size: 21

at java.util.ArrayList.rangeCheck(ArrayList.java:635)

at java.util.ArrayList.get(ArrayList.java:411)

at 
org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport.init(DataWritableReadSupport.java:94)

at 
org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.getSplit(ParquetRecordReaderWrapper.java:206)

at 
org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.init(ParquetRecordReaderWrapper.java:81)

at 
org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.init(ParquetRecordReaderWrapper.java:67)

at 
org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.getRecordReader(MapredParquetInputFormat.java:51)

at org.apache.spark.rdd.HadoopRDD$$anon$1.init(HadoopRDD.scala:197)

at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:188)

at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:97)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)

at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)

at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)

at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)

at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)

at org.apache.spark.scheduler.Task.run(Task.scala:54)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)

at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)


My table, pqt_segcust_snappy, has 21 columns and two 
partitions defined. Does this error look familiar to anyone? Could my usage of 
SparkSQL with Hive be incorrect or is support with Hive/Parquet/partitioning 
still buggy at this point in Spark 1.1.0?


Thanks,

-Terry





SparkSQL IndexOutOfBoundsException when reading from Parquet

2014-10-13 Thread Terry Siu
I am currently using Spark 1.1.0 that has been compiled against Hadoop 2.3. Our 
cluster is CDH5.1.2, which runs Hive 0.12. I have two external Hive tables 
that point to Parquet (compressed with Snappy), which were converted over from 
Avro if that matters.

I am trying to perform a join with these two Hive tables, but am encountering 
an exception. In a nutshell, I launch a spark shell, create my HiveContext 
(pointing to the correct metastore on our cluster), and then proceed to do the 
following:

scala> val hc = new HiveContext(sc)

scala> val txn = hc.sql("select * from pqt_rdt_snappy where transdate >= 132537600 and transdate <= 134006399")

scala> val segcust = hc.sql("select * from pqt_segcust_snappy where coll_def_id = 'abcd'")

scala> txn.registerAsTable("segTxns")

scala> segcust.registerAsTable("segCusts")

scala> val joined = hc.sql("select t.transid, c.customer_id from segTxns t join segCusts c on t.customerid = c.customer_id")

Straightforward enough, but I get the following exception:


14/10/13 14:37:12 ERROR Executor: Exception in task 1.0 in stage 18.0 (TID 51)

java.lang.IndexOutOfBoundsException: Index: 21, Size: 21

at java.util.ArrayList.rangeCheck(ArrayList.java:635)

at java.util.ArrayList.get(ArrayList.java:411)

at 
org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport.init(DataWritableReadSupport.java:94)

at 
org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.getSplit(ParquetRecordReaderWrapper.java:206)

at 
org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.init(ParquetRecordReaderWrapper.java:81)

at 
org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.init(ParquetRecordReaderWrapper.java:67)

at 
org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.getRecordReader(MapredParquetInputFormat.java:51)

at org.apache.spark.rdd.HadoopRDD$$anon$1.init(HadoopRDD.scala:197)

at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:188)

at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:97)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)

at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)

at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)

at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)

at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)

at org.apache.spark.scheduler.Task.run(Task.scala:54)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)

at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)


My table, pqt_segcust_snappy, has 21 columns and two 
partitions defined. Does this error look familiar to anyone? Could my usage of 
SparkSQL with Hive be incorrect or is support with Hive/Parquet/partitioning 
still buggy at this point in Spark 1.1.0?


Thanks,

-Terry
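
(A hedged sketch pulling the pieces of this thread together: the original join, plus the spark.sql.hive.convertMetastoreParquet setting suggested in the replies above. Table names are the ones from the report; everything runs in spark-shell, where sc is already defined.)

import org.apache.spark.sql.hive.HiveContext

val hc = new HiveContext(sc)
// Let Spark SQL read the metastore Parquet tables with its native Parquet
// support instead of the Hive parquet serde.
hc.setConf("spark.sql.hive.convertMetastoreParquet", "true")

val txn = hc.sql(
  "select * from pqt_rdt_snappy where transdate >= 132537600 and transdate <= 134006399")
val segcust = hc.sql("select * from pqt_segcust_snappy where coll_def_id = 'abcd'")
txn.registerAsTable("segTxns")
segcust.registerAsTable("segCusts")
val joined = hc.sql(
  "select t.transid, c.customer_id from segTxns t join segCusts c on t.customerid = c.customer_id")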