Re: Kryo serialization mismatch in Spark SQL windowing function

2016-04-06 Thread Soam Acharya
1.1.jar,/opt/hive/lib/hive-accumulo-handler-1.2.0.jar,/opt/hive/lib/ant-launcher-1.9.1.jar,/opt/hive/lib/hive-jdbc-1.2.0.jar,/opt/hive/lib/commons-compress-1.4.1.jar,/opt/hive/lib/commons-logging-1.1.3.jar,/opt/hive/lib/hive-serde-1.2.0.jar,/opt/hive/lib/zookeeper-3.4.6.jar,/opt/hive/lib/accumulo-start-1.6.0.jar,/opt/hive/lib/hive-contrib-1.2.0.jar,/opt/hive/lib/log4j-1.2.16.jar,/opt/hive/lib/commons-compiler-2.7.6.jar,/opt/hive/lib/ST4-4.0.4.jar,/opt/hive/lib/calcite-avatica-1.2.0-incubating.jar,/opt/hive/lib/httpclient-4.4.jar,/opt/hive/lib/commons-codec-1.4.jar,/opt/hive/lib/commons-io-2.4.jar,/opt/hive/lib/commons-digester-1.8.jar,/opt/hive/lib/regexp-1.3.jar,/opt/hive/lib/ivy-2.4.0.jar,/opt/hive/lib/eigenbase-properties-1.1.5.jar,/opt/hive/lib/paranamer-2.3.jar,/opt/hive/lib/mail-1.4.1.jar,/opt/hive/lib/asm-commons-3.1.jar,/opt/hive/lib/commons-lang-2.6.jar,/opt/hive/lib/hive-jdbc-1.2.0-standalone.jar,/opt/hive/lib/hive-shims-common-1.2.0.jar,/opt/hive/lib/hamcrest-core-1.1.jar,/opt/hive/lib/super-csv-2.2.0.jar,
  spark.history.ui.port -> 18080
  spark.fileserver.port -> 45090
  spark.history.retainedApplications -> 999
  spark.ui.port -> 45100
  spark.shuffle.consolidateFiles -> true
  spark.executor.extraJavaOptions -> -XX:+PrintReferenceGC -verbose:gc
-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy
-Djava.library.path=/opt/hadoop/lib/native/
  spark.history.fs.logDirectory -> hdfs:///logs/spark-history
  spark.eventLog.dir -> hdfs:///logs/spark-history/alti_soam/
  spark.executor.extraClassPath ->
spark-hive_2.10-1.6.1.jar:spark-hive-thriftserver_2.10-1.6.1.jar
  spark.driver.port -> 45055
  spark.port.maxRetries -> 999
  spark.executor.port -> 45250
  spark.driver.extraClassPath -> ...
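
A quick way to confirm which of these classpath settings actually take effect is to
ask the running SparkContext (a minimal spark-shell sketch on the same build; it only
reports Spark conf entries, not the full JVM classpath):

// Print every classpath-related setting the running SparkContext sees.
sc.getConf.getAll
  .filter { case (key, _) => key.toLowerCase.contains("classpath") }
  .sortBy(_._1)
  .foreach { case (key, value) => println(s"$key -> $value") }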



On Wed, Apr 6, 2016 at 6:59 PM, Josh Rosen wrote:

>
> Spark is compiled against a custom fork of Hive 1.2.1 which added shading
> of Protobuf and removed shading of Kryo. What I think is happening here is
> that stock Hive 1.2.1 is taking precedence, so the Kryo instance it's
> returning is an instance of the shaded/relocated Hive version rather than
> the unshaded, stock Kryo that Spark is expecting here.
>
> I just so happen to have a patch which reintroduces the shading of Kryo
> (motivated by other factors): https://github.com/apache/spark/pull/12215.
> There's a chance that a backport of this patch might fix this problem.
>
> However, I'm a bit curious about how your classpath is set up and why
> stock Hive 1.2.1's shaded Kryo is being used here.
>
> /cc +Marcelo Vanzin and +Steve Loughran, who may know more.
>
> On Wed, Apr 6, 2016 at 6:08 PM, Soam Acharya wrote:
>
>> Hi folks,
>>
>> I have a build of Spark 1.6.1 on which Spark SQL seems to be functional
>> except for windowing functions. For example, I can create a simple external
>> table via Hive:
>>
>> CREATE EXTERNAL TABLE PSTable (pid int, tty string, time string, cmd
>> string)
>> ROW FORMAT DELIMITED
>> FIELDS TERMINATED BY ','
>> LINES TERMINATED BY '\n'
>> STORED AS TEXTFILE
>> LOCATION '/user/test/ps';
>>
>> I ensure that the table is pointing to some valid data, set up Spark SQL to
>> point to the Hive metastore (we're running Hive 1.2.1), and run a basic test:
>>
>> spark-sql> select * from PSTable;
>> 7239    pts/0   00:24:31    java
>> 9993    pts/9   00:00:00    ps
>> 9994    pts/9   00:00:00    tail
>> 9995    pts/9   00:00:00    sed
>> 9996    pts/9   00:00:00    sed
>>
>> But when I try to run a windowing function which I know runs on Hive, I
>> get:
>>
>> spark-sql> select a.pid, a.time, a.cmd, min(a.time) over (partition by
>> a.cmd order by a.time) from PSTable a;
>> org.apache.spark.SparkException: Task not serializable
>> at
>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
>> at
>> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
>> at
>> org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
>> at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
>> :
>> :
>> Caused by: java.lang.ClassCastException:
>> org.apache.hive.com.esotericsoftware.kryo.Kryo cannot be cast to
>> com.esotericsoftware.kryo.Kryo
>> at
>> org.apache.spark.sql.hive.HiveShim$HiveFunctionWrapper.serializePlan(HiveShim.scala:178)
>> at
>> org.apache.spark.sql.hive.HiveShim$HiveFunctionWrapper.writeExternal(HiveShim.scala:191)
>> at
>> java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1458)
>> at
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
>> at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>
>> Any thoughts or ideas would be appreciated!
>>
>> Regards,
>>
>> Soam
>>
>
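
To follow up on Josh's question about the classpath, a small spark-shell sketch that
asks the JVM which jar each Kryo class is actually loaded from on the driver (the
executor side would need the same check run inside a task):

// Resolve both the stock and the Hive-shaded Kryo class and print their code sources.
Seq("com.esotericsoftware.kryo.Kryo",
    "org.apache.hive.com.esotericsoftware.kryo.Kryo").foreach { name =>
  try {
    val cls = Class.forName(name)
    val src = Option(cls.getProtectionDomain.getCodeSource).map(_.getLocation).orNull
    println(s"$name -> $src")
  } catch {
    case _: ClassNotFoundException => println(s"$name -> not found on the driver classpath")
  }
}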


Kryo serialization mismatch in Spark SQL windowing function

2016-04-06 Thread Soam Acharya
Hi folks,

I have a build of Spark 1.6.1 on which Spark SQL seems to be functional
except for windowing functions. For example, I can create a simple external
table via Hive:

CREATE EXTERNAL TABLE PSTable (pid int, tty string, time string, cmd string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION '/user/test/ps';

I ensure that the table is pointing to some valid data, set up Spark SQL to
point to the Hive metastore (we're running Hive 1.2.1), and run a basic test:

spark-sql> select * from PSTable;
7239    pts/0   00:24:31    java
9993    pts/9   00:00:00    ps
9994    pts/9   00:00:00    tail
9995    pts/9   00:00:00    sed
9996    pts/9   00:00:00    sed
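
The same smoke test can also be run from spark-shell instead of the spark-sql CLI (a
sketch, assuming hive-site.xml is on the driver classpath so the HiveContext reaches
the same metastore):

// Spark 1.6: HiveContext exposes tables registered in the Hive metastore.
import org.apache.spark.sql.hive.HiveContext

val hiveCtx = new HiveContext(sc)
hiveCtx.sql("SELECT * FROM PSTable").show(5)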

But when I try to run a windowing function which I know runs on Hive, I get:

spark-sql> select a.pid, a.time, a.cmd, min(a.time) over (partition by
a.cmd order by a.time) from PSTable a;
org.apache.spark.SparkException: Task not serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
:
:
Caused by: java.lang.ClassCastException:
org.apache.hive.com.esotericsoftware.kryo.Kryo cannot be cast to
com.esotericsoftware.kryo.Kryo
at
org.apache.spark.sql.hive.HiveShim$HiveFunctionWrapper.serializePlan(HiveShim.scala:178)
at
org.apache.spark.sql.hive.HiveShim$HiveFunctionWrapper.writeExternal(HiveShim.scala:191)
at
java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1458)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
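
For comparison, the same window computation expressed through the DataFrame API would
look roughly like the sketch below; whether it exercises the same HiveFunctionWrapper
serialization path in 1.6.1 is untested, so it may or may not hit the same cast:

// Spark 1.6 DataFrame equivalent of the windowed query (sketch, untested on this build).
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, min}
import org.apache.spark.sql.hive.HiveContext

val hiveCtx = new HiveContext(sc)
val w = Window.partitionBy("cmd").orderBy("time")
hiveCtx.table("PSTable")
  .select(col("pid"), col("time"), col("cmd"), min(col("time")).over(w))
  .show()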

Any thoughts or ideas would be appreciated!

Regards,

Soam


sqlContext.cacheTable + yarn client mode

2016-03-30 Thread Soam Acharya
Hi folks,

I understand that invoking sqlContext.cacheTable("tableName") will load the
table into a compressed in-memory columnar format. When Spark is launched
via the spark shell in YARN client mode, is the table loaded into the local
Spark driver process in addition to the executors in the Hadoop cluster, or
is it just loaded into the executors? We're exploring an OOM issue on the
local Spark driver for some SQL code and were wondering if the local cache
load could be the culprit.
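
One way to probe this empirically is to cache the table, force it to materialize, and
then look at where the blocks ended up, either in the Storage tab of the web UI or
programmatically (a sketch; "tableName" is a placeholder):

// cacheTable only marks the table for in-memory columnar caching; a query forces it.
sqlContext.cacheTable("tableName")
sqlContext.sql("SELECT COUNT(*) FROM tableName").show()

// Report memory used per block manager; the driver is listed alongside the executors,
// so this shows whether any cached blocks are being held locally in yarn-client mode.
sc.getExecutorStorageStatus.foreach { status =>
  println(s"${status.blockManagerId.host} (executor ${status.blockManagerId.executorId}): " +
          s"${status.memUsed} bytes used of ${status.maxMem}")
}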

Appreciate any thoughts. BTW, we're running Spark 1.6.0 on this particular
cluster.

Regards,

Soam