SparkSQL to read XML Blob data to create multiple rows

2017-06-28 Thread Talap, Amol
Hi:

We are trying to parse XML data to produce the output below from the given
input sample. Can someone suggest a way to pass one DataFrame's output into
the load() function, or any other alternative to get this output?

Input data from the Oracle table (XMLBlob):

  SequenceID | Name    | City      | XMLComment
  -----------+---------+-----------+-----------------------------------------------------------------------
  1          | Amol    | Kolhapur  | Title1.1Description_1.1Title1.2Description_1.2Title1.3Description_1.3
  2          | Suresh  | Mumbai    | Title2Description_2
  3          | Vishal  | Delhi     | Title3Description_3
  4          | Swastik | Bangalore | Title4Description_4


Output data expected using Spark SQL:

  SequenceID | Name    | City      | Title    | Description
  -----------+---------+-----------+----------+----------------
  1          | Amol    | Kolhapur  | Title1.1 | Description_1.1
  1          | Amol    | Kolhapur  | Title1.2 | Description_1.2
  1          | Amol    | Kolhapur  | Title1.3 | Description_1.3
  2          | Suresh  | Mumbai    | Title2   | Description_2
  3          | Vishal  | Delhi     | Title3   | Description_3
  4          | Swastik | Bangalore | Title4   | Description_4


I am able to parse a single XML file in spark-shell using the approach from the
example below (https://community.hortonworks.com/questions/71538/parsing-xml-in-spark-rdd.html),
but how do we apply the same parsing to every row?

val dfX = sqlContext.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "book")
  .load("books.xml")

dfX.registerTempTable("books")
dfX.printSchema()

val books_inexp = sqlContext.sql("select title, author from books where price < 10")
books_inexp.show()
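
For illustration only (not from the original thread), one way to apply the parsing to every row, instead of calling load() per row, is to parse the XML column with a UDF and explode the result. The sketch below assumes hypothetical tag names (Comment, Title, Description), a placeholder source table name (oracle_rows), and that scala-xml is available on the classpath; adapt it to the real XML structure.

import org.apache.spark.sql.functions.{explode, udf}
import sqlContext.implicits._
import scala.xml.XML

// Parse one XMLComment value into (Title, Description) pairs (tag names are assumptions).
val parseComments = udf { (xmlString: String) =>
  val root = XML.loadString(xmlString)
  (root \\ "Comment").map(c => ((c \ "Title").text, (c \ "Description").text))
}

val oracleDF = sqlContext.table("oracle_rows")   // SequenceID, Name, City, XMLComment
val result = oracleDF
  .withColumn("comment", explode(parseComments($"XMLComment")))
  .select($"SequenceID", $"Name", $"City",
          $"comment._1".as("Title"), $"comment._2".as("Description"))
result.show()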


Regards,
Amol


Re: Spark Project build Issues.(Intellij)

2017-06-28 Thread satyajit vegesna
Hi,

I was able to successfully build the project (source code) from IntelliJ.
But when I try to run any of the examples present in the $SPARK_HOME/examples
folder, I get different errors for different example jobs.

For example, for the StructuredKafkaWordCount example:

Exception in thread "main" java.lang.NoClassDefFoundError:
scala/collection/Seq
at
org.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKafkaWordCount.scala)
Caused by: java.lang.ClassNotFoundException: scala.collection.Seq
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 1 more

For the LogQuery job:

objc[21879]: Class JavaLaunchHelper is implemented in both
/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/bin/java
(0x106ff54c0) and
/Library/Java/JavaVirtualMachines/jdk1.8.0_131.jdk/Contents/Home/jre/lib/libinstrument.dylib
(0x1070bd4e0). One of the two will be used. Which one is undefined.
Error: A JNI error has occurred, please check your installation and try
again
Exception in thread "main" java.lang.NoClassDefFoundError:
scala/collection/immutable/List
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
at java.lang.Class.getMethod0(Class.java:3018)
at java.lang.Class.getMethod(Class.java:1784)
at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
Caused by: java.lang.ClassNotFoundException: scala.collection.immutable.List
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 7 more


On Wed, Jun 28, 2017 at 5:21 PM, Dongjoon Hyun 
wrote:

> Did you follow the guide in `IDE Setup` -> `IntelliJ` section of
> http://spark.apache.org/developer-tools.html ?
>
> Bests,
> Dongjoon.
>
> On Wed, Jun 28, 2017 at 5:13 PM, satyajit vegesna <
> satyajit.apas...@gmail.com> wrote:
>
>> Hi All,
>>
>> When i try to build source code of apache spark code from
>> https://github.com/apache/spark.git, i am getting below errors,
>>
>> Error:(9, 14) EventBatch is already defined as object EventBatch
>> public class EventBatch extends org.apache.avro.specific.SpecificRecordBase
>> implements org.apache.avro.specific.SpecificRecord {
>> Error:(9, 14) EventBatch is already defined as class EventBatch
>> public class EventBatch extends org.apache.avro.specific.SpecificRecordBase
>> implements org.apache.avro.specific.SpecificRecord {
>> /Users/svegesna/svegesna/dev/scala/spark/external/flume-sink
>> /target/scala-2.11/src_managed/main/compiled_avro/org/
>> apache/spark/streaming/flume/sink/SparkFlumeProtocol.java
>> Error:(26, 18) SparkFlumeProtocol is already defined as object
>> SparkFlumeProtocol
>> public interface SparkFlumeProtocol {
>> Error:(26, 18) SparkFlumeProtocol is already defined as trait
>> SparkFlumeProtocol
>> public interface SparkFlumeProtocol {
>> /Users/svegesna/svegesna/dev/scala/spark/external/flume-sink
>> /target/scala-2.11/src_managed/main/compiled_avro/org/
>> apache/spark/streaming/flume/sink/SparkSinkEvent.java
>> Error:(9, 14) SparkSinkEvent is already defined as object SparkSinkEvent
>> public class SparkSinkEvent extends 
>> org.apache.avro.specific.SpecificRecordBase
>> implements org.apache.avro.specific.SpecificRecord {
>> Error:(9, 14) SparkSinkEvent is already defined as class SparkSinkEvent
>> public class SparkSinkEvent extends 
>> org.apache.avro.specific.SpecificRecordBase
>> implements org.apache.avro.specific.SpecificRecord {
>>
>> Would like to know , if i can successfully build the project, so that i
>> can test and debug some of spark's functionalities.
>>
>> Regards,
>> Satyajit.
>>
>
>


about broadcast join of base table in spark sql

2017-06-28 Thread paleyl
Hi All,

Recently I ran into a problem with broadcast join: I want to left join tables A
and B, where A is the smaller one and is the left table, so I wrote

A = A.join(B, A("key1") === B("key2"), "left")

but I found that A is not broadcast out, as the shuffle size is still very
large. I guess this is a designed mechanism in Spark, so could anyone please
tell me why it is designed like this? I am just very curious.

Best,

Paley


Re: Spark Project build Issues.(Intellij)

2017-06-28 Thread Dongjoon Hyun
Did you follow the guide in `IDE Setup` -> `IntelliJ` section of
http://spark.apache.org/developer-tools.html ?

Bests,
Dongjoon.

On Wed, Jun 28, 2017 at 5:13 PM, satyajit vegesna <
satyajit.apas...@gmail.com> wrote:

> Hi All,
>
> When i try to build source code of apache spark code from
> https://github.com/apache/spark.git, i am getting below errors,
>
> Error:(9, 14) EventBatch is already defined as object EventBatch
> public class EventBatch extends org.apache.avro.specific.SpecificRecordBase
> implements org.apache.avro.specific.SpecificRecord {
> Error:(9, 14) EventBatch is already defined as class EventBatch
> public class EventBatch extends org.apache.avro.specific.SpecificRecordBase
> implements org.apache.avro.specific.SpecificRecord {
> /Users/svegesna/svegesna/dev/scala/spark/external/flume-
> sink/target/scala-2.11/src_managed/main/compiled_avro/
> org/apache/spark/streaming/flume/sink/SparkFlumeProtocol.java
> Error:(26, 18) SparkFlumeProtocol is already defined as object
> SparkFlumeProtocol
> public interface SparkFlumeProtocol {
> Error:(26, 18) SparkFlumeProtocol is already defined as trait
> SparkFlumeProtocol
> public interface SparkFlumeProtocol {
> /Users/svegesna/svegesna/dev/scala/spark/external/flume-
> sink/target/scala-2.11/src_managed/main/compiled_avro/
> org/apache/spark/streaming/flume/sink/SparkSinkEvent.java
> Error:(9, 14) SparkSinkEvent is already defined as object SparkSinkEvent
> public class SparkSinkEvent extends 
> org.apache.avro.specific.SpecificRecordBase
> implements org.apache.avro.specific.SpecificRecord {
> Error:(9, 14) SparkSinkEvent is already defined as class SparkSinkEvent
> public class SparkSinkEvent extends 
> org.apache.avro.specific.SpecificRecordBase
> implements org.apache.avro.specific.SpecificRecord {
>
> Would like to know , if i can successfully build the project, so that i
> can test and debug some of spark's functionalities.
>
> Regards,
> Satyajit.
>


Spark Project build Issues.(Intellij)

2017-06-28 Thread satyajit vegesna
Hi All,

When I try to build the Apache Spark source code from
https://github.com/apache/spark.git, I get the errors below:

Error:(9, 14) EventBatch is already defined as object EventBatch
public class EventBatch extends org.apache.avro.specific.SpecificRecordBase
implements org.apache.avro.specific.SpecificRecord {
Error:(9, 14) EventBatch is already defined as class EventBatch
public class EventBatch extends org.apache.avro.specific.SpecificRecordBase
implements org.apache.avro.specific.SpecificRecord {
/Users/svegesna/svegesna/dev/scala/spark/external/flume-sink/target/scala-2.11/src_managed/main/compiled_avro/org/apache/spark/streaming/flume/sink/SparkFlumeProtocol.java
Error:(26, 18) SparkFlumeProtocol is already defined as object
SparkFlumeProtocol
public interface SparkFlumeProtocol {
Error:(26, 18) SparkFlumeProtocol is already defined as trait
SparkFlumeProtocol
public interface SparkFlumeProtocol {
/Users/svegesna/svegesna/dev/scala/spark/external/flume-sink/target/scala-2.11/src_managed/main/compiled_avro/org/apache/spark/streaming/flume/sink/SparkSinkEvent.java
Error:(9, 14) SparkSinkEvent is already defined as object SparkSinkEvent
public class SparkSinkEvent extends
org.apache.avro.specific.SpecificRecordBase implements
org.apache.avro.specific.SpecificRecord {
Error:(9, 14) SparkSinkEvent is already defined as class SparkSinkEvent
public class SparkSinkEvent extends
org.apache.avro.specific.SpecificRecordBase implements
org.apache.avro.specific.SpecificRecord {

I would like to know how I can successfully build the project, so that I can
test and debug some of Spark's functionality.

Regards,
Satyajit.


Re: Structured Streaming Questions

2017-06-28 Thread Tathagata Das
Answers inline.



On Wed, Jun 28, 2017 at 10:27 AM, Revin Chalil  wrote:

> I am using Structured Streaming with Spark 2.1 and have some basic
> questions.
>
>
>
> · Is there a way to automatically refresh the Hive Partitions
> when using Parquet Sink with Partition? My query looks like below
>
>
>
> val queryCount = windowedCount
>   .withColumn("hive_partition_persist_date",
>     $"persist_date_window_start".cast("date"))
>   .writeStream.format("parquet")
>   .partitionBy("hive_partition_persist_date")
>   .option("path", StatsDestination)
>   .option("checkpointLocation", CheckPointLocationStats)
>   .trigger(ProcessingTime(WindowDurationStats))
>   .outputMode("append")
>   .start()
>
>
>
> I have an external Parquet table built on top of Destination Dir. Above
> query creates the Partition Dirs but the Hive partition metadata is not
> refreshed and I have to execute  ALTER TABLE …. RECOVER PARTITIONS,
> before querying the Hive table. With legacy Streaming, it was possible to
> use the spark.sql(hiveQL), where hiveQL can be any hive statements, config
> settings etc:. Would this kind of functionality be available in structured
> streaming?
>
>
>
> · The query creates an additional dir “_spark_metadata” under the
> destination dir, and this causes the select statements against the Parquet
> table to fail, as it expects only the parquet files under the destination
> location. Is there a config to avoid the creation of this dir?
>
The _spark_metadata directory holds the metadata information (a
write-ahead log) of which files in the directory are the correct, complete
files generated by the streaming query. This is how we actually get exactly-once
guarantees when writing to a file system, so this directory is very
important. In fact, if you want to run queries on this directory, Spark is
aware of this metadata and will read the correct files, and ignore
temporary/incomplete files that may have been generated by failed/duplicate
tasks. Querying that directory from Hive may lead to duplicate data, as it
will read those temp/duplicate files as well.

If you want the data to be available to Hive, you could periodically run a
Spark job that reads the files from the directory and writes them out to a
Hive table.
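
A minimal sketch of such a periodic job (the path and table name are placeholders, not from this thread; de-duplicating batches that have already been copied is left out):

// Batch job run on a schedule: Spark reads the streaming sink's output directory
// (honouring the _spark_metadata log) and appends it to a Hive table.
val stats = spark.read.parquet("/path/to/streaming/output")
stats.write.mode("append").saveAsTable("stats_hive")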


>
>
> · Our use-case does not need to account for late-arriving records
> and so I have set the WaterMark as 0 seconds. Is this needed to flush out
> the data or is that a default setting or is this inappropriate?
>
> This should be fine.


>
>
> · In the “Append” mode, I have to push at least 3 batches to
> actually see the records written to the Destination, even with the
> watermark = 0 seconds setting. I understand that the statestore has to wait
> for watermark to output records but here watermark is 0 seconds. Curious to
> know what exactly is happening behind the scenes…
>
Probably this is what is going on:
1st batch - No estimate of max event-time to begin with, so the watermark value
is not set.
2nd batch - Watermark is set based on the max event-time seen in the 1st batch.
Say it is W. The window (i.e. [start, end]) for the earlier data received in the
first batch is such that window.start < W < window.end. So the window is
still open and not finalized.
3rd batch - Watermark is updated to W1 such that earliestWindow.end < W1, and
therefore the corresponding window is finalized and emitted.

In general, with aggregation + watermark + append mode, you have to wait at
least (window duration + watermark gap) before the earliest window expires
and emits the finalized aggregates. This is shown visually here:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking
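
For reference, a minimal sketch of such a windowed aggregation (the input stream, column names, and durations are placeholders, not from this thread):

import org.apache.spark.sql.functions.window
import spark.implicits._

// events is a streaming DataFrame with an eventTime timestamp column.
// In append mode a window's final count is emitted only after the watermark
// passes window.end, i.e. roughly (window duration + watermark gap) later.
val windowedCount = events
  .withWatermark("eventTime", "10 minutes")
  .groupBy(window($"eventTime", "5 minutes"))
  .count()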


>
> · The “Trigger” duration and “Window” duration in our case are
> the same as we need to just get the count for every batch. Is a “Window”
> really needed in this scenario as I can logically get the batch count by
> just using count? I tried to just get the count from the batch and it said,
> aggregation cannot be done on streaming dataframe in the Append Mode.
>
count() on a streaming DataFrame is not a batch count but a count over all
the data in the stream. As data arrives, this count is updated by the
streaming query, and depending on the output mode, the updated/finalized
values are written out by the sink.
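
A minimal sketch of that behaviour (placeholder names, not from this thread): a global count written in "complete" mode, so the updated running total is emitted on every trigger.

// events is a streaming DataFrame; groupBy() with no keys gives a global aggregate.
val totalCount = events.groupBy().count()
totalCount.writeStream
  .outputMode("complete")
  .format("console")
  .start()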

In general, the whole design of Structured Streaming is that you should
specify your query as if you were running it on a table, and the execution
model (batches, or otherwise) should not factor into the query semantics. For
example, when writing SQL queries in MySQL, we only care about the query
semantics and don't care about how it is going to be executed. So the
concept of "batch" does not exist in the streaming DataFrame APIs, and
queries written in this API will be executable either in

Re: Building Kafka 0.10 Source for Structured Streaming Error.

2017-06-28 Thread ayan guha
--jars does not do wildcard expansion. List out the jars as comma separated.
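
For example (the exact jar names here are illustrative only):

bin/spark-shell --jars external/kafka-0-10-sql/target/spark-sql-kafka-0-10_2.11-2.3.0-SNAPSHOT.jar,/path/to/kafka-clients-0.10.0.1.jar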

On Thu, 29 Jun 2017 at 5:17 am, satyajit vegesna 
wrote:

> Have updated the pom.xml in external/kafka-0-10-sql folder, in yellow , as
> below, and have run the command
> build/mvn package -DskipTests -pl external/kafka-0-10-sql
> which generated
> spark-sql-kafka-0-10_2.11-2.3.0-SNAPSHOT-jar-with-dependencies.jar
>
> 
>
> 
>
>
> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
>
>   <modelVersion>4.0.0</modelVersion>
>
>   <parent>
>     <groupId>org.apache.spark</groupId>
>     <artifactId>spark-parent_2.11</artifactId>
>     <version>2.3.0-SNAPSHOT</version>
>     <relativePath>../../pom.xml</relativePath>
>   </parent>
>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
>   <properties>
>     <sbt.project.name>sql-kafka-0-10</sbt.project.name>
>   </properties>
>   <packaging>jar</packaging>
>   <name>Kafka 0.10 Source for Structured Streaming</name>
>   <url>http://spark.apache.org/</url>
>
>   <dependencies>
>     <dependency>
>       <groupId>org.apache.spark</groupId>
>       <artifactId>spark-sql_${scala.binary.version}</artifactId>
>       <version>${project.version}</version>
>       <scope>provided</scope>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.spark</groupId>
>       <artifactId>spark-core_${scala.binary.version}</artifactId>
>       <version>${project.version}</version>
>       <type>test-jar</type>
>       <scope>test</scope>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.spark</groupId>
>       <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
>       <version>${project.version}</version>
>       <type>test-jar</type>
>       <scope>test</scope>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.spark</groupId>
>       <artifactId>spark-sql_${scala.binary.version}</artifactId>
>       <version>${project.version}</version>
>       <type>test-jar</type>
>       <scope>test</scope>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.kafka</groupId>
>       <artifactId>kafka-clients</artifactId>
>       <version>0.10.0.1</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.kafka</groupId>
>       <artifactId>kafka_${scala.binary.version}</artifactId>
>       <version>0.10.0.1</version>
>     </dependency>
>     <dependency>
>       <groupId>net.sf.jopt-simple</groupId>
>       <artifactId>jopt-simple</artifactId>
>       <version>3.2</version>
>       <scope>test</scope>
>     </dependency>
>     <dependency>
>       <groupId>org.scalacheck</groupId>
>       <artifactId>scalacheck_${scala.binary.version}</artifactId>
>       <scope>test</scope>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.spark</groupId>
>       <artifactId>spark-tags_${scala.binary.version}</artifactId>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.spark</groupId>
>       <artifactId>spark-tags_${scala.binary.version}</artifactId>
>       <type>test-jar</type>
>       <scope>test</scope>
>     </dependency>
>   </dependencies>
>
>   <build>
>     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
>     <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
>     <plugins>
>       <plugin>
>         <artifactId>maven-assembly-plugin</artifactId>
>         <version>3.0.0</version>
>         <configuration>
>           <descriptorRefs>
>             <descriptorRef>jar-with-dependencies</descriptorRef>
>           </descriptorRefs>
>         </configuration>
>         <executions>
>           <execution>
>             <id>make-assembly</id>
>             <phase>package</phase>
>             <goals>
>               <goal>single</goal>
>             </goals>
>           </execution>
>         </executions>
>       </plugin>
>     </plugins>
>   </build>
> </project>
>
>
> Regards,
>
> Satyajit.
>
> On Wed, Jun 28, 2017 at 12:12 PM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> "--package" will add transitive dependencies that are not
>> "$SPARK_HOME/external/kafka-0-10-sql/target/*.jar".
>>
>> > i have tried building the jar with dependencies, but still face the
>> same error.
>>
>> What's the command you used?
>>
>> On Wed, Jun 28, 2017 at 12:00 PM, satyajit vegesna <
>> satyajit.apas...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I am trying too build Kafka-0-10-sql module under external folder in
>>> apache spark source code.
>>> Once i generate jar file using,
>>> build/mvn package -DskipTests -pl external/kafka-0-10-sql
>>> i get jar file created under external/kafka-0-10-sql/target.
>>>
>>> And try to run spark-shell with jars created in target folder as below,
>>> bin/spark-shell --jars $SPARK_HOME/external/kafka-0-10-sql/target/*.jar
>>>
>>> i get below error based on the command,
>>>
>>> Using Spark's default log4j profile:
>>> org/apache/spark/log4j-defaults.properties
>>>
>>> Setting default log level to "WARN".
>>>
>>> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
>>> setLogLevel(newLevel).
>>>
>>> 17/06/28 11:54:03 WARN NativeCodeLoader: Unable to load native-hadoop
>>> library for your platform... using builtin-java classes where applicable
>>>
>>> Spark context Web UI available at http://10.1.10.241:4040
>>>
>>> Spark context available as 'sc' (master = local[*], app id =
>>> local-1498676043936).
>>>
>>> Spark session available as 'spark'.
>>>
>>> Welcome to
>>>
>>>     __
>>>
>>>  / __/__  ___ _/ /__
>>>
>>> _\ \/ _ \/ _ `/ __/  '_/
>>>
>>>/___/ .__/\_,_/_/ /_/\_\   version 2.3.0-SNAPSHOT
>>>
>>>   /_/
>>>
>>>
>>>
>>> Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java
>>> 1.8.0_131)
>>>
>>> Type in expressions to have them evaluated.
>>>
>>> Type :help for more information.
>>>
>>> scala> val lines =
>>> spark.readStream.format("kafka").option("kafka.bootstrap.servers",
>>> "localhost:9092").option("subscribe", "test").load()
>>>
>>> java.lang.NoClassDefFoundError:
>>> org/apache/kafka/common/serialization/ByteArrayDeserializer
>>>
>>>   at
>>> org.apache.spark.sql.kafka010.KafkaSourceProvider$.(KafkaSourceProvider.scala:378)
>>>
>>>   at
>>> 

Re: Building Kafka 0.10 Source for Structured Streaming Error.

2017-06-28 Thread satyajit vegesna
I have updated the pom.xml in the external/kafka-0-10-sql folder as below (the
changes were highlighted in yellow in the original message), and have run the
command build/mvn package -DskipTests -pl external/kafka-0-10-sql, which
generated spark-sql-kafka-0-10_2.11-2.3.0-SNAPSHOT-jar-with-dependencies.jar.






<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <parent>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-parent_2.11</artifactId>
    <version>2.3.0-SNAPSHOT</version>
    <relativePath>../../pom.xml</relativePath>
  </parent>

  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
  <properties>
    <sbt.project.name>sql-kafka-0-10</sbt.project.name>
  </properties>
  <packaging>jar</packaging>
  <name>Kafka 0.10 Source for Structured Streaming</name>
  <url>http://spark.apache.org/</url>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.10.0.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_${scala.binary.version}</artifactId>
      <version>0.10.0.1</version>
    </dependency>
    <dependency>
      <groupId>net.sf.jopt-simple</groupId>
      <artifactId>jopt-simple</artifactId>
      <version>3.2</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.scalacheck</groupId>
      <artifactId>scalacheck_${scala.binary.version}</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-tags_${scala.binary.version}</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-tags_${scala.binary.version}</artifactId>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.0.0</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>




Regards,

Satyajit.

On Wed, Jun 28, 2017 at 12:12 PM, Shixiong(Ryan) Zhu <
shixi...@databricks.com> wrote:

> "--package" will add transitive dependencies that are not
> "$SPARK_HOME/external/kafka-0-10-sql/target/*.jar".
>
> > i have tried building the jar with dependencies, but still face the same
> error.
>
> What's the command you used?
>
> On Wed, Jun 28, 2017 at 12:00 PM, satyajit vegesna <
> satyajit.apas...@gmail.com> wrote:
>
>> Hi All,
>>
>> I am trying too build Kafka-0-10-sql module under external folder in
>> apache spark source code.
>> Once i generate jar file using,
>> build/mvn package -DskipTests -pl external/kafka-0-10-sql
>> i get jar file created under external/kafka-0-10-sql/target.
>>
>> And try to run spark-shell with jars created in target folder as below,
>> bin/spark-shell --jars $SPARK_HOME/external/kafka-0-10-sql/target/*.jar
>>
>> i get below error based on the command,
>>
>> Using Spark's default log4j profile: org/apache/spark/log4j-default
>> s.properties
>>
>> Setting default log level to "WARN".
>>
>> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
>> setLogLevel(newLevel).
>>
>> 17/06/28 11:54:03 WARN NativeCodeLoader: Unable to load native-hadoop
>> library for your platform... using builtin-java classes where applicable
>>
>> Spark context Web UI available at http://10.1.10.241:4040
>>
>> Spark context available as 'sc' (master = local[*], app id =
>> local-1498676043936).
>>
>> Spark session available as 'spark'.
>>
>> Welcome to
>>
>>     __
>>
>>  / __/__  ___ _/ /__
>>
>> _\ \/ _ \/ _ `/ __/  '_/
>>
>>/___/ .__/\_,_/_/ /_/\_\   version 2.3.0-SNAPSHOT
>>
>>   /_/
>>
>>
>>
>> Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java
>> 1.8.0_131)
>>
>> Type in expressions to have them evaluated.
>>
>> Type :help for more information.
>>
>> scala> val lines = spark.readStream.format("kafka
>> ").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe",
>> "test").load()
>>
>> java.lang.NoClassDefFoundError: org/apache/kafka/common/serial
>> ization/ByteArrayDeserializer
>>
>>   at org.apache.spark.sql.kafka010.KafkaSourceProvider$.(Ka
>> fkaSourceProvider.scala:378)
>>
>>   at org.apache.spark.sql.kafka010.KafkaSourceProvider$.(
>> KafkaSourceProvider.scala)
>>
>>   at org.apache.spark.sql.kafka010.KafkaSourceProvider.validateSt
>> reamOptions(KafkaSourceProvider.scala:325)
>>
>>   at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSche
>> ma(KafkaSourceProvider.scala:60)
>>
>>   at org.apache.spark.sql.execution.datasources.DataSource.
>> sourceSchema(DataSource.scala:192)
>>
>>   at org.apache.spark.sql.execution.datasources.DataSource.
>> sourceInfo$lzycompute(DataSource.scala:87)
>>
>>   at 

Re: Building Kafka 0.10 Source for Structured Streaming Error.

2017-06-28 Thread Shixiong(Ryan) Zhu
"--package" will add transitive dependencies that are not
"$SPARK_HOME/external/kafka-0-10-sql/target/*.jar".

> i have tried building the jar with dependencies, but still face the same
error.

What's the command you used?

On Wed, Jun 28, 2017 at 12:00 PM, satyajit vegesna <
satyajit.apas...@gmail.com> wrote:

> Hi All,
>
> I am trying too build Kafka-0-10-sql module under external folder in
> apache spark source code.
> Once i generate jar file using,
> build/mvn package -DskipTests -pl external/kafka-0-10-sql
> i get jar file created under external/kafka-0-10-sql/target.
>
> And try to run spark-shell with jars created in target folder as below,
> bin/spark-shell --jars $SPARK_HOME/external/kafka-0-10-sql/target/*.jar
>
> i get below error based on the command,
>
> Using Spark's default log4j profile: org/apache/spark/log4j-
> defaults.properties
>
> Setting default log level to "WARN".
>
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
> setLogLevel(newLevel).
>
> 17/06/28 11:54:03 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
>
> Spark context Web UI available at http://10.1.10.241:4040
>
> Spark context available as 'sc' (master = local[*], app id =
> local-1498676043936).
>
> Spark session available as 'spark'.
>
> Welcome to
>
>     __
>
>  / __/__  ___ _/ /__
>
> _\ \/ _ \/ _ `/ __/  '_/
>
>/___/ .__/\_,_/_/ /_/\_\   version 2.3.0-SNAPSHOT
>
>   /_/
>
>
>
> Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java
> 1.8.0_131)
>
> Type in expressions to have them evaluated.
>
> Type :help for more information.
>
> scala> val lines = 
> spark.readStream.format("kafka").option("kafka.bootstrap.servers",
> "localhost:9092").option("subscribe", "test").load()
>
> java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/
> ByteArrayDeserializer
>
>   at org.apache.spark.sql.kafka010.KafkaSourceProvider$.(
> KafkaSourceProvider.scala:378)
>
>   at org.apache.spark.sql.kafka010.KafkaSourceProvider$.(
> KafkaSourceProvider.scala)
>
>   at org.apache.spark.sql.kafka010.KafkaSourceProvider.
> validateStreamOptions(KafkaSourceProvider.scala:325)
>
>   at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(
> KafkaSourceProvider.scala:60)
>
>   at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(
> DataSource.scala:192)
>
>   at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$
> lzycompute(DataSource.scala:87)
>
>   at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(
> DataSource.scala:87)
>
>   at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(
> StreamingRelation.scala:30)
>
>   at org.apache.spark.sql.streaming.DataStreamReader.
> load(DataStreamReader.scala:150)
>
>   ... 48 elided
>
> Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.
> serialization.ByteArrayDeserializer
>
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>
>   ... 57 more
>
> ++
>
> i have tried building the jar with dependencies, but still face the same
> error.
>
> But when i try to do --package with spark-shell using bin/spark-shell
> --package org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 , it works
> fine.
>
> The reason, i am trying to build something from source code, is because i
> want to try pushing dataframe data into kafka topic, based on the url
> https://github.com/apache/spark/commit/b0a5cd89097c563e9949d8cfcf84d1
> 8b03b8d24c, which doesn't work with version 2.1.0.
>
>
> Any help would be highly appreciated.
>
>
> Regards,
>
> Satyajit.
>
>
>


Building Kafka 0.10 Source for Structured Streaming Error.

2017-06-28 Thread satyajit vegesna
Hi All,

I am trying to build the kafka-0-10-sql module under the external folder in the
Apache Spark source code. Once I generate the jar file using

build/mvn package -DskipTests -pl external/kafka-0-10-sql

I get a jar file created under external/kafka-0-10-sql/target.

I then try to run spark-shell with the jars created in the target folder as below:

bin/spark-shell --jars $SPARK_HOME/external/kafka-0-10-sql/target/*.jar

and I get the error below:

Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties

Setting default log level to "WARN".

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
setLogLevel(newLevel).

17/06/28 11:54:03 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable

Spark context Web UI available at http://10.1.10.241:4040

Spark context available as 'sc' (master = local[*], app id =
local-1498676043936).

Spark session available as 'spark'.

Welcome to

    __

 / __/__  ___ _/ /__

_\ \/ _ \/ _ `/ __/  '_/

   /___/ .__/\_,_/_/ /_/\_\   version 2.3.0-SNAPSHOT

  /_/



Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java
1.8.0_131)

Type in expressions to have them evaluated.

Type :help for more information.

scala> val lines =
spark.readStream.format("kafka").option("kafka.bootstrap.servers",
"localhost:9092").option("subscribe", "test").load()

java.lang.NoClassDefFoundError:
org/apache/kafka/common/serialization/ByteArrayDeserializer

  at
org.apache.spark.sql.kafka010.KafkaSourceProvider$.<init>(KafkaSourceProvider.scala:378)

  at
org.apache.spark.sql.kafka010.KafkaSourceProvider$.<clinit>(KafkaSourceProvider.scala)

  at
org.apache.spark.sql.kafka010.KafkaSourceProvider.validateStreamOptions(KafkaSourceProvider.scala:325)

  at
org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:60)

  at
org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:192)

  at
org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)

  at
org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)

  at
org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)

  at
org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150)

  ... 48 elided

Caused by: java.lang.ClassNotFoundException:
org.apache.kafka.common.serialization.ByteArrayDeserializer

  at java.net.URLClassLoader.findClass(URLClassLoader.java:381)

  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

  ... 57 more

++

I have tried building the jar with dependencies, but I still face the same
error.

But when I use --packages with spark-shell, i.e. bin/spark-shell
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0, it works fine.

The reason I am trying to build from source code is that I want to try pushing
DataFrame data into a Kafka topic, based on
https://github.com/apache/spark/commit/b0a5cd89097c563e9949d8cfcf84d18b03b8d24c,
which doesn't work with version 2.1.0.


Any help would be highly appreciated.


Regards,

Satyajit.


Re: Spark job profiler results showing high TCP cpu time

2017-06-28 Thread Reth RM
I am using VisualVM: https://github.com/krasa/VisualVMLauncher

@Marcelo, thank you for the reply, that was helpful.


On Fri, Jun 23, 2017 at 12:48 PM, Eduardo Mello 
wrote:

> what program do u use to profile Spark?
>
> On Fri, Jun 23, 2017 at 3:07 PM, Marcelo Vanzin 
> wrote:
>
>> That thread looks like the connection between the Spark process and
>> jvisualvm. It's expected to show high up when doing sampling if the
>> app is not doing much else.
>>
>> On Fri, Jun 23, 2017 at 10:46 AM, Reth RM  wrote:
>> > Running a spark job on local machine and profiler results indicate that
>> > highest time spent in sun.rmi.transport.tcp.TCPTrans
>> port$ConnectionHandler.
>> > Screenshot of profiler result can be seen here : https://jpst.it/10i-V
>> >
>> > Spark job(program) is performing IO (sc.wholeTextFile method of spark
>> apis),
>> > Reads files from local file system and analyses the text to obtain
>> tokens.
>> >
>> > Any thoughts and suggestions?
>> >
>> > Thanks.
>> >
>>
>>
>>
>> --
>> Marcelo
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Structured Streaming Questions

2017-06-28 Thread Revin Chalil
I am using Structured Streaming with Spark 2.1 and have some basic questions.


* Is there a way to automatically refresh the Hive partitions when using the
Parquet sink with partitioning? My query looks like the one below:

val queryCount = windowedCount
  .withColumn("hive_partition_persist_date", $"persist_date_window_start".cast("date"))
  .writeStream.format("parquet")
  .partitionBy("hive_partition_persist_date")
  .option("path", StatsDestination)
  .option("checkpointLocation", CheckPointLocationStats)
  .trigger(ProcessingTime(WindowDurationStats))
  .outputMode("append")
  .start()


I have an external Parquet table built on top of the destination dir. The above
query creates the partition dirs, but the Hive partition metadata is not
refreshed, and I have to execute ALTER TABLE ... RECOVER PARTITIONS before
querying the Hive table. With legacy streaming, it was possible to use
spark.sql(hiveQL), where hiveQL can be any Hive statement, config setting, etc.
Would this kind of functionality be available in Structured Streaming?


* The query creates an additional dir "_spark_metadata" under the destination
dir, and this causes select statements against the Parquet table to fail, as it
expects only the parquet files under the destination location. Is there a
config to avoid the creation of this dir?




* Our use-case does not need to account for late-arriving records and 
so I have set the WaterMark as 0 seconds. Is this needed to flush out the data 
or is that a default setting or is this inappropriate?


* In the "Append" mode, I have to push at least 3 batches to actually 
see the records written to the Destination, even with the watermark = 0 seconds 
setting. I understand that the statestore has to wait for watermark to output 
records but here watermark is 0 seconds. Curious to know what exactly is 
happening behind the scenes...



* The "Trigger" duration and "Window" duration in our case are the same 
as we need to just get the count for every batch. Is a "Window" really needed 
in this scenario as I can logically get the batch count by just using count? I 
tried to just get the count from the batch and it said, aggregation cannot be 
done on streaming dataframe in the Append Mode.


* In our current code, we use the DataBricks' Spark-Redshift library to 
write output to Redshift. Would this library be available in Structured 
Streaming? Is there a way to do this using the "ForEach"?


* With Legacy streaming, we checkpoint the Kafka Offsets in ZooKeeper. 
Is using Structured Streaming's checkpointing resilient enough to handle all 
the failure-restart scenarios?



* When would the spark 2.2 available for use? I see that the 
programming guide still says 2.1.1.


Thanks,
Revin


Re: IDE for python

2017-06-28 Thread Xiaomeng Wan
Thanks to all of you. I will give PyCharm a try.

Regards,
Shawn

On 28 June 2017 at 06:07, Sotola, Radim  wrote:

> I know. But I pay around 20Euro per month for all products from JetBrains
> and I think this is not so much – I Czech it is one evening in pub.
>
>
>
> *From:* Md. Rezaul Karim [mailto:rezaul.ka...@insight-centre.org]
> *Sent:* Wednesday, June 28, 2017 12:55 PM
> *To:* Sotola, Radim 
> *Cc:* spark users ; ayan guha ;
> Abhinay Mehta ; Xiaomeng Wan 
> *Subject:* RE: IDE for python
>
>
>
> By the way, Pycharm from JetBrians also have a community edition which is
> free and open source.
>
>
>
> Moreover, if you are a student, you can use the professional edition for
> students as well.
>
>
>
> For more, see here https://www.jetbrains.com/student/
>
>
>
> On Jun 28, 2017 11:18 AM, "Sotola, Radim" 
> wrote:
>
> Pycharm is good choice. I buy monthly subscription and can see that the
> PyCharm development continue  (I mean that this is not tool which somebody
> develop and leave it without any upgrades).
>
>
>
> *From:* Abhinay Mehta [mailto:abhinay.me...@gmail.com]
> *Sent:* Wednesday, June 28, 2017 11:06 AM
> *To:* ayan guha 
> *Cc:* User ; Xiaomeng Wan 
> *Subject:* Re: IDE for python
>
>
>
> I use Pycharm and it works a treat. The big advantage I find is that I can
> use the same command shortcuts that I do when developing with IntelliJ IDEA
> when doing Scala or Java.
>
>
>
>
>
> On 27 June 2017 at 23:29, ayan guha  wrote:
>
> Depends on the need. For data exploration, i use notebooks whenever I can.
> For developement, any good text editor should work, I use sublime. If you
> want auto completion and all, you can use eclipse or pycharm, I do not :)
>
>
>
> On Wed, 28 Jun 2017 at 7:17 am, Xiaomeng Wan  wrote:
>
> Hi,
>
> I recently switched from scala to python, and wondered which IDE people
> are using for python. I heard about pycharm, spyder etc. How do they
> compare with each other?
>
>
>
> Thanks,
>
> Shawn
>
> --
>
> Best Regards,
> Ayan Guha
>
>
>
>


Re: What is the equivalent of mapPartitions in SpqrkSQL?

2017-06-28 Thread jeff saremi
I have to read up on the writer. But would the writer get records back from
somewhere? I want to do a bulk operation and continue with the results in the
form of a dataframe.

Currently the UDF does this: 1 scalar -> 1 scalar

The UDAF does this: M records -> 1 scalar

I want this: M records -> M records (or M scalars),
or in the broadest sense: M records -> N records.

I think this capability is left out of Spark SQL, forcing us to go back to
Spark core using the map*, groupBy*, and reduceBy* functions and the like.

Being forced to keep converting between SQL and non-SQL is very annoying, and
it forces us to stay conservative and just make do without SQL. I'm sure we're
not alone here.



From: Aaron Perrin 
Sent: Tuesday, June 27, 2017 4:50:25 PM
To: Ryan; jeff saremi
Cc: user@spark.apache.org
Subject: Re: What is the equivalent of mapPartitions in SpqrkSQL?

I'm assuming some things here, but hopefully I understand. So, basically you 
have a big table of data distributed across a bunch of executors. And, you want 
an efficient way to call a native method for each row.

It sounds similar to a dataframe writer to me. Except, instead of writing to 
disk or network, you're 'writing' to a native function. Would a custom 
dataframe writer work? That's what I'd try first.

If that doesn't work for your case, you could also try adding a column where 
the column function does the native call. However, if doing it that way, you'd 
have to ensure that the column function actually gets called for all rows. (An 
interesting side effect of that is that you could JNI/WinAPI errors there and 
set the column value to the result.)

There are other ways, too, if those options don't work...

On Sun, Jun 25, 2017 at 8:07 PM jeff saremi 
> wrote:

My specific and immediate need is this: We have a native function wrapped in 
JNI. To increase performance we'd like to avoid calling it record by record. 
mapPartitions() give us the ability to invoke this in bulk. We're looking for a 
similar approach in SQL.



From: Ryan >
Sent: Sunday, June 25, 2017 7:18:32 PM
To: jeff saremi
Cc: user@spark.apache.org
Subject: Re: What is the equivalent of mapPartitions in SpqrkSQL?

Why would you like to do so? I think there's no need for us to explicitly ask 
for a forEachPartition in spark sql because tungsten is smart enough to figure 
out whether a sql operation could be applied on each partition or there has to 
be a shuffle.

On Sun, Jun 25, 2017 at 11:32 PM, jeff saremi 
> wrote:

You can do a map() using a select and functions/UDFs. But how do you process a 
partition using SQL?




using Apache Spark standalone on a server for a class/multiple users, db.lck does not get removed

2017-06-28 Thread Robert Kudyba
We have a Big Data class planned and we’d like students to be able to start
spark-shell or pyspark as their own user. However, the Derby database lock
prevents the process from starting as another user:

-rw-r--r-- 1 myuser staff   38 Jun 28 10:40 db.lck

And these errors appear:

ERROR PoolWatchThread: Error in trying to obtain a connection. Retrying in 
7000ms
java.sql.SQLException: A read-only user or a user in a read-only database is 
not permitted to disable read-only mode on a connection.
at 
org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
at org.apache.derby.impl.jdbc.Util.generateCsSQLException(Unknown 
Source)
at 
org.apache.derby.impl.jdbc.TransactionResourceImpl.wrapInSQLException(Unknown 
Source)
at 
org.apache.derby.impl.jdbc.TransactionResourceImpl.handleException(Unknown 
Source)
at org.apache.derby.impl.jdbc.EmbedConnection.handleException(Unknown 
Source)
at org.apache.derby.impl.jdbc.EmbedConnection.setReadOnly(Unknown 
Source)
at 
com.jolbox.bonecp.ConnectionHandle.setReadOnly(ConnectionHandle.java:1324)
at com.jolbox.bonecp.ConnectionHandle.(ConnectionHandle.java:262)
at 
com.jolbox.bonecp.PoolWatchThread.fillConnections(PoolWatchThread.java:115)
at com.jolbox.bonecp.PoolWatchThread.run(PoolWatchThread.java:82)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: ERROR 25505: A read-only user or a user in a read-only database is 
not permitted to disable read-only mode on a connection.
at org.apache.derby.iapi.error.StandardException.newException(Unknown 
Source)
at org.apache.derby.iapi.error.StandardException.newException(Unknown 
Source)
at 
org.apache.derby.impl.sql.conn.GenericAuthorizer.setReadOnlyConnection(Unknown 
Source)
at 
org.apache.derby.impl.sql.conn.GenericLanguageConnectionContext.setReadOnly(Unknown
 Source)

Is there a workaround or best practice for this scenario?

Re: PySpark 2.1.1 Can't Save Model - Permission Denied

2017-06-28 Thread Yanbo Liang
It looks like your Spark job was running under user root, but your file
system operation was running under user jomernik. Since Spark calls the
corresponding file system (such as HDFS or S3) to commit the job (renaming
temporary files to persistent ones), it needs the correct authorization for
both Spark and the file system. Could you write a Spark DataFrame to this
file system and check whether it works?

Thanks
Yanbo

On Tue, Jun 27, 2017 at 8:47 PM, John Omernik  wrote:

> Hello all, I am running PySpark 2.1.1 as a user, jomernik. I am working
> through some documentation here:
>
> https://spark.apache.org/docs/latest/mllib-ensembles.html#random-forests
>
> And was working on the Random Forest Classification, and found it to be
> working!  That said, when I try to save the model to my hdfs (MaprFS in my
> case)  I got a weird error:
>
> I tried to save here:
>
> model.save(sc, "maprfs:///user/jomernik/tmp/myRandomForestClassificationMo
> del")
>
> /user/jomernik is my user directory and I have full access to the
> directory.
>
>
>
> All the directories down to
>
> /user/jomernik/tmp/myRandomForestClassificationModel/metadata/_temporary/0
> are owned by my with full permissions, but when I get to this directory,
> here is the ls
>
> $ ls -ls
>
> total 1
>
> 1 drwxr-xr-x 2 root root 1 Jun 27 07:38 task_20170627123834_0019_m_00
>
> 0 drwxr-xr-x 2 root root 0 Jun 27 07:38 _temporary
>
> Am I doing something wrong here? Why is the temp stuff owned by root? Is
> there a bug in saving things due to this ownership?
>
> John
>
>
>
>
>
>
> Exception:
> Py4JJavaError: An error occurred while calling o338.save.
> : org.apache.hadoop.security.AccessControlException: User jomernik(user
> id 101) does has been denied access to rename  /user/jomernik/tmp/
> myRandomForestClassificationModel/metadata/_temporary/0/
> task_20170627123834_0019_m_00/part-0 to /user/jomernik/tmp/
> myRandomForestClassificationModel/metadata/part-0
> at com.mapr.fs.MapRFileSystem.rename(MapRFileSystem.java:1112)
> at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(
> FileOutputCommitter.java:461)
> at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(
> FileOutputCommitter.java:475)
> at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.
> commitJobInternal(FileOutputCommitter.java:392)
> at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(
> FileOutputCommitter.java:364)
> at org.apache.hadoop.mapred.FileOutputCommitter.commitJob(
> FileOutputCommitter.java:136)
> at org.apache.spark.SparkHadoopWriter.commitJob(
> SparkHadoopWriter.scala:111)
> at org.apache.spark.rdd.PairRDDFunctions$$anonfun$
> saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1227)
> at org.apache.spark.rdd.PairRDDFunctions$$anonfun$
> saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1168)
> at org.apache.spark.rdd.PairRDDFunctions$$anonfun$
> saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1168)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:151)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
> at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(
> PairRDDFunctions.scala:1168)
> at org.apache.spark.rdd.PairRDDFunctions$$anonfun$
> saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1071)
> at org.apache.spark.rdd.PairRDDFunctions$$anonfun$
> saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1037)
> at org.apache.spark.rdd.PairRDDFunctions$$anonfun$
> saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1037)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:151)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
> at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(
> PairRDDFunctions.scala:1037)
> at org.apache.spark.rdd.PairRDDFunctions$$anonfun$
> saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:963)
> at org.apache.spark.rdd.PairRDDFunctions$$anonfun$
> saveAsHadoopFile$1.apply(PairRDDFunctions.scala:963)
> at org.apache.spark.rdd.PairRDDFunctions$$anonfun$
> saveAsHadoopFile$1.apply(PairRDDFunctions.scala:963)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:151)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
> at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(
> PairRDDFunctions.scala:962)
> at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.
> apply$mcV$sp(RDD.scala:1489)
> at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.
> apply(RDD.scala:1468)
> at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.
> apply(RDD.scala:1468)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:151)
> at 

How to propagate Non-Empty Value in SPARQL Dataset

2017-06-28 Thread carloallocca
Dear All, 

I am trying to propagate the last valid observation (e.g. not null) to the
null values in a dataset. 

Below I reported the partial solution:

Dataset<Row> tmp800 = tmp700.select("uuid", "eventTime", "Washer_rinseCycles");
WindowSpec wspec = Window.partitionBy(tmp800.col("uuid"))
                         .orderBy(tmp800.col("uuid"), tmp800.col("eventTime"));
Column c1 = org.apache.spark.sql.functions.lag(tmp800.col("Washer_rinseCycles"), 1).over(wspec);
Dataset<Row> tmp900 = tmp800.withColumn("Washer_rinseCyclesFilled",
        when(tmp800.col("Washer_rinseCycles").isNull(), c1)
        .otherwise(tmp800.col("Washer_rinseCycles")));
However, it does not solve the entire problem, as lag(col, 1) returns the value
from the row immediately before the current row even if that value is NULL
(see the table below).

Is there a method in Spark similar to Pandas' "backfill" for the DataFrame?

Is it possible to do it using the Spark API? How?

Many Thanks in advance. 
Best Regards,


 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-propagate-Non-Empty-Value-in-SPARQL-Dataset-tp28803.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org




How to Fill Sparse Data With the Previous Non-Empty Value in SPARQL Dataset

2017-06-28 Thread Carlo Allocca

Dear All,

I am trying to propagate the last valid observation (e.g. not null) to the null 
values in a dataset.

Below I reported the partial solution:

Dataset<Row> tmp800 = tmp700.select("uuid", "eventTime", "Washer_rinseCycles");
WindowSpec wspec = Window.partitionBy(tmp800.col("uuid"))
                         .orderBy(tmp800.col("uuid"), tmp800.col("eventTime"));
Column c1 = org.apache.spark.sql.functions.lag(tmp800.col("Washer_rinseCycles"), 1).over(wspec);
Dataset<Row> tmp900 = tmp800.withColumn("Washer_rinseCyclesFilled",
        when(tmp800.col("Washer_rinseCycles").isNull(), c1)
        .otherwise(tmp800.col("Washer_rinseCycles")));
However, it does not solve the entire problem, as lag(col, 1) returns the value
from the row immediately before the current row even if that value is NULL
(see the table below).

Is there a method in Spark similar to Pandas' "backfill" for the DataFrame?

Is it possible to do it using the Spark API? How?

Many Thanks in advance.
Best Regards,
Carlo

[Inline image: the table referred to above]


Re: [PySpark]: How to store NumPy array into single DataFrame cell efficiently

2017-06-28 Thread Judit Planas

  
  
Dear Nick,

Thanks for your quick reply.

I quickly implemented your proposal, but I do not see any
improvement. In fact, the test data set of around 3 GB occupies a
total of 10 GB in worker memory, and the execution time of queries
is like 4 times slower than the DF implementation with BinaryType NP
arrays.

Maybe I am doing something wrong, or there is something I am not
taking into account?

Thanks!

Judit

On 28/06/17 12:41, Nick Pentreath wrote:

  You will need to use PySpark vectors to store in a DataFrame. They can be
  created from NumPy arrays as follows:

    from pyspark.ml.linalg import Vectors
    df = spark.createDataFrame([("src1", "pkey1", 1, Vectors.dense(np.array([0, 1, 2])))])

  On Wed, 28 Jun 2017 at 12:23, Judit Planas wrote:


   Dear all,

I am trying to store a NumPy array (loaded from an HDF5
dataset) into one cell of a DataFrame, but I am having
problems.

In short, my data layout is similar to a database, where
I have a few columns with metadata (source of
information, primary key, etc.) and the last column
contains a NumPy array (each array will have hundreds to
thousands of elements):
+--------+------------+--------------+------------------------+
| src    | PrimaryKey | SecondaryKey | Data                   |
+--------+------------+--------------+------------------------+
| "src1" | "pkey1"    | 1            | np.array([0., 1., 2.]) |
| "src2" | "pkey1"    | 2            | np.array([0., 1., 2.]) |
+--------+------------+--------------+------------------------+

In my case, it is important to keep the NumPy array as
it is (no transformation into Python list, etc.) because
later on I will compute some statistics on the array,
like the average of values. In addition, I expect to
have thousands of rows (NumPy arrays), so I think trying
to explode each array will generate too much duplicated
metadata.

I have successfully been able to load the data that I
want into an RDD using the NumPy array object as it is.
But I would like to use the DataFrame structure to
leverage from the SQL functions.

What I have been able to achieve so far is to store the
raw data of the NP array doing the following:
1. Get the raw data of the NP array by calling
"tobytes()" [https://docs.scipy.org/doc/numpy/reference/generated/numpy.ndarray.tobytes.html]
2. Use "BinaryType" in the DF schema for the NP array
3. Call "np.frombuffer()" whenever I want to get the NP
array back [https://docs.scipy.org/doc/numpy/reference/generated/numpy.frombuffer.html]

However, I feel this process is not optimal and it
consumes a lot of worker memory. For example, if my data
size is around 3 GB:
- Loading all data into a DF and calling "cache()"
method (within the same line) produces around 3 GB of
memory consumed on the worker nodes.
- However, loading all data into an RDD and calling
"cache()" method (within the same line) produces around
500 MB of consumed on the worker nodes.

From this, I understand that my data is highly
compressible, so using an RDD is more memory-efficient
than the DF ('spark.rdd.compress' is set to 'True' by
default).

In addition, what I see when I run queries on the data
is that, in general, the RDD computes the query in less
time than the DF. My hypothesis here is the following:
since data must be exchanged between worker nodes in
order to perform the queries, the RDD takes less time
because data is compressed, so communication between
workers takes less time.

To summarize, I would like to use the DF structure due
to its advantages (optimized scheduler, SQL support,
etc.), but what I see from real performance measurements
is that RDDs are much more efficient in my 

RE: IDE for python

2017-06-28 Thread Sotola, Radim
I know. But I pay around 20 Euro per month for all products from JetBrains and
I think this is not so much – in Czechia it is one evening in the pub.

From: Md. Rezaul Karim [mailto:rezaul.ka...@insight-centre.org]
Sent: Wednesday, June 28, 2017 12:55 PM
To: Sotola, Radim 
Cc: spark users ; ayan guha ; 
Abhinay Mehta ; Xiaomeng Wan 
Subject: RE: IDE for python

By the way, Pycharm from JetBrians also have a community edition which is free 
and open source.

Moreover, if you are a student, you can use the professional edition for 
students as well.

For more, see here https://www.jetbrains.com/student/

On Jun 28, 2017 11:18 AM, "Sotola, Radim" 
> wrote:
Pycharm is good choice. I buy monthly subscription and can see that the PyCharm 
development continue  (I mean that this is not tool which somebody develop and 
leave it without any upgrades).

From: Abhinay Mehta 
[mailto:abhinay.me...@gmail.com]
Sent: Wednesday, June 28, 2017 11:06 AM
To: ayan guha >
Cc: User >; Xiaomeng Wan 
>
Subject: Re: IDE for python

I use Pycharm and it works a treat. The big advantage I find is that I can use 
the same command shortcuts that I do when developing with IntelliJ IDEA when 
doing Scala or Java.


On 27 June 2017 at 23:29, ayan guha 
> wrote:
Depends on the need. For data exploration, i use notebooks whenever I can. For 
developement, any good text editor should work, I use sublime. If you want auto 
completion and all, you can use eclipse or pycharm, I do not :)

On Wed, 28 Jun 2017 at 7:17 am, Xiaomeng Wan 
> wrote:
Hi,
I recently switched from scala to python, and wondered which IDE people are 
using for python. I heard about pycharm, spyder etc. How do they compare with 
each other?

Thanks,
Shawn
--
Best Regards,
Ayan Guha



Re: [ML] Stop conditions for RandomForest

2017-06-28 Thread OBones

To me, they are.
Y is used to control if a split is a valid candidate when deciding which 
one to follow.
X is used to make a node a leaf if it has too few elements to even 
consider candidate splits.
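
For reference, a minimal sketch of where the existing parameter fits (not from the original thread; column names are placeholders): minInstancesPerNode corresponds to the Y value above, i.e. each child of an accepted split must contain at least that many rows, and there is no separate parameter for X.

import org.apache.spark.ml.regression.RandomForestRegressor

val rf = new RandomForestRegressor()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setMinInstancesPerNode(15)   // the "Y" threshold discussed above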


颜发才(Yan Facai) wrote:
It seems that splitting will always stop when the row count of a node is 
less than max(X, Y).

Hence, are they different?



On Tue, Jun 27, 2017 at 11:07 PM, OBones > wrote:


Hello,

Reading around on the theory behind tree based regression, I
concluded that there are various reasons to stop exploring the
tree when a given node has been reached. Among these, I have those
two:

1. When starting to process a node, if its size (row count) is
less than X then consider it a leaf
2. When a split for a node is considered, if any side of the split
has its size less than Y, then ignore it when selecting the best split

As an example, let's consider a node with 45 rows, that for a
given split creates two children, containing 5 and 35 rows
respectively.

If I set X to 50, then the node is a leaf and no split is attempted.
If I set X to 10 and Y to 15, then the splits are computed, but
because one side has fewer than 15 rows, that split is ignored.

I'm using DecisionTreeRegressor and RandomForestRegressor on our
data and because the former is implemented using the latter, they
both share the same parameters.
Going through those parameters, I found minInstancesPerNode which
to me is the Y value, but I could not find any parameter for the X
value.
Have I missed something?
If not, would there be a way to implement this?

Regards



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org






-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



(Spark-ml) java.util.NoSuchElementException: key not found exception on doing prediction and computing test error.

2017-06-28 Thread neha nihal
Thanks. It's working now. My test data had some labels which were not there
in the training set.

On Wednesday, June 28, 2017, Pralabh Kumar > wrote:

> Hi Neha
>
> This generally occurs when your test data has some value of a
> categorical variable which is not there in your training data. For e.g. you
> have a column DAYS with values M, T, W in the training data, but your test
> data contains F; then it throws a key not found exception. Please look into
> this, and if that's not the case, could you please share your code
> and training/testing data for better understanding.
>
> Regards
> Pralabh Kumar
>
> On Wed, Jun 28, 2017 at 11:45 AM, neha nihal 
> wrote:
>
>>
>> Hi,
>>
>> I am using Apache spark 2.0.2 randomforest ml (standalone mode) for text
>> classification. TF-IDF feature extractor is also used. The training part
>> runs without any issues and returns 100% accuracy. But when I am trying to
>> do prediction using trained model and compute test error, it fails with
>> java.util.NoSuchElementException: key not found exception.
>> Any help will be much appreciated.
>>
>> Thanks
>>
>>
>


RE: IDE for python

2017-06-28 Thread Md. Rezaul Karim
By the way, PyCharm from JetBrains also has a community edition which is
free and open source.

Moreover, if you are a student, you can use the professional edition for
students as well.

For more, see here https://www.jetbrains.com/student/

On Jun 28, 2017 11:18 AM, "Sotola, Radim"  wrote:

> PyCharm is a good choice. I buy a monthly subscription and can see that
> PyCharm development continues (I mean that this is not a tool which somebody
> develops and then leaves without any upgrades).
>
>
>
> *From:* Abhinay Mehta [mailto:abhinay.me...@gmail.com]
> *Sent:* Wednesday, June 28, 2017 11:06 AM
> *To:* ayan guha 
> *Cc:* User ; Xiaomeng Wan 
> *Subject:* Re: IDE for python
>
>
>
> I use Pycharm and it works a treat. The big advantage I find is that I can
> use the same command shortcuts that I do when developing with IntelliJ IDEA
> when doing Scala or Java.
>
>
>
>
>
> On 27 June 2017 at 23:29, ayan guha  wrote:
>
> Depends on the need. For data exploration, I use notebooks whenever I can.
> For development, any good text editor should work; I use Sublime. If you
> want auto completion and all, you can use Eclipse or PyCharm, I do not :)
>
>
>
> On Wed, 28 Jun 2017 at 7:17 am, Xiaomeng Wan  wrote:
>
> Hi,
>
> I recently switched from scala to python, and wondered which IDE people
> are using for python. I heard about pycharm, spyder etc. How do they
> compare with each other?
>
>
>
> Thanks,
>
> Shawn
>
> --
>
> Best Regards,
> Ayan Guha
>
>
>


Re: [PySpark]: How to store NumPy array into single DataFrame cell efficiently

2017-06-28 Thread Nick Pentreath
You will need to use PySpark vectors to store in a DataFrame. They can be
created from Numpy arrays as follows:

import numpy as np
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame(
    [("src1", "pkey1", 1, Vectors.dense(np.array([0, 1, 2])))])
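
Going back the other way, each stored cell is a pyspark.ml DenseVector, and
calling toArray() on it returns a NumPy array again. A rough usage sketch,
reusing the DataFrame above (positional access, since no column names were
given):

row = df.first()
arr = row[3].toArray()   # numpy.ndarray again
avg = float(arr.mean())  # e.g. the average mentioned below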


On Wed, 28 Jun 2017 at 12:23 Judit Planas  wrote:

> Dear all,
>
> I am trying to store a NumPy array (loaded from an HDF5 dataset) into one
> cell of a DataFrame, but I am having problems.
>
> In short, my data layout is similar to a database, where I have a few
> columns with metadata (source of information, primary key, etc.) and the
> last column contains a NumPy array (each array will have hundreds to
> thousands of elements):
> +--------+------------+--------------+-------------------------+
> | src    | PrimaryKey | SecondaryKey | Data                    |
> +--------+------------+--------------+-------------------------+
> | "src1" | "pkey1"    | 1            | np.array([0., 1., 2.])  |
> | "src2" | "pkey1"    | 2            | np.array([0., 1., 2.])  |
> +--------+------------+--------------+-------------------------+
>
> In my case, it is important to keep the NumPy array as it is (no
> transformation into Python list, etc.) because later on I will compute some
> statistics on the array, like the average of values. In addition, I expect
> to have thousands of rows (NumPy arrays), so I think trying to explode each
> array will generate too much duplicated metadata.
>
> I have successfully been able to load the data that I want into an RDD
> using the NumPy array object as it is. But I would like to use the
> DataFrame structure to leverage from the SQL functions.
>
> What I have been able to achieve so far is to store the raw data of the NP
> array doing the following:
> 1. Get the raw data of the NP array by calling "tobytes()" [
> https://docs.scipy.org/doc/numpy/reference/generated/numpy.ndarray.tobytes.html
> ]
> 2. Use "BinaryType" in the DF schema for the NP array
> 3. Call "np.frombuffer()" whenever I want to get the NP array back [
> https://docs.scipy.org/doc/numpy/reference/generated/numpy.frombuffer.html
> ]
>
> However, I feel this process is not optimal and it consumes a lot of
> worker memory. For example, if my data size is around 3 GB:
> - Loading all data into a DF and calling "cache()" method (within the same
> line) produces around 3 GB of memory consumed on the worker nodes.
> - However, loading all data into an RDD and calling "cache()" method
> (within the same line) produces around 500 MB consumed on the worker
> nodes.
>
> From this, I understand that my data is highly compressible, so using an
> RDD is more memory-efficient than the DF ('spark.rdd.compress' is set to
> 'True' by default).
>
> In addition, what I see when I run queries on the data is that, in
> general, the RDD computes the query in less time than the DF. My hypothesis
> here is the following: since data must be exchanged between worker nodes in
> order to perform the queries, the RDD takes less time because data is
> compressed, so communication between workers takes less time.
>
> To summarize, I would like to use the DF structure due to its advantages
> (optimized scheduler, SQL support, etc.), but what I see from real
> performance measurements is that RDDs are much more efficient in my case
> (both execution time and memory consumption). So, I wonder if there is a
> better way to store NP arrays into a DF so that I can benefit from their
> advantages but at the same time they show the same good performance as RDDs.
>
> Regarding the environment, my Spark version is 2.0.1 with Python 3.5.2,
> but I am not restricted to use these versions. I am not tuning any special
> variable (using default values).
>
> Thanks in advance, and please, let me know if I forgot to mention any
> detail or you need further information.
>
> Kind regards,
> Judit
> - To
> unsubscribe e-mail: user-unsubscr...@spark.apache.org


[PySpark]: How to store NumPy array into single DataFrame cell efficiently

2017-06-28 Thread Judit Planas

  
  
Dear all,

I am trying to store a NumPy array (loaded from an HDF5 dataset)
into one cell of a DataFrame, but I am having problems.

In short, my data layout is similar to a database, where I have a
few columns with metadata (source of information, primary key, etc.)
and the last column contains a NumPy array (each array will have
hundreds to thousands of elements):
+--------+------------+--------------+-------------------------+
| src    | PrimaryKey | SecondaryKey | Data                    |
+--------+------------+--------------+-------------------------+
| "src1" | "pkey1"    | 1            | np.array([0., 1., 2.])  |
| "src2" | "pkey1"    | 2            | np.array([0., 1., 2.])  |
+--------+------------+--------------+-------------------------+

In my case, it is important to keep the NumPy array as it is (no
transformation into Python list, etc.) because later on I will
compute some statistics on the array, like the average of values. In
addition, I expect to have thousands of rows (NumPy arrays), so I
think trying to explode each array will generate too much duplicated
metadata.

I have successfully been able to load the data that I want into an
RDD using the NumPy array object as it is. But I would like to use
the DataFrame structure to leverage from the SQL functions.

What I have been able to achieve so far is to store the raw data of
the NP array doing the following:
1. Get the raw data of the NP array by calling "tobytes()"
[https://docs.scipy.org/doc/numpy/reference/generated/numpy.ndarray.tobytes.html]
2. Use "BinaryType" in the DF schema for the NP array
3. Call "np.frombuffer()" whenever I want to get the NP array back
[https://docs.scipy.org/doc/numpy/reference/generated/numpy.frombuffer.html]
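
For concreteness, a rough end-to-end sketch of this recipe (the column names,
the dtype handling and the SparkSession called "spark" are assumptions on my
side; dtype and shape have to be tracked separately because frombuffer() only
sees raw bytes):

import numpy as np
from pyspark.sql.types import (StructType, StructField, StringType,
                               IntegerType, BinaryType)

schema = StructType([
    StructField("src", StringType()),
    StructField("PrimaryKey", StringType()),
    StructField("SecondaryKey", IntegerType()),
    StructField("Data", BinaryType()),   # raw bytes of the NumPy array
])

arr = np.array([0., 1., 2.])
df = spark.createDataFrame(
    [("src1", "pkey1", 1, bytearray(arr.tobytes()))], schema)

# getting the array back (dtype must be known out of band)
raw = df.first()["Data"]
restored = np.frombuffer(raw, dtype=np.float64)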

However, I feel this process is not optimal and it consumes a lot of
worker memory. For example, if my data size is around 3 GB:
- Loading all data into a DF and calling "cache()" method (within
the same line) produces around 3 GB of memory consumed on the worker
nodes.
- However, loading all data into an RDD and calling "cache()" method
(within the same line) produces around 500 MB consumed on the
worker nodes.

From this, I understand that my data is highly compressible, so
using an RDD is more memory-efficient than the DF
('spark.rdd.compress' is set to 'True' by default).

In addition, what I see when I run queries on the data is that, in
general, the RDD computes the query in less time than the DF. My
hypothesis here is the following: since data must be exchanged
between worker nodes in order to perform the queries, the RDD takes
less time because data is compressed, so communication between
workers takes less time.

To summarize, I would like to use the DF structure due to its
advantages (optimized scheduler, SQL support, etc.), but what I see
from real performance measurements is that RDDs are much more
efficient in my case (both execution time and memory consumption).
So, I wonder if there is a better way to store NP arrays into a DF
so that I can benefit from their advantages but at the same time
they show the same good performance as RDDs.

Regarding the environment, my Spark version is 2.0.1 with Python
3.5.2, but I am not restricted to use these versions. I am not
tuning any special variable (using default values).

Thanks in advance, and please, let me know if I forgot to mention
any detail or you need further information.

Kind regards,
Judit
  


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



RE: IDE for python

2017-06-28 Thread Sotola, Radim
PyCharm is a good choice. I buy a monthly subscription and can see that PyCharm 
development continues (I mean that this is not a tool which somebody develops and 
then leaves without any upgrades).

From: Abhinay Mehta [mailto:abhinay.me...@gmail.com]
Sent: Wednesday, June 28, 2017 11:06 AM
To: ayan guha 
Cc: User ; Xiaomeng Wan 
Subject: Re: IDE for python

I use Pycharm and it works a treat. The big advantage I find is that I can use 
the same command shortcuts that I do when developing with IntelliJ IDEA when 
doing Scala or Java.


On 27 June 2017 at 23:29, ayan guha 
> wrote:
Depends on the need. For data exploration, I use notebooks whenever I can. For 
development, any good text editor should work; I use Sublime. If you want auto 
completion and all, you can use Eclipse or PyCharm, I do not :)

On Wed, 28 Jun 2017 at 7:17 am, Xiaomeng Wan 
> wrote:
Hi,
I recently switched from scala to python, and wondered which IDE people are 
using for python. I heard about pycharm, spyder etc. How do they compare with 
each other?

Thanks,
Shawn
--
Best Regards,
Ayan Guha



Re: HDP 2.5 - Python - Spark-On-Hbase

2017-06-28 Thread ayan guha
Hi

Thanks to all of you, I could get the HBase connector working. There are still
some details around namespaces pending, but overall it is working well.

Now, as usual, I would like to apply the same concept to Structured
Streaming. Is there any similar way I can use writeStream.format with an
HBase writer? Or any other way to write continuous data to HBase?
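
For reference, the batch write path I mean looks roughly like the sketch below
(the data source name, option keys, catalog JSON and table/column names are my
best guess from the SHC README, so please double-check them there):

import json

# hypothetical catalog mapping a DataFrame to an HBase table
catalog = json.dumps({
    "table": {"namespace": "default", "name": "test_table"},
    "rowkey": "key",
    "columns": {
        "key":  {"cf": "rowkey", "col": "key",  "type": "string"},
        "col1": {"cf": "cf1",    "col": "col1", "type": "string"}
    }
})

# df is a placeholder DataFrame whose columns match the catalog above
(df.write
   .options(catalog=catalog, newtable="5")   # "newtable": number of regions
   .format("org.apache.spark.sql.execution.datasources.hbase")
   .save())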

best
Ayan

On Tue, Jun 27, 2017 at 2:15 AM, Weiqing Yang 
wrote:

> For SHC documentation, please refer to the README in the SHC GitHub repo,
> which is kept up-to-date.
>
> On Mon, Jun 26, 2017 at 5:46 AM, ayan guha  wrote:
>
>> Thanks all, I have found correct version of the package. Probably HDP
>> documentation is little behind.
>>
>> Best
>> Ayan
>>
>> On Mon, 26 Jun 2017 at 2:16 pm, Mahesh Sawaiker <
>> mahesh_sawai...@persistent.com> wrote:
>>
>>> Ayan,
>>>
>>> The location of the logging class was moved from Spark 1.6 to Spark 2.0.
>>>
>>> Looks like you are trying to run 1.6 code on 2.0. I have ported some
>>> code like this before, and if you have access to the code you can recompile
>>> it by changing the reference to the Logging class to use the slf4j Logger
>>> class directly; most of the code tends to be easily portable.
>>>
>>>
>>>
>>> Following is the release note for Spark 2.0
>>>
>>>
>>>
>>> *Removals, Behavior Changes and Deprecations*
>>>
>>> *Removals*
>>>
>>> The following features have been removed in Spark 2.0:
>>>
>>>- Bagel
>>>- Support for Hadoop 2.1 and earlier
>>>- The ability to configure closure serializer
>>>- HTTPBroadcast
>>>- TTL-based metadata cleaning
>>>- *Semi-private class org.apache.spark.Logging. We suggest you use
>>>slf4j directly.*
>>>- SparkContext.metricsSystem
>>>
>>> Thanks,
>>>
>>> Mahesh
>>>
>>>
>>>
>>>
>>>
>>> *From:* ayan guha [mailto:guha.a...@gmail.com]
>>> *Sent:* Monday, June 26, 2017 6:26 AM
>>> *To:* Weiqing Yang
>>> *Cc:* user
>>> *Subject:* Re: HDP 2.5 - Python - Spark-On-Hbase
>>>
>>>
>>>
>>> Hi
>>>
>>>
>>>
>>> I am using following:
>>>
>>>
>>>
>>> --packages com.hortonworks:shc:1.0.0-1.6-s_2.10 --repositories
>>> http://repo.hortonworks.com/content/groups/public/
>>>
>>>
>>>
>>> Is it compatible with Spark 2.X? I would like to use it
>>>
>>>
>>>
>>> Best
>>>
>>> Ayan
>>>
>>>
>>>
>>> On Sat, Jun 24, 2017 at 2:09 AM, Weiqing Yang 
>>> wrote:
>>>
>>> Yes.
>>>
>>> What SHC version you were using?
>>>
>>> If hitting any issues, you can post them in SHC github issues. There are
>>> some threads about this.
>>>
>>>
>>>
>>> On Fri, Jun 23, 2017 at 5:46 AM, ayan guha  wrote:
>>>
>>> Hi
>>>
>>>
>>>
>>> Is it possible to use SHC from Hortonworks with pyspark? If so, any
>>> working code sample available?
>>>
>>>
>>>
>>> Also, I faced an issue while running the samples with Spark 2.0
>>>
>>>
>>>
>>> "Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging"
>>>
>>>
>>>
>>> Any workaround?
>>>
>>>
>>>
>>> Thanks in advance
>>>
>>>
>>>
>>> --
>>>
>>> Best Regards,
>>> Ayan Guha
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> --
>>>
>>> Best Regards,
>>> Ayan Guha
>>> DISCLAIMER
>>> ==
>>> This e-mail may contain privileged and confidential information which is
>>> the property of Persistent Systems Ltd. It is intended only for the use of
>>> the individual or entity to which it is addressed. If you are not the
>>> intended recipient, you are not authorized to read, retain, copy, print,
>>> distribute or use this message. If you have received this communication in
>>> error, please notify the sender and delete all copies of this message.
>>> Persistent Systems Ltd. does not accept any liability for virus infected
>>> mails.
>>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


-- 
Best Regards,
Ayan Guha


Re: [ML] Stop conditions for RandomForest

2017-06-28 Thread Yan Facai
It seems that splitting will always stop when the row count of a node is less
than max(X, Y).
Hence, are they different?



On Tue, Jun 27, 2017 at 11:07 PM, OBones  wrote:

> Hello,
>
> Reading around on the theory behind tree based regression, I concluded
> that there are various reasons to stop exploring the tree when a given node
> has been reached. Among these, I have those two:
>
> 1. When starting to process a node, if its size (row count) is less than X
> then consider it a leaf
> 2. When a split for a node is considered, if any side of the split has its
> size less than Y, then ignore it when selecting the best split
>
> As an example, let's consider a node with 45 rows, that for a given split
> creates two children, containing 5 and 35 rows respectively.
>
> If I set X to 50, then the node is a leaf and no split is attempted.
> If I set X to 10 and Y to 15, then the splits are computed, but because one
> side has fewer than 15 rows, that split is ignored.
>
> I'm using DecisionTreeRegressor and RandomForestRegressor on our data and
> because the former is implemented using the latter, they both share the
> same parameters.
> Going through those parameters, I found minInstancesPerNode which to me is
> the Y value, but I could not find any parameter for the X value.
> Have I missed something?
> If not, would there be a way to implement this?
>
> Regards
>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: IDE for python

2017-06-28 Thread Abhinay Mehta
I use Pycharm and it works a treat. The big advantage I find is that I can
use the same command shortcuts that I do when developing with IntelliJ IDEA
when doing Scala or Java.


On 27 June 2017 at 23:29, ayan guha  wrote:

> Depends on the need. For data exploration, I use notebooks whenever I can.
> For development, any good text editor should work; I use Sublime. If you
> want auto completion and all, you can use Eclipse or PyCharm, I do not :)
>
> On Wed, 28 Jun 2017 at 7:17 am, Xiaomeng Wan  wrote:
>
>> Hi,
>> I recently switched from scala to python, and wondered which IDE people
>> are using for python. I heard about pycharm, spyder etc. How do they
>> compare with each other?
>>
>> Thanks,
>> Shawn
>>
> --
> Best Regards,
> Ayan Guha
>


Re: How do I find the time taken by each step in a stage in a Spark Job

2017-06-28 Thread ??????????
You can find the information in the Spark UI (the Stages tab shows per-stage and per-task timings).
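
If you want the same numbers programmatically, Spark also exposes them through
its monitoring REST API. A minimal sketch (the port, the app id lookup and the
exact field names are assumptions; check the Monitoring docs for your Spark
version):

import requests

base = "http://localhost:4040/api/v1"   # UI of the running driver (assumed port)
app_id = requests.get(base + "/applications").json()[0]["id"]
stages = requests.get("%s/applications/%s/stages" % (base, app_id)).json()

for s in stages:
    # per-stage timing fields; adjust the names if your version differs
    print(s.get("stageId"), s.get("name"), s.get("executorRunTime"))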


 
---Original---
From: "SRK"
Date: 2017/6/28 02:36:37
To: "user";
Subject: How do I find the time taken by each step in a stage in a Spark Job


Hi,

How do I find the time taken by each step in a stage in a Spark job? Also, how
do I find the bottleneck in each step, and whether a stage is skipped because
the RDDs are persisted in streaming?

I am trying to identify which step in a job is taking time in my Streaming
job.

Thanks!




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-do-I-find-the-time-taken-by-each-step-in-a-stage-in-a-Spark-Job-tp28796.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: (Spark-ml) java.util.NoSuchElementException: key not found exception on doing prediction and computing test error.

2017-06-28 Thread Pralabh Kumar
Hi Neha

This generally occurs when your test data has some value of a
categorical variable which is not there in your training data. For e.g. you
have a column DAYS with values M, T, W in the training data, but your test
data contains F; then it throws a key not found exception. Please look into
this, and if that's not the case, could you please share your code
and training/testing data for better understanding.
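
One way to guard against this at the pipeline level is sketched below. It is an
assumption on my side that the label comes from a StringIndexer; setting
handleInvalid to "skip" drops rows with unseen values at transform time instead
of throwing:

from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="category", outputCol="label",
                        handleInvalid="skip")   # "error" is the default
indexer_model = indexer.fit(train_df)           # train_df/test_df are placeholders
test_indexed = indexer_model.transform(test_df)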

Regards
Pralabh Kumar

On Wed, Jun 28, 2017 at 11:45 AM, neha nihal  wrote:

>
> Hi,
>
> I am using Apache spark 2.0.2 randomforest ml (standalone mode) for text
> classification. TF-IDF feature extractor is also used. The training part
> runs without any issues and returns 100% accuracy. But when I am trying to
> do prediction using trained model and compute test error, it fails with
> java.util.NoSuchElementException: key not found exception.
> Any help will be much appreciated.
>
> Thanks
>
>


Fwd: (Spark-ml) java.util.NoSuchElementException: key not found exception on doing prediction and computing test error.

2017-06-28 Thread neha nihal
Hi,

I am using Apache spark 2.0.2 randomforest ml (standalone mode) for text
classification. TF-IDF feature extractor is also used. The training part
runs without any issues and returns 100% accuracy. But when I am trying to
do prediction using trained model and compute test error, it fails with
java.util.NoSuchElementException: key not found exception.
Any help will be much appreciated.

Thanks