FetchFailed exception with Spark 1.6

2016-09-29 Thread Ankur Srivastava
Hi,

I am running a simple job on Spark 1.6 in which I am trying to leftOuterJoin a
big RDD with a smaller one. I am not broadcasting the smaller RDD yet, but I
am still running into FetchFailed errors, and the job eventually gets
killed.

I have already partitioned the data to 5000 partitions and every time the
job runs with no errors for the first 2K to 3K tasks but then starts
getting this exception.

For some of the failures, looking further into the stack trace shows errors
like the ones below, but if there were a network issue the initial 2K+ tasks
should not have succeeded.

Caused by: java.io.IOException: Connection reset by peer


Caused by: java.io.IOException: Failed to connect to 

I am running on the YARN cluster manager with 200 executors and 6 GB of
executor and driver heap. In the last run I saw errors related to
spark.yarn.executor.memoryOverhead, so I set it to 1.5 GB and no longer
see those errors.


Any help will be much appreciated.

Thanks
Ankur
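
For reference, the memory figures above can be turned into a per-container
estimate. This is only a sketch: it assumes the documented Spark 1.6 default
of max(384 MB, 10% of executor memory) for
spark.yarn.executor.memoryOverhead, and the function names are made up for
illustration. YARN may round the request up further, which is not modeled
here.

```python
# Assumed Spark 1.6 defaults for spark.yarn.executor.memoryOverhead.
MIN_OVERHEAD_MB = 384
OVERHEAD_FACTOR = 0.10


def default_overhead_mb(executor_memory_mb):
    """Overhead Spark would pick when memoryOverhead is not set explicitly."""
    return max(MIN_OVERHEAD_MB, int(executor_memory_mb * OVERHEAD_FACTOR))


def container_request_mb(executor_memory_mb, overhead_mb=None):
    """Heap plus off-heap overhead requested from YARN for one executor."""
    if overhead_mb is None:
        overhead_mb = default_overhead_mb(executor_memory_mb)
    return executor_memory_mb + overhead_mb


heap_mb = 6 * 1024          # 6 GB executor heap, as in the post
explicit_overhead_mb = 1536  # 1.5 GB, as in the post
executors = 200

per_container = container_request_mb(heap_mb, explicit_overhead_mb)
print("per container (MB):", per_container)
print("all executors (MB):", per_container * executors)
```

Comparing the explicit setting with default_overhead_mb(heap_mb) shows how
far the 1.5 GB value sits above the assumed default.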


Issues in compiling spark 2.0.0 code using scala-maven-plugin

2016-09-29 Thread satyajit vegesna
Hi ALL,

I am trying to compile code using Maven. It was working with Spark 1.6.2,
but when I try with Spark 2.0.0 I get the error below:

org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.2.2:compile (default) on project NginxLoads-repartition: wrap: org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit value: 1)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:212)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:80)
    at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build(SingleThreadedBuilder.java:51)
    at org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:128)
    at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:307)
    at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:193)
    at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:106)
    at org.apache.maven.cli.MavenCli.execute(MavenCli.java:863)
    at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:288)
    at org.apache.maven.cli.MavenCli.main(MavenCli.java:199)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:289)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:229)
    at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode(Launcher.java:415)
    at org.codehaus.plexus.classworlds.launcher.Launcher.main(Launcher.java:356)
Caused by: org.apache.maven.plugin.MojoExecutionException: wrap: org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit value: 1)
    at scala_maven.ScalaMojoSupport.execute(ScalaMojoSupport.java:490)
    at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:134)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:207)
    ... 20 more
Caused by: org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit value: 1)
    at org.apache.commons.exec.DefaultExecutor.executeInternal(DefaultExecutor.java:377)
    at org.apache.commons.exec.DefaultExecutor.execute(DefaultExecutor.java:160)
    at org.apache.commons.exec.DefaultExecutor.execute(DefaultExecutor.java:147)
    at scala_maven_executions.JavaMainCallerByFork.run(JavaMainCallerByFork.java:100)
    at scala_maven.ScalaCompilerSupport.compile(ScalaCompilerSupport.java:161)
    at scala_maven.ScalaCompilerSupport.doExecute(ScalaCompilerSupport.java:99)
    at scala_maven.ScalaMojoSupport.execute(ScalaMojoSupport.java:482)
    ... 22 more


Please find below the pom.xml that I am using; any help would be appreciated.


http://maven.apache.org/POM/4.0.0;
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
4.0.0

NginxLoads-repartition
NginxLoads-repartition
1.1-SNAPSHOT
${project.artifactId}
This is a boilerplate maven project to start using
Spark in Scala
2010


1.6
1.6
UTF-8
2.11
2.11

2.11.8





cloudera-repo-releases
https://repository.cloudera.com/artifactory/repo/




src/main/scala
src/test/scala



maven-assembly-plugin


package

single





jar-with-dependencies




org.apache.maven.plugins
maven-compiler-plugin
3.5.1

1.7
1.7




net.alchim31.maven
scala-maven-plugin
3.2.2







compile
testCompile



-make:transitive

Re: Is there a way to get the AUC metric for CrossValidator?

2016-09-29 Thread Rich Tarro
According to the documentation, cvModel.avgMetrics gives average
cross-validation metrics for each paramMap in
CrossValidator.estimatorParamMaps,
in the corresponding order.

So when using areaUnderROC as the evaluator metric, cvModel.avgMetrics gives
the following (this example uses Scala, but the API appears to work the same
in PySpark):

Array(0.8706074097889074, 0.9409529716549123, 0.9618787730606256,
0.8838019837612303, 0.9397610587835981, 0.9591275359721634,
0.8829088978012987, 0.9394137261180164, 0.9584085992609841,
0.8706074097889079, 0.9628051240960216, 0.9827490959747656,
0.8838019837612294, 0.9636100965080932, 0.9826906885021736,
0.8829088978013016, 0.9627072956991051, 0.9809166441709806,
0.8508340706851226, 0.7325352788119097, 0.7208072472539231,
0.8553496724213554, 0.7354481892254211, 0.7251511314439787,
0.8546551939595262, 0.7358349987841173, 0.7251408416244391)

My understanding is that each value is the average areaUnderROC across the
folds for each Parameter Grid combination used during Cross Validation.

I have yet to figure out what "in the corresponding order" specifically
means, so I don't know which areaUnderROC value corresponds to which set of
hyperparameters.

Hopefully this is what you are looking for.
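
One reading of "in the corresponding order" is that avgMetrics[i] belongs to
estimatorParamMaps[i]. Under that assumption (it is not confirmed in this
thread), the two lists can simply be zipped. The sketch below uses plain
Python lists with made-up parameter values and scores so that it runs
without Spark; in PySpark the inputs would presumably be
cv.getEstimatorParamMaps() and cvModel.avgMetrics.

```python
def pair_metrics(param_maps, avg_metrics):
    """Pair each parameter map with the metric stored at the same index."""
    if len(param_maps) != len(avg_metrics):
        raise ValueError("expected exactly one metric per parameter map")
    return list(zip(param_maps, avg_metrics))


def best_pair(param_maps, avg_metrics):
    """Return the (param_map, metric) pair with the highest metric."""
    return max(pair_metrics(param_maps, avg_metrics), key=lambda pair: pair[1])


# Hypothetical grid and scores, for illustration only.
param_maps = [{"regParam": 0.01}, {"regParam": 0.1}, {"regParam": 1.0}]
avg_metrics = [0.80, 0.90, 0.85]

for params, auc in pair_metrics(param_maps, avg_metrics):
    print(params, auc)

best_params, best_auc = best_pair(param_maps, avg_metrics)
print("best:", best_params, best_auc)
```

Using max() fits areaUnderROC, where larger is better; for an error metric
the selection would use min() instead.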


On Thu, Sep 29, 2016 at 3:18 PM, evanzamir  wrote:

> I'm using CrossValidator (in PySpark) to create a logistic regression
> model.
> There is "areaUnderROC", which I assume gives the AUC for the bestModel
> chosen by CV. But how to get the areaUnderROC for the test data during the
> cross-validation?


How to extract bestModel parameters from a CrossValidatorModel

2016-09-29 Thread Rich Tarro
I'm able to successfully extract parameters from a PipelineModel using
model.stages. However, when I try to extract parameters from the bestModel
of a CrossValidatorModel using cvModel.bestModel.stages, I get this error.

error: value stages is not a member of org.apache.spark.ml.Model[_$4]

cvModel.bestModel does properly output a PipelineModel.

What am I doing wrong?


Re: spark sql on json

2016-09-29 Thread Hyukjin Kwon
https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java#L104-L181

2016-09-29 18:58 GMT+09:00 Hitesh Goyal :

> Hi team,
>
>
>
> I have a JSON document and want to run Spark SQL queries on it.
>
> Can you please send me an example app built in Java so that I can run
> Spark SQL queries on my data?
>
>
>
> Regards,
>
> *Hitesh Goyal*
>
> Simpli5d Technologies
>
> Cont No.: 9996588220
>
>
>


Re: S3 DirectParquetOutputCommitter + PartitionBy + SaveMode.Append

2016-09-29 Thread Takeshi Yamamuro
Hi,

FYI: Seems 
`sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version","2")`
is only available at hadoop-2.7.3+.

// maropu


On Thu, Sep 29, 2016 at 9:28 PM, joffe.tal  wrote:

> You can use partition explicitly by adding "/="
> to
> the end of the path you are writing to and then use overwrite.
>
> BTW in Spark 2.0 you just need to use:
>
> sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version","2")
> and use s3a://
>
> and you can work with regular output committer (actually
> DirectParquetOutputCommitter is no longer available in Spark 2.0)
>
> so if you are planning on upgrading this could be another motivation


-- 
---
Takeshi Yamamuro


Re: Spark 2.0 issue

2016-09-29 Thread Xiao Li
Hi, Ashish,

Will take a look at this soon.

Thanks for reporting this,

Xiao

2016-09-29 14:26 GMT-07:00 Ashish Shrowty :
> If I try to inner-join two dataframes which originated from the same initial
> dataframe that was loaded using spark.sql() call, it results in an error -
>
> // reading from Hive .. the data is stored in Parquet format in Amazon S3
> val d1 = spark.sql("select * from ")
> val df1 = d1.groupBy("key1","key2").agg(avg("totalprice").as("avgtotalprice"))
> val df2 = d1.groupBy("key1","key2").agg(avg("itemcount").as("avgqty"))
> df1.join(df2, Seq("key1","key2")) gives error -
>  org.apache.spark.sql.AnalysisException: using columns ['key1,'key2] can not be resolved given input columns: [key1, key2, avgtotalprice, avgqty];
>
> If the same Dataframe is initialized via spark.read.parquet(), the above
> code works. This same code above also worked with Spark 1.6.2. I created a
> JIRA too ..  SPARK-17709 
>
> Any help appreciated!
>
> Thanks,
> Ashish


Running Spark master/slave instances in non Daemon mode

2016-09-29 Thread jpuro
Hi,

I recently tried deploying Spark master and slave instances to
container-based environments such as Docker and Nomad. There are two issues that I've
found with how the startup scripts work. The sbin/start-master.sh and
sbin/start-slave.sh start a daemon by default, but this isn't as compatible
with container deployments as one would think. The first issue is that the
daemon runs in the background and some container solutions require the apps
to run in the foreground or they consider the application to not be running
and they may close down the task. The second issue is that logs don't seem
to get integrated with the logging mechanism in the container solution. What
is the possibility of adding additional flags or startup scripts for
supporting Spark to run in the foreground? It would be great if a flag like
SPARK_NO_DAEMONIZE could be added or another script for foreground
execution.

Regards,

Jeff
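
Until such a flag exists, one possible workaround (a sketch, not something
confirmed in this thread) is to bypass the sbin scripts and launch the Master
or Worker class through bin/spark-class, which, as far as I know, stays in
the foreground and logs to the console. The helper names and the /opt/spark
path below are hypothetical.

```python
import os

MASTER_CLASS = "org.apache.spark.deploy.master.Master"
WORKER_CLASS = "org.apache.spark.deploy.worker.Worker"


def spark_class(spark_home):
    """Path to the launcher script that the sbin daemon scripts wrap."""
    return os.path.join(spark_home, "bin", "spark-class")


def master_command(spark_home, host, port=7077, webui_port=8080):
    """Argument list that starts a standalone master without daemonizing."""
    return [spark_class(spark_home), MASTER_CLASS,
            "--host", host, "--port", str(port),
            "--webui-port", str(webui_port)]


def worker_command(spark_home, master_url, webui_port=8081):
    """Argument list that starts a standalone worker without daemonizing."""
    return [spark_class(spark_home), WORKER_CLASS,
            "--webui-port", str(webui_port), master_url]


def exec_foreground(command):
    """Replace the current process so the launcher becomes the main process."""
    os.execv(command[0], command)


if __name__ == "__main__":
    # Print the command instead of executing it, so the sketch is safe to run.
    print(" ".join(master_command("/opt/spark", "0.0.0.0")))
```

A container entrypoint could call exec_foreground(master_command(...)) so
that the container runtime supervises the launcher process directly.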




Spark 2.0 issue

2016-09-29 Thread Ashish Shrowty
If I try to inner-join two dataframes which originated from the same initial
dataframe that was loaded using spark.sql() call, it results in an error -

// reading from Hive .. the data is stored in Parquet format in Amazon S3
val d1 = spark.sql("select * from ")
val df1 = d1.groupBy("key1","key2").agg(avg("totalprice").as("avgtotalprice"))
val df2 = d1.groupBy("key1","key2").agg(avg("itemcount").as("avgqty"))
df1.join(df2, Seq("key1","key2")) gives error -
 org.apache.spark.sql.AnalysisException: using columns ['key1,'key2] can not be resolved given input columns: [key1, key2, avgtotalprice, avgqty];

If the same Dataframe is initialized via spark.read.parquet(), the above
code works. This same code above also worked with Spark 1.6.2. I created a
JIRA too ..  SPARK-17709   

Any help appreciated!

Thanks,
Ashish






Re: pyspark ML example not working

2016-09-29 Thread William Kupersanin
Was there an answer to this? I get this periodically when a job has died
from an error and I run another job. I have gotten around it by going to
/var/lib/hive/metastore/metastore_db and removing the *.lck files. I am
sure this is the exact wrong thing to do as I imagine those lock files
exist to prevent corruption, but I haven't found another way to get around
the situation.


On Thu, Sep 22, 2016 at 5:59 PM, jypucca  wrote:

>
> I installed Spark 2.0.0 and was trying the ML example (IndexToString) on
> this web page:
> http://spark.apache.org/docs/latest/ml-features.html#onehotencoder
> using a Jupyter notebook (running PySpark) to create a simple DataFrame, and
> I keep getting a long error message (see below). PySpark has worked fine
> with RDDs, but any time I try to do anything with a DataFrame it keeps
> throwing error messages. Any help would be appreciated, thanks!
>
> ***
> Py4JJavaError: An error occurred while calling o23.applySchemaToPythonRDD.
> : java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
>     at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
>     at org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:171)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>     at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:258)
>     at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:359)
>     at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:263)
>     at org.apache.spark.sql.hive.HiveSharedState.metadataHive$lzycompute(HiveSharedState.scala:39)
>     at org.apache.spark.sql.hive.HiveSharedState.metadataHive(HiveSharedState.scala:38)
>     at org.apache.spark.sql.hive.HiveSharedState.externalCatalog$lzycompute(HiveSharedState.scala:46)
>     at org.apache.spark.sql.hive.HiveSharedState.externalCatalog(HiveSharedState.scala:45)
>     at org.apache.spark.sql.hive.HiveSessionState.catalog$lzycompute(HiveSessionState.scala:50)
>     at org.apache.spark.sql.hive.HiveSessionState.catalog(HiveSessionState.scala:48)
>     at org.apache.spark.sql.hive.HiveSessionState$$anon$1.<init>(HiveSessionState.scala:63)
>     at org.apache.spark.sql.hive.HiveSessionState.analyzer$lzycompute(HiveSessionState.scala:63)
>     at org.apache.spark.sql.hive.HiveSessionState.analyzer(HiveSessionState.scala:62)
>     at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
>     at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
>     at org.apache.spark.sql.SparkSession.applySchemaToPythonRDD(SparkSession.scala:666)
>     at org.apache.spark.sql.SparkSession.applySchemaToPythonRDD(SparkSession.scala:656)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
>     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>     at py4j.Gateway.invoke(Gateway.java:280)
>     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
>     at py4j.commands.CallCommand.execute(CallCommand.java:79)
>     at py4j.GatewayConnection.run(GatewayConnection.java:211)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
>     at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1523)
>     at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:86)
>     at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:132)
>     at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104)
>     at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3005)
>     at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3024)
> at
> 

Setting conf options in jupyter

2016-09-29 Thread William Kupersanin
Hello,

I am trying to figure out how to correctly set config options in Jupyter
when I am already provided a SparkContext and a HiveContext. I need to
increase a couple of memory allocations. My program dies, indicating that I
am trying to call methods on a stopped SparkContext. I thought I had
created a new one with the new conf, so I am not sure why.

My code is as follows:

import time

from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext
from pyspark.sql import SQLContext

conf = (SparkConf()
        .set("spark.yarn.executor.memoryOverhead", "4096")
        .set("spark.kryoserializer.buffer.max.mb", "1024"))

sc.stop()
sc = SparkContext(conf=conf)
sqlContext2 = SQLContext.getOrCreate(sc)
starttime = time.time()
sampledate = "20160913"
networkdf = sqlContext2.read.json("/sp/network/" + sampledate + "/03/*")


An error occurred while calling o144.json.
: java.lang.IllegalStateException: Cannot call methods on a stopped
SparkContext.
This stopped SparkContext was created at:
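
One possible cause, which is an assumption and not confirmed in this thread,
is that SQLContext.getOrCreate hands back a context that was created earlier
and is still bound to the stopped SparkContext. A sketch of a variant that
constructs the SQLContext directly from the new SparkContext follows. The
function names are hypothetical, and the pyspark imports are kept inside the
function so the file can be loaded on a machine without Spark.

```python
def build_conf_pairs():
    """The two settings from the post, as (key, value) pairs."""
    return [
        ("spark.yarn.executor.memoryOverhead", "4096"),
        ("spark.kryoserializer.buffer.max.mb", "1024"),
    ]


def restart_contexts(old_sc):
    """Stop old_sc and return a fresh (SparkContext, SQLContext) pair.

    Requires pyspark; meant to be called from the notebook with the
    SparkContext that was provided at startup.
    """
    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SQLContext

    conf = SparkConf().setAll(build_conf_pairs())
    old_sc.stop()
    new_sc = SparkContext(conf=conf)
    # Build the SQLContext from the new SparkContext instead of calling
    # getOrCreate, which may return a previously created instance.
    return new_sc, SQLContext(new_sc)


print(build_conf_pairs())
```

In the notebook this would be used as sc, sqlContext2 = restart_contexts(sc)
before reading the JSON files.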


Re: Architecture recommendations for a tricky use case

2016-09-29 Thread Michael Segel
OP mentioned HBase or HDFS as persisted storage. Therefore they have to be 
running YARN if they are considering spark. 
(Assuming that you’re not trying to do a storage / compute model and use 
standalone spark outside your cluster. You can, but you have more moving 
parts…) 

I never said anything about putting something on a public network. I mentioned 
running a secured cluster.
You don’t deal with PII or other regulated data, do you? 


If you read my original post, you are correct we don’t have a lot, if any real 
information. 
Based on what the OP said, there are design considerations since every tool he 
mentioned has pluses and minuses and the problem isn’t really that challenging 
unless you have something extraordinary like high velocity or some other 
constraint that makes this challenging. 

BTW, depending on scale and velocity… your relational engines may become 
problematic. 
HTH

-Mike


> On Sep 29, 2016, at 1:51 PM, Cody Koeninger  wrote:
> 
> The OP didn't say anything about Yarn, and why are you contemplating
> putting Kafka or Spark on public networks to begin with?
> 
> Gwen's right, absent any actual requirements this is kind of pointless.
> 
> On Thu, Sep 29, 2016 at 1:27 PM, Michael Segel
>  wrote:
>> Spark standalone is not Yarn… or secure for that matter… ;-)
>> 
>>> On Sep 29, 2016, at 11:18 AM, Cody Koeninger  wrote:
>>> 
>>> Spark streaming helps with aggregation because
>>> 
>>> A. raw kafka consumers have no built in framework for shuffling
>>> amongst nodes, short of writing into an intermediate topic (I'm not
>>> touching Kafka Streams here, I don't have experience), and
>>> 
>>> B. it deals with batches, so you can transactionally decide to commit
>>> or rollback your aggregate data and your offsets.  Otherwise your
>>> offsets and data store can get out of sync, leading to lost /
>>> duplicate data.
>>> 
>>> Regarding long running spark jobs, I have streaming jobs in the
>>> standalone manager that have been running for 6 months or more.
>>> 
>>> On Thu, Sep 29, 2016 at 11:01 AM, Michael Segel
>>>  wrote:
>>>> Ok… so what’s the tricky part?
>>>> Spark Streaming isn’t real time so if you don’t mind a slight delay in
>>>> processing… it would work.
>>>>
>>>> The drawback is that you now have a long running Spark Job (assuming under
>>>> YARN) and that could become a problem in terms of security and resources.
>>>> (How well does Yarn handle long running jobs these days in a secured
>>>> Cluster? Steve L. may have some insight… )
>>>>
>>>> Raw HDFS would become a problem because Apache HDFS is still WORM (write
>>>> once, read many). (Do you want to write your own compaction code? Or use
>>>> Hive 1.x+?)
>>>>
>>>> HBase? Depending on your admin… stability could be a problem.
>>>> Cassandra? That would be a separate cluster and that in itself could be a
>>>> problem…
>>>>
>>>> YMMV so you need to address the pros/cons of each tool specific to your
>>>> environment and skill level.
>>>>
>>>> HTH
>>>>
>>>> -Mike
>>>>
>>>>> On Sep 29, 2016, at 8:54 AM, Ali Akhtar  wrote:
>>>>>
>>>>> I have a somewhat tricky use case, and I'm looking for ideas.
>>>>>
>>>>> I have 5-6 Kafka producers, reading various APIs, and writing their raw
>>>>> data into Kafka.
>>>>>
>>>>> I need to:
>>>>>
>>>>> - Do ETL on the data, and standardize it.
>>>>>
>>>>> - Store the standardized data somewhere (HBase / Cassandra / Raw HDFS /
>>>>> ElasticSearch / Postgres)
>>>>>
>>>>> - Query this data to generate reports / analytics (There will be a web UI
>>>>> which will be the front-end to the data, and will show the reports)
>>>>>
>>>>> Java is being used as the backend language for everything (backend of the
>>>>> web UI, as well as the ETL layer)
>>>>>
>>>>> I'm considering:
>>>>>
>>>>> - Using raw Kafka consumers, or Spark Streaming, as the ETL layer
>>>>> (receive raw data from Kafka, standardize & store it)
>>>>>
>>>>> - Using Cassandra, HBase, or raw HDFS, for storing the standardized data,
>>>>> and to allow queries
>>>>>
>>>>> - In the backend of the web UI, I could either use Spark to run queries
>>>>> across the data (mostly filters), or directly run queries against
>>>>> Cassandra / HBase
>>>>>
>>>>> I'd appreciate some thoughts / suggestions on which of these alternatives
>>>>> I should go with (e.g, using raw Kafka consumers vs Spark for ETL, which
>>>>> persistent data store to use, and how to query that data store in the
>>>>> backend of the web UI, for displaying the reports).
>>>>>
>>>>>
>>>>> Thanks.
>>>>
>> 
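
Point B in the quoted message above (committing aggregate data and offsets
together so they cannot drift apart) can be sketched with any transactional
store. The example below uses an in-memory SQLite database purely as a
stand-in; the table layout, function names, and topic name are all
hypothetical, and a real job would read the offset range from its Kafka
batch.

```python
import sqlite3


def init_store(conn):
    """Create the aggregate table and the per-partition offset table."""
    with conn:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS aggregates ("
            "key TEXT PRIMARY KEY, total INTEGER NOT NULL)")
        conn.execute(
            "CREATE TABLE IF NOT EXISTS offsets ("
            "topic TEXT NOT NULL, part INTEGER NOT NULL, "
            "until_offset INTEGER NOT NULL, PRIMARY KEY (topic, part))")


def commit_batch(conn, topic, part, from_offset, until_offset, counts):
    """Apply one batch's counts and advance the offset in one transaction.

    Returns False, without writing anything, when the batch does not start
    at the stored offset (for example when a batch is replayed).
    """
    with conn:  # commits on success, rolls back if an exception escapes
        row = conn.execute(
            "SELECT until_offset FROM offsets WHERE topic = ? AND part = ?",
            (topic, part)).fetchone()
        stored = row[0] if row else 0
        if stored != from_offset:
            return False
        for key, delta in counts.items():
            conn.execute(
                "INSERT OR IGNORE INTO aggregates (key, total) VALUES (?, 0)",
                (key,))
            conn.execute(
                "UPDATE aggregates SET total = total + ? WHERE key = ?",
                (delta, key))
        conn.execute(
            "INSERT OR REPLACE INTO offsets (topic, part, until_offset) "
            "VALUES (?, ?, ?)", (topic, part, until_offset))
    return True


conn = sqlite3.connect(":memory:")
init_store(conn)
print(commit_batch(conn, "events", 0, 0, 10, {"a": 3, "b": 1}))
print(commit_batch(conn, "events", 0, 0, 10, {"a": 3, "b": 1}))
```

Because the offset check and the aggregate update share one transaction, a
replayed batch is skipped and a failed batch leaves both tables unchanged.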





writing to s3 failing to move parquet files from temporary folder

2016-09-29 Thread jamborta
Hi,

I have an 8 hour job (spark 2.0.0) that writes the results out to parquet
using the standard approach:

processed_images_df.write.format("parquet").save(s3_output_path)

It executes 1 tasks and writes the results to a _temporary folder, and
in the last step (after all the tasks have completed) it copies the parquet
files out of the _temporary folder. After copying about 2-3000 files, it
fails with the following error (at first I thought this was a temporary S3
failure, but I reran the job 3 times and got the same error):

org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelationCommand.scala:149)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:115)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:60)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:487)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:211)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.http.NoHttpResponseException: s3-bucket.s3.amazonaws.com:443 failed to respond
    at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:143)
    at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57)
    at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:261)
    at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:283)
    at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:259)
    at org.apache.http.impl.conn.AbstractClientConnAdapter.receiveResponseHeader(AbstractClientConnAdapter.java:232)
    at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:272)
    at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:124)
    at org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:686)
    at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:488)
    at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:884)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55)
    at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:326)
at

Re: Questions about DataFrame's filter()

2016-09-29 Thread Michael Armbrust
-dev +user

> It surprises me as `filter()` takes a Column, not a `Row => Boolean`.


There are several overloaded versions of Dataset.filter(...)

def filter(func: FilterFunction[T]): Dataset[T]
def filter(func: (T) ⇒ Boolean): Dataset[T]
def filter(conditionExpr: String): Dataset[T]
def filter(condition: Column): Dataset[T]

> ... and why the error occurs.  Can someone explain please?


Anytime the compiler fails like that, it is probably a Spark code
generation bug.  It would be awesome if you could try your application on
Spark 2.0.1 (currently voting on RC3) and see if it's fixed.  If not, please
open a JIRA.

Michael

On Thu, Sep 29, 2016 at 9:16 AM, Samy Dindane  wrote:

> Hi,
>
> I noticed that the following code compiles:
>
>   val df = spark.read.format("com.databricks.spark.avro").load("/tmp/whatever/output")
>   val count = df.filter(x => x.getAs[Int]("day") == 2).count
>
> It surprises me as `filter()` takes a Column, not a `Row => Boolean`.
>
> Also, this code returns the right result, but takes 1m30 to run (while it
> takes less than 1 second when using `$"day" === 2`) and gives the error
> pasted in the bottom of this message.
>
> I was just wondering why it does work (implicit conversion?), why it is
> slow, and why the error occurs.
> Can someone explain please?
>
> Thank you,
>
> Samy
>
> --
>
> [error] org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 398, Column 41: Expression "scan_isNull10" is not an rvalue
> [error] at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:10174)
> [error] at org.codehaus.janino.UnitCompiler.toRvalueOrCompileException(UnitCompiler.java:6036)
> [error] at org.codehaus.janino.UnitCompiler.getConstantValue2(UnitCompiler.java:4440)
> [error] at org.codehaus.janino.UnitCompiler.access$9900(UnitCompiler.java:185)
> [error] at org.codehaus.janino.UnitCompiler$11.visitAmbiguousName(UnitCompiler.java:4417)
> [error] at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:3138)
> [error] at org.codehaus.janino.UnitCompiler.getConstantValue(UnitCompiler.java:4427)
> [error] at org.codehaus.janino.UnitCompiler.getConstantValue2(UnitCompiler.java:4634)
> [error] at org.codehaus.janino.UnitCompiler.access$8900(UnitCompiler.java:185)
> [error] at org.codehaus.janino.UnitCompiler$11.visitBinaryOperation(UnitCompiler.java:4394)
> [error] at org.codehaus.janino.Java$BinaryOperation.accept(Java.java:3768)
> [error] at org.codehaus.janino.UnitCompiler.getConstantValue(UnitCompiler.java:4427)
> [error] at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:4360)
> [error] at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1845)
> [error] at org.codehaus.janino.UnitCompiler.access$2000(UnitCompiler.java:185)
> [error] at org.codehaus.janino.UnitCompiler$4.visitLocalVariableDeclarationStatement(UnitCompiler.java:945)
> [error] at org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:2508)
> [error] at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:958)
> [error] at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1007)
> [error] at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2293)
> [error] at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:822)
> [error] at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:794)
> [error] at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:507)
> [error] at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:658)
> [error] at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:662)
> [error] at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:185)
> [error] at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:350)
> [error] at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1035)
> [error] at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
> [error] at org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:769)
> [error] at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:532)
> [error] at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:393)
> [error] at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:185)
> [error] at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:347)
> [error] at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1139)
> [error] at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
> [error] 

Is there a way to get the AUC metric for CrossValidator?

2016-09-29 Thread evanzamir
I'm using CrossValidator (in PySpark) to create a logistic regression model.
There is "areaUnderROC", which I assume gives the AUC for the bestModel
chosen by CV. But how do I get the areaUnderROC for the test data during the
cross-validation?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-way-to-get-the-AUC-metric-for-CrossValidator-tp27816.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Architecture recommendations for a tricky use case

2016-09-29 Thread Cody Koeninger
The OP didn't say anything about Yarn, and why are you contemplating
putting Kafka or Spark on public networks to begin with?

Gwen's right, absent any actual requirements this is kind of pointless.

On Thu, Sep 29, 2016 at 1:27 PM, Michael Segel
 wrote:
> Spark standalone is not Yarn… or secure for that matter… ;-)
>
>> On Sep 29, 2016, at 11:18 AM, Cody Koeninger  wrote:
>>
>> Spark streaming helps with aggregation because
>>
>> A. raw kafka consumers have no built in framework for shuffling
>> amongst nodes, short of writing into an intermediate topic (I'm not
>> touching Kafka Streams here, I don't have experience), and
>>
>> B. it deals with batches, so you can transactionally decide to commit
>> or rollback your aggregate data and your offsets.  Otherwise your
>> offsets and data store can get out of sync, leading to lost /
>> duplicate data.
>>
>> Regarding long running spark jobs, I have streaming jobs in the
>> standalone manager that have been running for 6 months or more.
>>
>> On Thu, Sep 29, 2016 at 11:01 AM, Michael Segel
>>  wrote:
>>> Ok… so what’s the tricky part?
>>> Spark Streaming isn’t real time so if you don’t mind a slight delay in 
>>> processing… it would work.
>>>
>>> The drawback is that you now have a long running Spark Job (assuming under 
>>> YARN) and that could become a problem in terms of security and resources.
>>> (How well does Yarn handle long running jobs these days in a secured 
>>> Cluster? Steve L. may have some insight… )
>>>
>>> Raw HDFS would become a problem because Apache HDFS is still WORM (write
>>> once, read many). (Do you want to write your own compaction code? Or use Hive 1.x+?)
>>>
>>> HBase? Depending on your admin… stability could be a problem.
>>> Cassandra? That would be a separate cluster and that in itself could be a 
>>> problem…
>>>
>>> YMMV so you need to address the pros/cons of each tool specific to your 
>>> environment and skill level.
>>>
>>> HTH
>>>
>>> -Mike
>>>
>>>> On Sep 29, 2016, at 8:54 AM, Ali Akhtar wrote:
>>>>
>>>> I have a somewhat tricky use case, and I'm looking for ideas.
>>>>
>>>> I have 5-6 Kafka producers, reading various APIs, and writing their raw
>>>> data into Kafka.
>>>>
>>>> I need to:
>>>>
>>>> - Do ETL on the data, and standardize it.
>>>>
>>>> - Store the standardized data somewhere (HBase / Cassandra / Raw HDFS /
>>>> ElasticSearch / Postgres)
>>>>
>>>> - Query this data to generate reports / analytics (There will be a web UI
>>>> which will be the front-end to the data, and will show the reports)
>>>>
>>>> Java is being used as the backend language for everything (backend of the
>>>> web UI, as well as the ETL layer)
>>>>
>>>> I'm considering:
>>>>
>>>> - Using raw Kafka consumers, or Spark Streaming, as the ETL layer (receive
>>>> raw data from Kafka, standardize & store it)
>>>>
>>>> - Using Cassandra, HBase, or raw HDFS, for storing the standardized data,
>>>> and to allow queries
>>>>
>>>> - In the backend of the web UI, I could either use Spark to run queries
>>>> across the data (mostly filters), or directly run queries against
>>>> Cassandra / HBase
>>>>
>>>> I'd appreciate some thoughts / suggestions on which of these alternatives
>>>> I should go with (e.g, using raw Kafka consumers vs Spark for ETL, which
>>>> persistent data store to use, and how to query that data store in the
>>>> backend of the web UI, for displaying the reports).
>>>>
>>>>
>>>> Thanks.
>>>
>




Running in local mode as SQL engine - what to optimize?

2016-09-29 Thread RodrigoB
Hi all,

For several reasons which I won't elaborate on (yet), we're using Spark in
local mode as an in-memory SQL engine: we retrieve data from Cassandra,
execute SQL queries on it and return the results to the client - so no
cluster, no worker nodes. I'm well aware local mode has always been
considered a testing mode, but it does fit our purposes at the moment.

We're on Spark 2.0.0.

I'm running into several challenges on which I would like to get some
comments, if possible:

1 - For group-by based SQL queries I'm finding that shuffle disk spills
happen constantly, to the point where after a couple of days I have 9GB of
disk filled in the block manager folder with broadcast files. My
understanding is that disk spills only live for the lifetime of an RDD:
once the RDD is gone from memory, so should the files be. This doesn't seem
to be happening. Is there any way to completely disable the disk spills?
I've tweaked the memory fraction configuration to maximize execution memory,
but that doesn't seem to have done much to avoid the spills...

2 - GC overhead is overwhelming - when refreshing a DataFrame (even with
empty data!) and executing 1 group-by query every second on around 1MB of
data, the amount of Young Gen used goes up to 2GB every 10 seconds. I've
started profiling the JVM and can find a considerable number of HashMap
objects, which I assume are created internally by Spark.

3 - I'm really looking for low-latency multithreaded refreshes and
collection of data frames - on the order of milliseconds for query execution
and collection of data within this local node - and I'm afraid this goes
against the nature of Spark. Spark partitions all data as blocks and uses
the scheduler for its multi-node design, and that's great for multi-node
execution. For local-node execution it adds considerable overhead. I'm aware
of this constraint; the hope is that we could optimize it to the point where
this kind of usage becomes a possibility - an efficient in-memory SQL engine
within the same JVM where the data lives. Any suggestions are very welcome!

Thanks in advance,
Rod






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Running-in-local-mode-as-SQL-engine-what-to-optimize-tp27815.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Architecture recommendations for a tricky use case

2016-09-29 Thread Michael Segel
Spark standalone is not Yarn… or secure for that matter… ;-)

> On Sep 29, 2016, at 11:18 AM, Cody Koeninger  wrote:
> 
> Spark streaming helps with aggregation because
> 
> A. raw kafka consumers have no built in framework for shuffling
> amongst nodes, short of writing into an intermediate topic (I'm not
> touching Kafka Streams here, I don't have experience), and
> 
> B. it deals with batches, so you can transactionally decide to commit
> or rollback your aggregate data and your offsets.  Otherwise your
> offsets and data store can get out of sync, leading to lost /
> duplicate data.
> 
> Regarding long running spark jobs, I have streaming jobs in the
> standalone manager that have been running for 6 months or more.
> 
> On Thu, Sep 29, 2016 at 11:01 AM, Michael Segel
>  wrote:
>> Ok… so what’s the tricky part?
>> Spark Streaming isn’t real time so if you don’t mind a slight delay in 
>> processing… it would work.
>> 
>> The drawback is that you now have a long running Spark Job (assuming under 
>> YARN) and that could become a problem in terms of security and resources.
>> (How well does Yarn handle long running jobs these days in a secured 
>> Cluster? Steve L. may have some insight… )
>> 
>> Raw HDFS would become a problem because Apache HDFS is still WORM (write
>> once, read many). (Do you want to write your own compaction code? Or use Hive 1.x+?)
>> 
>> HBase? Depending on your admin… stability could be a problem.
>> Cassandra? That would be a separate cluster and that in itself could be a 
>> problem…
>> 
>> YMMV so you need to address the pros/cons of each tool specific to your 
>> environment and skill level.
>> 
>> HTH
>> 
>> -Mike
>> 
>>> On Sep 29, 2016, at 8:54 AM, Ali Akhtar  wrote:
>>> 
>>> I have a somewhat tricky use case, and I'm looking for ideas.
>>> 
>>> I have 5-6 Kafka producers, reading various APIs, and writing their raw 
>>> data into Kafka.
>>> 
>>> I need to:
>>> 
>>> - Do ETL on the data, and standardize it.
>>> 
>>> - Store the standardized data somewhere (HBase / Cassandra / Raw HDFS / 
>>> ElasticSearch / Postgres)
>>> 
>>> - Query this data to generate reports / analytics (There will be a web UI 
>>> which will be the front-end to the data, and will show the reports)
>>> 
>>> Java is being used as the backend language for everything (backend of the 
>>> web UI, as well as the ETL layer)
>>> 
>>> I'm considering:
>>> 
>>> - Using raw Kafka consumers, or Spark Streaming, as the ETL layer (receive 
>>> raw data from Kafka, standardize & store it)
>>> 
>>> - Using Cassandra, HBase, or raw HDFS, for storing the standardized data, 
>>> and to allow queries
>>> 
>>> - In the backend of the web UI, I could either use Spark to run queries 
>>> across the data (mostly filters), or directly run queries against Cassandra 
>>> / HBase
>>> 
>>> I'd appreciate some thoughts / suggestions on which of these alternatives I 
>>> should go with (e.g, using raw Kafka consumers vs Spark for ETL, which 
>>> persistent data store to use, and how to query that data store in the 
>>> backend of the web UI, for displaying the reports).
>>> 
>>> 
>>> Thanks.
>> 



Metrics System not recognizing Custom Source/Sink in application jar

2016-09-29 Thread map reduced
Hi,

I've added Custom Source and Sink in my application jar and found a way to
get a static fixed metrics.properties on Stand-alone cluster nodes. When I
want to launch my application, I give the static path -
spark.metrics.conf="/fixed-path/to/metrics.properties". Despite my custom
source/sink being in my code/fat-jar - I get ClassNotFoundException on
CustomSink.

My fat-jar (with Custom Source/Sink code in it) is on hdfs with read access
to all.

So here's what all I've already tried setting (since executors can't find
Custom Source/Sink in my application fat-jar):

   1. spark.executor.extraClassPath = hdfs://path/to/fat-jar
   2. spark.executor.extraClassPath = fat-jar-name.jar
   3. spark.executor.extraClassPath = ./fat-jar-name.jar
   4. spark.executor.extraClassPath = ./
   5. spark.executor.extraClassPath = /dir/on/cluster/* (although * is not
   at file level, there are more directories - I have no way of knowing random
   application-id or driver-id to give absolute name before launching the app)

It seems like this is how executors are getting initialized for this case
(please correct me if I am wrong) -

   1. Driver tells here's the jar location - hdfs://../fat-jar.jar and here
   are some properties like spark.executor.memory etc.
   2. N number of Executors spin up (depending on configuration) on cluster
   3. Start downloading hdfs://../fat-jar.jar but initialize metrics system
   in the mean time (? - not sure of this step)
   4. Metrics system looking for Custom Sink/Source files - since it's
   mentioned in metrics.properties - even before it's done downloading fat-jar
   (which actually has all those files) (this is my hypothesis)
   5. ClassNotFoundException - CustomSink not found!

Is my understanding correct? Moreover, is there anything else I can try? If
anyone has experience with custom source/sinks, any help would be
appreciated.

My SO question:
https://stackoverflow.com/questions/39763364/metrics-system-not-recognizing-custom-source-sink-in-application-jar


Thanks,

KP


Re: Re: Selecting the top 100 records per group by?

2016-09-29 Thread Mariano Semelman
It's not Spark specific, but it answers your question:
https://blog.jooq.org/2014/08/12/the-difference-between-row_number-rank-and-dense_rank/
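
The difference the linked article describes can be reproduced without Spark. The following is a minimal Python sketch, not Spark code: `number_rows` is a hypothetical helper that numbers already-collected values the way ROW_NUMBER, RANK and DENSE_RANK would over a descending order, using prices like those quoted further down in this thread.

```python
def number_rows(values):
    """Return (value, row_number, rank, dense_rank) tuples, highest value first."""
    ordered = sorted(values, reverse=True)
    out = []
    rank = 0
    dense = 0
    prev = object()  # sentinel that never equals a real value
    for position, value in enumerate(ordered, start=1):
        if value != prev:
            rank = position   # RANK jumps to the current position after ties
            dense += 1        # DENSE_RANK only ever advances by one
            prev = value
        # ROW_NUMBER is simply the position, ties or not
        out.append((value, position, rank, dense))
    return out


prices = [99.995, 99.995, 99.99, 99.99, 99.985]
for row in number_rows(prices):
    print(row)
```

Filtering on row_number <= N keeps exactly N rows per group, whereas filtering on rank or dense_rank <= N can keep more than N rows when prices tie, which is why row_number is the usual choice for "top N records per group".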

On 12 September 2016 at 12:42, Mich Talebzadeh 
wrote:

> Hi,
>
> I don't understand why you need to add a column row_number when you can
> use rank or dense_rank?
>
> Why can't one use rank or dense_rank here?
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 12 September 2016 at 15:37,  wrote:
>
>> hi kevin
>> window function is what you need, like below:
>> val hivetable = hc.sql("select * from house_sale_pv_location")
>> val overLocation = Window.partitionBy(hivetable.col("lp_location_id"))
>> val sortedDF = hivetable.withColumn("rowNumber",
>> row_number().over(overLocation)).filter("rowNumber<=50")
>>
>> here I add a column as rownumber,  get all data partitioned and get the
>> top 50 rows.
>>
>>
>>
>> 
>>
>> Thanks & best regards!
>> San.Luo
>>
>> - Original Message -
>> From: Mich Talebzadeh
>> To: "user @spark"
>> Subject: Re: Selecting the top 100 records per group by?
>> Date: 2016-09-11 22:20
>>
>> You can of course do this using FP.
>>
>> val wSpec = Window.partitionBy('price).orderBy(desc("price"))
>> df2.filter('security > " ").select(dense_rank().over(wSpec).as("rank"),'TIMECREATED,
>> 'SECURITY, substring('PRICE,1,7)).filter('rank<=10).show
>>
>>
>> HTH
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>>
>>
>>
>> On 11 September 2016 at 07:15, Mich Talebzadeh wrote:
>>
>> DENSE_RANK will give you ordering and sequence within a particular
>> column. This is Hive
>>
>>  var sqltext = """
>>  | SELECT RANK, timecreated,security, price
>>  |  FROM (
>>  |SELECT timecreated,security, price,
>>  |   DENSE_RANK() OVER (ORDER BY price DESC ) AS RANK
>>  |  FROM test.prices
>>  |   ) tmp
>>  |  WHERE rank <= 10
>>  | """
>> sql(sqltext).collect.foreach(println)
>>
>> [1,2016-09-09 16:55:44,Esso,99.995]
>> [1,2016-09-09 21:22:52,AVIVA,99.995]
>> [1,2016-09-09 21:22:52,Barclays,99.995]
>> [1,2016-09-09 21:24:28,JPM,99.995]
>> [1,2016-09-09 21:30:38,Microsoft,99.995]
>> [1,2016-09-09 21:31:12,UNILEVER,99.995]
>> [2,2016-09-09 16:54:14,BP,99.99]
>> [2,2016-09-09 16:54:36,Tate & Lyle,99.99]
>> [2,2016-09-09 16:56:28,EASYJET,99.99]
>> [2,2016-09-09 16:59:28,IBM,99.99]
>> [2,2016-09-09 20:16:10,EXPERIAN,99.99]
>> [2,2016-09-09 22:25:20,Microsoft,99.99]
>> [2,2016-09-09 22:53:49,Tate & Lyle,99.99]
>> [3,2016-09-09 15:31:06,UNILEVER,99.985]
>>
>> HTH
>>
>>
>>
>>
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>>
>>
>>
>> On 11 September 2016 at 04:32, Kevin Burton  wrote:
>>
>> Looks like you can do it with dense_rank functions.
>>
>> https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
>>
>> I setup some basic records and seems like it did the right thing.
>>
>> Now time to throw 50TB and 100 spark nodes at this problem and see what
>> happens :)
>>
>> On Sat, Sep 10, 2016 at 7:42 PM, Kevin Burton  wrote:
>>
>> Ah.. might actually. I'll have to mess around with 

Re: udf of aggregation in pyspark dataframe ?

2016-09-29 Thread peng yu
df:
a|b|c
-----
1|m|n
1|x|j
2|m|x
...


import pyspark.sql.functions as F
from pyspark.sql.types import MapType, StringType

# Plain Python function: pair up two collected lists into a dict.
def _my_zip(keys, values):
    return dict(zip(keys, values))

# Wrap it as a UDF returning map<string,string> (values may be null).
my_zip = F.udf(_my_zip, MapType(StringType(), StringType(), True))

df.groupBy('a').agg(my_zip(F.collect_list('b'),
                           F.collect_list('c')).alias('named_list'))
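
What this aggregation is meant to produce per group can be checked without a Spark session. The sketch below is plain Python, not PySpark; `zip_per_group` is a hypothetical helper, and it assumes the two collected columns are `b` and `c` from the sample table above.

```python
from collections import defaultdict


def zip_per_group(rows):
    """rows: iterable of (a, b, c) tuples. Returns {a: {b: c, ...}}."""
    keys = defaultdict(list)
    values = defaultdict(list)
    for a, b, c in rows:
        keys[a].append(b)     # stands in for collect_list('b')
        values[a].append(c)   # stands in for collect_list('c')
    # Same pairing step as the UDF body: dict(zip(keys, values))
    return {a: dict(zip(keys[a], values[a])) for a in keys}


rows = [(1, "m", "n"), (1, "x", "j"), (2, "m", "x")]
print(zip_per_group(rows))
```

Each value of `a` ends up with one map from its `b` values to the matching `c` values, which is the shape the `named_list` column is expected to have.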



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/udf-of-aggregation-in-pyspark-dataframe-tp27811p27814.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Pyspark - 1.5.0 pickle ML PipelineModel

2016-09-29 Thread Simone
Hi all, 
I am trying to save a trained ML pipeline model with pyspark 1.5.

I am aware there is no .save method till 1.6 and that the workaround that 
should work is to serialize the PipelineModel object.

This works in scala/java, but it seems like I cannot pickle the trained model 
in Python.

Has anyone solved this issue?

Thanks guys
Simone

Re: Question about executor memory setting

2016-09-29 Thread mohan s
Hi

Kindly go through the link below. It gives a good explanation of Spark memory
allocation.
https://www.slideshare.net/cloudera/top-5-mistakes-to-avoid-when-writing-apache-spark-applications?from_m_app=ios

Regards

Mohan.s

> On 28-Sep-2016, at 7:57 AM, Dogtail L  wrote:
> 
> Hi all,
> 
> May I ask a question about executor memory setting? I was running PageRank 
> with input size 2.8GB on one workstation for testing. I gave PageRank one 
> executor.
> 
> In case 1, I set --executor-cores to 4, and --executor-memory to 1GB, the
> stage (stage 2) completion time is 14 min, the detailed stage info is
> below:
> 
> 
> 
> In case 2, I set --executor-cores to 4, and --executor-memory to 6GB, the
> stage (stage 2) completion time is 34 min, the detailed stage info is
> below:
>
>
> I am totally confused why when executor-memory gets larger, the stage 
> completion time is more than two times slower? From the web UI, I found that 
> when executor memory is 6GB, the shuffle spill (Disk) per task is smaller, 
> which means fewer IO operations, but weirdly, the task completion time is 
> longer though. Could anyone give me some hints? Great thanks!


Re: Architecture recommendations for a tricky use case

2016-09-29 Thread Mich Talebzadeh
Hi Michael,

How about Druid here?

Hive ORC tables are another option; they offer streaming data ingest for
Flume and Storm.

However, Spark cannot read ORC transactional tables because of delta files,
unless the compaction is done (a nightmare).

HTH


Dr Mich Talebzadeh






On 29 September 2016 at 17:01, Michael Segel 
wrote:

> Ok… so what’s the tricky part?
> Spark Streaming isn’t real time so if you don’t mind a slight delay in
> processing… it would work.
>
> The drawback is that you now have a long running Spark Job (assuming under
> YARN) and that could become a problem in terms of security and resources.
> (How well does Yarn handle long running jobs these days in a secured
> Cluster? Steve L. may have some insight… )
>
> Raw HDFS would become a problem because Apache HDFS is still WORM (write
> once, read many). (Do you want to write your own compaction code? Or use Hive 1.x+?)
>
> HBase? Depending on your admin… stability could be a problem.
> Cassandra? That would be a separate cluster and that in itself could be a
> problem…
>
> YMMV so you need to address the pros/cons of each tool specific to your
> environment and skill level.
>
> HTH
>
> -Mike
>
> > On Sep 29, 2016, at 8:54 AM, Ali Akhtar  wrote:
> >
> > I have a somewhat tricky use case, and I'm looking for ideas.
> >
> > I have 5-6 Kafka producers, reading various APIs, and writing their raw
> data into Kafka.
> >
> > I need to:
> >
> > - Do ETL on the data, and standardize it.
> >
> > - Store the standardized data somewhere (HBase / Cassandra / Raw HDFS /
> ElasticSearch / Postgres)
> >
> > - Query this data to generate reports / analytics (There will be a web
> UI which will be the front-end to the data, and will show the reports)
> >
> > Java is being used as the backend language for everything (backend of
> the web UI, as well as the ETL layer)
> >
> > I'm considering:
> >
> > - Using raw Kafka consumers, or Spark Streaming, as the ETL layer
> (receive raw data from Kafka, standardize & store it)
> >
> > - Using Cassandra, HBase, or raw HDFS, for storing the standardized
> data, and to allow queries
> >
> > - In the backend of the web UI, I could either use Spark to run queries
> across the data (mostly filters), or directly run queries against Cassandra
> / HBase
> >
> > I'd appreciate some thoughts / suggestions on which of these
> alternatives I should go with (e.g, using raw Kafka consumers vs Spark for
> ETL, which persistent data store to use, and how to query that data store
> in the backend of the web UI, for displaying the reports).
> >
> >
> > Thanks.
>
>


Re: Submit and Monitor standalone cluster application

2016-09-29 Thread Mariano Semelman
Sorry, my mistake (quick copy-paste): Livy doesn't let me submit
applications the classic way (with assembly jars) and forces me to change
all my current applications.

--

*Mariano Semelman*
P13N - IT
Av. Corrientes Nº 746 - piso 13 - C.A.B.A. (C1043AAU)
Teléfono (54) 11- *4894-3500*


*Despegar.com*

On 29 September 2016 at 01:08, Ofer Eliassaf 
wrote:

> Are u sure that livy doesn't support standalone cluster mode?
>
> On Thu, Sep 29, 2016 at 1:42 AM, Mariano Semelman <
> mariano.semel...@despegar.com> wrote:
>
>> Hello everybody,
>>
>> I'm developing an application to submit batch and streaming apps in a
>> fault tolerant fashion. For that I need a programmatic way to submit and
>> monitor my apps and relaunch them in case of failure. Right now I'm using
>> spark standalone (1.6.x) and submitting in cluster mode. The options I have
>> explored so far are:
>>
>> SparkLauncher.java [1]: It has two modes:
>> - 1) launch() Doesn't give me the application-id in order to monitor
>> (with spark master rest API). Would have to infer from the application name
>> and startTime in api/v1/applications using the spark master API [9]
>> - 2) startApplication(...) Only works if submitted locally or in client
>> mode (BTW, the fact that it only works in client or local mode is not
>> documented in the package summary page [1], which led me to many, many wasted
>> hours)
>>
>> Spark-Jobserver [2]:
>> Doesn't support standalone cluster mode
>>
>> Livy [3]:
>> Doesn't support standalone cluster mode
>>
>> Spark Submission Rest API [4,5,6]:
>> It seems the sensible way, but is black magic for the user. It's not
>> documented and there's no official Client. There's only one [7] unofficial
>> client. And it occurred to me also to copy in my own project the
>> RestSubmissionClient [8].
>>
>>
>> I'm torn between using launch() and inferring the appId, or using the Spark
>> Submission Rest API, but neither of them seems a proper way to solve this. If
>> someone could give me some advice on how to approach this I would appreciate it.
>>
>> Thanks in advance,
>>
>> Mariano
>>
>>
>> [1] https://spark.apache.org/docs/1.6.1/api/java/org/apache/spark/launcher/package-summary.html
>> [2] https://github.com/spark-jobserver/spark-jobserver
>> [3] http://livy.io/
>> [4] http://stackoverflow.com/questions/28992802/triggering-spark-jobs-with-rest (most voted answer)
>> [5] http://arturmkrtchyan.com/apache-spark-hidden-rest-api
>> [6] https://issues.apache.org/jira/browse/SPARK-5388
>> [7] https://github.com/ywilkof/spark-jobs-rest-client
>> [8] https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
>> [9] http://spark.apache.org/docs/latest/monitoring.html
>>
>>
>>
>>
>
>
> --
> Regards,
> Ofer Eliassaf
>


spark streaming minimum batch interval

2016-09-29 Thread Shushant Arora
Hi

I want to ask whether Spark Streaming has a minimum batch interval of
around 500ms?

Is Storm better than Spark Streaming for real time (for latency of just
50-100ms)? In Spark Streaming, can batches be run in parallel? If yes, is that
supported at production level?

Thanks


Re: Treadting NaN fields in Spark

2016-09-29 Thread Mich Talebzadeh
Thanks Michael.

I realised that just checking for Volume > 0 should do:

val rs = df2.filter($"Volume".cast("Integer") > 0)

On your point:

> Again why not remove the rows where the volume of trades is 0?

are you referring to the rows below?

scala> val rs = df2.filter($"Volume".cast("Integer") === 0).drop().show
+---------+------+----------+----+----+---+------+------+
|    Stock|Ticker| TradeDate|Open|High|Low| Close|Volume|
+---------+------+----------+----+----+---+------+------+
|Tesco PLC|  TSCO| 23-Dec-11|   -|   -|  -|391.00|     0|
|Tesco PLC|  TSCO| 26-Aug-11|   -|   -|  -|365.60|     0|
|Tesco PLC|  TSCO| 28-Apr-11|   -|   -|  -|403.55|     0|
|Tesco PLC|  TSCO| 21-Apr-11|   -|   -|  -|395.30|     0|
|Tesco PLC|  TSCO| 24-Dec-10|   -|   -|  -|439.00|     0|
+---------+------+----------+----+----+---+------+------+
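
The cleaning rule discussed in this thread (skip rows whose Open is the "-" placeholder, and keep only rows with Volume > 0) can be sketched outside Spark. This is plain Python, not the DataFrame API; `to_int` and `traded_rows` are hypothetical helpers, `to_int` only approximates a cast that yields null on bad input, and the second sample row is made up for illustration.

```python
def to_int(text):
    """Lenient cast: return None when the text is not an integer."""
    try:
        return int(text)
    except (TypeError, ValueError):
        return None


def traded_rows(rows):
    """Keep rows whose Open is not the '-' placeholder and whose Volume > 0."""
    kept = []
    for row in rows:
        volume = to_int(row.get("Volume"))
        if row.get("Open") != "-" and volume is not None and volume > 0:
            kept.append(row)
    return kept


rows = [
    # Row taken from the output above: no trades on that day.
    {"TradeDate": "23-Dec-11", "Open": "-", "Close": "391.00", "Volume": "0"},
    # Made-up row for illustration only.
    {"TradeDate": "22-Dec-11", "Open": "388.10", "Close": "390.25", "Volume": "1200"},
]
print(traded_rows(rows))
```

A Volume that cannot be parsed is treated as missing and the row is dropped, so a stray "-" in that column does not get mistaken for a number.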

Cheers



Dr Mich Talebzadeh






On 29 September 2016 at 16:55, Michael Segel 
wrote:

>
> On Sep 29, 2016, at 10:29 AM, Mich Talebzadeh 
> wrote:
>
> Good points :) it took take "-" as a negative number -123456?
>
> Yeah… you have to go down a level and start to remember that you’re
> dealing with a stream or buffer of bytes below any casting.
>
> At this moment in time this is what the code does
>
>
>    1. csv is imported into HDFS as is. No cleaning done for rogue columns
>    done at shell level
>    2. Spark programs does the following filtration:
>    3. val rs = df2.filter($"Open" !== "-").filter($"Volume".cast("Integer") > 0)
>
> So my first line of defence is to check for !== "-" which is a dash,
> commonly used for not available. The next filter is for volume column > 0
> (there was trades on this stock), otherwise the calculation could skew the
> results.  Note that a filter with AND with !== will not work.
>
>
> You can’t rely on the ‘-‘ to represent NaN or NULL.
>
> The issue is that you’re going from a loose typing to a stronger typing
> (String to Double).
> So pretty much any byte buffer could be interpreted as a String, but iff
> the String value is too long to be a Double, you will fail the NaN test.
> (Or its a NULL value/string)
> As to filtering… you would probably want to filter on volume being == 0.
>  (Its possible to actually have a negative volume.
> Or you could set the opening, low, high to the close if the volume is 0
> regardless of the values in those columns.
>
> Note: This would be a transformation of the data and should be done during
> ingestion so you’re doing it only once.
>
> Or you could just remove the rows since no trades occurred and then either
> reflect it in your graph as gaps or the graph interpolates it out .
>
>
> scala> val rs = df2.filter($"Open" !== "-" && $"Volume".cast("Integer") > 0)
> <console>:40: error: value && is not a member of String
>        val rs = df2.filter($"Open" !== "-" && $"Volume".cast("Integer") > 0)
>
> Will throw an error.
> But this equality === works!
>
> scala> val rs = df2.filter($"Open" === "-" && $"Volume".cast("Integer") > 0)
> rs: org.apache.spark.sql.Dataset[columns] = [Stock: string, Ticker: string ... 6 more fields]
>
>
> Another alternative is to check for all digits here
>
> scala> def isAllPostiveNumber (price: String) = price forall Character.isDigit
> isAllPostiveNumber: (price: String)Boolean
>
> Not really a good idea. You’re walking thru each byte in a stream and
> checking to see if its a digit. What if its a NULL string? What do you set
> the value to?
> This doesn’t scale well…
>
> Again why not remove the rows where the volume of trades is 0?
>
> Returns Boolean true or false. But it does not work, unless someone can tell
> me what is wrong with the below!
>
> scala> val rs = df2.filter(isAllPostiveNumber("Open") => true)
> <console>:1: error: not a legal formal parameter.
> Note: Tuples cannot be directly destructured in method or function parameters.
>       Either create a single parameter accepting the Tuple1,
>       or consider a pattern matching anonymous function: `{ case (param1, param1) => ... }
> val rs = df2.filter(isAllPostiveNumber("Open") => true)
>
>
> Thanks
>
>
>
>
>
>
>
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
>
> 

Re: Architecture recommendations for a tricky use case

2016-09-29 Thread Cody Koeninger
> I still don't understand why writing to a transactional database with locking 
> and concurrency (read and writes) through JDBC will be fast for this sort of 
> data ingestion.

Who cares about fast if your data is wrong?  And it's still plenty fast enough

https://youtu.be/NVl9_6J1G60?list=WL=1819

https://www.citusdata.com/blog/2016/09/22/announcing-citus-mx/



On Thu, Sep 29, 2016 at 11:16 AM, Mich Talebzadeh
 wrote:
> The way I see this, there are three things involved.
>
> 1. Data ingestion through source to Kafka
> 2. Data conversion and storage (ETL/ELT)
> 3. Presentation
>
> Item 2 is the one that needs to be designed correctly. I presume raw data
> has to conform to some form of MDM that requires schema mapping etc. before
> being put into persistent storage (DB, HDFS etc.). Which one to choose depends
> on your volume of ingestion, your cluster size and the complexity of data
> conversion. Then your users will use some form of UI (Tableau, QlikView,
> Zeppelin, direct SQL) to query data one way or another. Your users can
> directly use a UI like Tableau that offers built-in analytics on SQL (Spark
> SQL offers the same). Your mileage varies according to your needs.
>
> I still don't understand why writing to a transactional database with
> locking and concurrency (read and writes) through JDBC will be fast for this
> sort of data ingestion. If you asked me to choose an RDBMS to write to as my
> sink, I would use Oracle, which offers the best locking and concurrency among
> RDBMSs and also handles key-value pairs (assuming that is what you want). In
> addition, it can be used as a data warehouse as well.
>
> HTH
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> Disclaimer: Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed. The
> author will in no case be liable for any monetary damages arising from such
> loss, damage or destruction.
>
>
>
>
> On 29 September 2016 at 16:49, Ali Akhtar  wrote:
>>
>> The business use case is to read a user's data from a variety of different
>> services through their API, and then allowing the user to query that data,
>> on a per service basis, as well as an aggregate across all services.
>>
>> The way I'm considering doing it, is to do some basic ETL (drop all the
>> unnecessary fields, rename some fields into something more manageable, etc)
>> and then store the data in Cassandra / Postgres.
>>
>> Then, when the user wants to view a particular report, query the
>> respective table in Cassandra / Postgres. (select .. from data where user =
>> ? and date between  and  and some_field = ?)
>>
>> How will Spark Streaming help w/ aggregation? Couldn't the data be queried
>> from Cassandra / Postgres via the Kafka consumer and aggregated that way?
>>
>> On Thu, Sep 29, 2016 at 8:43 PM, Cody Koeninger 
>> wrote:
>>>
>>> No, direct stream in and of itself won't ensure an end-to-end
>>> guarantee, because it doesn't know anything about your output actions.
>>>
>>> You still need to do some work.  The point is having easy access to
>>> offsets for batches on a per-partition basis makes it easier to do
>>> that work, especially in conjunction with aggregation.
>>>
>>> On Thu, Sep 29, 2016 at 10:40 AM, Deepak Sharma 
>>> wrote:
>>> > If you use spark direct streams , it ensure end to end guarantee for
>>> > messages.
>>> >
>>> >
>>> > On Thu, Sep 29, 2016 at 9:05 PM, Ali Akhtar 
>>> > wrote:
>>> >>
>>> >> My concern with Postgres / Cassandra is only scalability. I will look
>>> >> further into Postgres horizontal scaling, thanks.
>>> >>
>>> >> Writes could be idempotent if done as upserts, otherwise updates will
>>> >> be
>>> >> idempotent but not inserts.
>>> >>
>>> >> Data should not be lost. The system should be as fault tolerant as
>>> >> possible.
>>> >>
>>> >> What's the advantage of using Spark for reading Kafka instead of
>>> >> direct
>>> >> Kafka consumers?
>>> >>
>>> >> On Thu, Sep 29, 2016 at 8:28 PM, Cody Koeninger 
>>> >> wrote:
>>> >>>
>>> >>> I wouldn't give up the flexibility and maturity of a relational
>>> >>> database, unless you have a very specific use case.  I'm not trashing
>>> >>> cassandra, I've used cassandra, but if all I know is that you're
>>> >>> doing
>>> >>> analytics, I wouldn't want to give up the ability to easily do ad-hoc
>>> >>> aggregations without a lot of forethought.  If you're worried about
>>> >>> scaling, there are several options for horizontally scaling Postgres
>>> >>> in particular.  One of the current best from what I've worked with is
>>> >>> Citus.
>>> >>>
>>> >>> On Thu, Sep 29, 2016 at 10:15 AM, Deepak Sharma
>>> 

Re: Architecture recommendations for a tricky use case

2016-09-29 Thread Cody Koeninger
Spark streaming helps with aggregation because

A. raw kafka consumers have no built in framework for shuffling
amongst nodes, short of writing into an intermediate topic (I'm not
touching Kafka Streams here, I don't have experience), and

B. it deals with batches, so you can transactionally decide to commit
or rollback your aggregate data and your offsets.  Otherwise your
offsets and data store can get out of sync, leading to lost /
duplicate data.

Regarding long running spark jobs, I have streaming jobs in the
standalone manager that have been running for 6 months or more.
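
Point B above can be sketched roughly as follows. This is only an illustration, assuming a Postgres sink (9.5 or later, for ON CONFLICT) reached through plain JDBC. The table names event_counts and kafka_offsets, the PartitionOffset case class and the commitBatch helper are all hypothetical and not from the thread. In a direct stream the offsets would typically be taken from the batch's offset ranges.

```scala
import java.sql.Connection

object TransactionalSink {
  // Hypothetical record of how far one Kafka partition was read in this batch.
  final case class PartitionOffset(topic: String, partition: Int, untilOffset: Long)

  // Hypothetical tables: event_counts(agg_key PRIMARY KEY, total) and
  // kafka_offsets(topic, part, until_offset) with PRIMARY KEY (topic, part).
  val UpsertCountSql: String =
    "INSERT INTO event_counts (agg_key, total) VALUES (?, ?) " +
      "ON CONFLICT (agg_key) DO UPDATE SET total = event_counts.total + EXCLUDED.total"

  val UpsertOffsetSql: String =
    "INSERT INTO kafka_offsets (topic, part, until_offset) VALUES (?, ?, ?) " +
      "ON CONFLICT (topic, part) DO UPDATE SET until_offset = EXCLUDED.until_offset"

  // Write one batch's aggregates and its offsets in a single transaction:
  // either both become visible or neither does.
  def commitBatch(conn: Connection,
                  counts: Map[String, Long],
                  offsets: Seq[PartitionOffset]): Unit = {
    conn.setAutoCommit(false)
    try {
      val countStmt = conn.prepareStatement(UpsertCountSql)
      try {
        counts.foreach { case (key, total) =>
          countStmt.setString(1, key)
          countStmt.setLong(2, total)
          countStmt.addBatch()
        }
        countStmt.executeBatch()
      } finally countStmt.close()

      val offsetStmt = conn.prepareStatement(UpsertOffsetSql)
      try {
        offsets.foreach { o =>
          offsetStmt.setString(1, o.topic)
          offsetStmt.setInt(2, o.partition)
          offsetStmt.setLong(3, o.untilOffset)
          offsetStmt.addBatch()
        }
        offsetStmt.executeBatch()
      } finally offsetStmt.close()

      conn.commit()
    } catch {
      case e: Exception =>
        conn.rollback() // leave both tables as they were before the batch
        throw e
    }
  }
}
```

On restart, the job would read kafka_offsets and resume from there, so a batch that rolled back is simply processed again. A stricter variant could also verify that the stored offset matches the batch's starting offset before committing.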

On Thu, Sep 29, 2016 at 11:01 AM, Michael Segel
 wrote:
> Ok… so what’s the tricky part?
> Spark Streaming isn’t real time so if you don’t mind a slight delay in 
> processing… it would work.
>
> The drawback is that you now have a long running Spark Job (assuming under 
> YARN) and that could become a problem in terms of security and resources.
> (How well does Yarn handle long running jobs these days in a secured Cluster? 
> Steve L. may have some insight… )
>
> Raw HDFS would become a problem because Apache HDFS is still WORM (write
> once, read many). (Do you want to write your own compaction code? Or use
> Hive 1.x+?)
>
> HBase? Depending on your admin… stability could be a problem.
> Cassandra? That would be a separate cluster and that in itself could be a 
> problem…
>
> YMMV so you need to address the pros/cons of each tool specific to your 
> environment and skill level.
>
> HTH
>
> -Mike
>
>> On Sep 29, 2016, at 8:54 AM, Ali Akhtar  wrote:
>>
>> I have a somewhat tricky use case, and I'm looking for ideas.
>>
>> I have 5-6 Kafka producers, reading various APIs, and writing their raw data 
>> into Kafka.
>>
>> I need to:
>>
>> - Do ETL on the data, and standardize it.
>>
>> - Store the standardized data somewhere (HBase / Cassandra / Raw HDFS / 
>> ElasticSearch / Postgres)
>>
>> - Query this data to generate reports / analytics (There will be a web UI 
>> which will be the front-end to the data, and will show the reports)
>>
>> Java is being used as the backend language for everything (backend of the 
>> web UI, as well as the ETL layer)
>>
>> I'm considering:
>>
>> - Using raw Kafka consumers, or Spark Streaming, as the ETL layer (receive 
>> raw data from Kafka, standardize & store it)
>>
>> - Using Cassandra, HBase, or raw HDFS, for storing the standardized data, 
>> and to allow queries
>>
>> - In the backend of the web UI, I could either use Spark to run queries 
>> across the data (mostly filters), or directly run queries against Cassandra 
>> / HBase
>>
>> I'd appreciate some thoughts / suggestions on which of these alternatives I 
>> should go with (e.g, using raw Kafka consumers vs Spark for ETL, which 
>> persistent data store to use, and how to query that data store in the 
>> backend of the web UI, for displaying the reports).
>>
>>
>> Thanks.
>




Re: Architecture recommendations for a tricky use case

2016-09-29 Thread Mich Talebzadeh
The way I see this, there are three things involved.


   1. Data ingestion through source to Kafka
   2. Data conversion and storage (ETL/ELT)
   3. Presentation

Item 2 is the one that needs to be designed correctly. I presume raw data
has to conform to some form of MDM that requires schema mapping etc. before
being put into persistent storage (DB, HDFS etc.). Which one to choose depends
on your volume of ingestion, your cluster size and the complexity of data
conversion. Then your users will use some form of UI (Tableau, QlikView,
Zeppelin, direct SQL) to query data one way or another. Your users can
directly use a UI like Tableau that offers built-in analytics on SQL (Spark
SQL offers the same). Your mileage varies according to your needs.

I still don't understand why writing to a transactional database with
locking and concurrency (read and writes) through JDBC will be fast for
this sort of data ingestion. If you asked me to choose an RDBMS to write to
as my sink, I would use Oracle, which offers the best locking and
concurrency among RDBMSs and also handles key-value pairs (assuming
that is what you want). In addition, it can be used as a data warehouse as
well.

HTH



Dr Mich Talebzadeh



LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com





On 29 September 2016 at 16:49, Ali Akhtar  wrote:

> The business use case is to read a user's data from a variety of different
> services through their API, and then allowing the user to query that data,
> on a per service basis, as well as an aggregate across all services.
>
> The way I'm considering doing it, is to do some basic ETL (drop all the
> unnecessary fields, rename some fields into something more manageable, etc)
> and then store the data in Cassandra / Postgres.
>
> Then, when the user wants to view a particular report, query the
> respective table in Cassandra / Postgres. (select .. from data where user =
> ? and date between  and  and some_field = ?)
>
> How will Spark Streaming help w/ aggregation? Couldn't the data be queried
> from Cassandra / Postgres via the Kafka consumer and aggregated that way?
>
> On Thu, Sep 29, 2016 at 8:43 PM, Cody Koeninger 
> wrote:
>
>> No, direct stream in and of itself won't ensure an end-to-end
>> guarantee, because it doesn't know anything about your output actions.
>>
>> You still need to do some work.  The point is having easy access to
>> offsets for batches on a per-partition basis makes it easier to do
>> that work, especially in conjunction with aggregation.
>>
>> On Thu, Sep 29, 2016 at 10:40 AM, Deepak Sharma 
>> wrote:
>> > If you use spark direct streams , it ensure end to end guarantee for
>> > messages.
>> >
>> >
>> > On Thu, Sep 29, 2016 at 9:05 PM, Ali Akhtar 
>> wrote:
>> >>
>> >> My concern with Postgres / Cassandra is only scalability. I will look
>> >> further into Postgres horizontal scaling, thanks.
>> >>
>> >> Writes could be idempotent if done as upserts, otherwise updates will
>> be
>> >> idempotent but not inserts.
>> >>
>> >> Data should not be lost. The system should be as fault tolerant as
>> >> possible.
>> >>
>> >> What's the advantage of using Spark for reading Kafka instead of direct
>> >> Kafka consumers?
>> >>
>> >> On Thu, Sep 29, 2016 at 8:28 PM, Cody Koeninger 
>> >> wrote:
>> >>>
>> >>> I wouldn't give up the flexibility and maturity of a relational
>> >>> database, unless you have a very specific use case.  I'm not trashing
>> >>> cassandra, I've used cassandra, but if all I know is that you're doing
>> >>> analytics, I wouldn't want to give up the ability to easily do ad-hoc
>> >>> aggregations without a lot of forethought.  If you're worried about
>> >>> scaling, there are several options for horizontally scaling Postgres
>> >>> in particular.  One of the current best from what I've worked with is
>> >>> Citus.
>> >>>
>> >>> On Thu, Sep 29, 2016 at 10:15 AM, Deepak Sharma <
>> deepakmc...@gmail.com>
>> >>> wrote:
>> >>> > Hi Cody
>> >>> > Spark direct stream is just fine for this use case.
>> >>> > But why postgres and not cassandra?
>> >>> > Is there anything specific here that i may not be aware?
>> >>> >
>> >>> > Thanks
>> >>> > Deepak
>> >>> >
>> >>> > On Thu, Sep 29, 2016 at 8:41 PM, Cody Koeninger > >
>> >>> > wrote:
>> >>> >>
>> >>> >> How are you going to handle etl failures?  Do you care about lost /
>> >>> >> duplicated data?  Are your writes 

Fwd: tod...@yahoo-inc.com is no longer with Yahoo! (was: Re: Treadting NaN fields in Spark)

2016-09-29 Thread Michael Segel
Hi,
Hate to be a pain… but could someone remove this email address (see below) from 
the spark mailing list(s)
It seems that ‘Elvis’ has left the building and forgot to change his mail 
subscriptions…

Begin forwarded message:

From: Yahoo! No Reply 
>
Subject: tod...@yahoo-inc.com is no longer with 
Yahoo! (was: Re: Treadting NaN fields in Spark)
Date: September 29, 2016 at 10:56:10 AM CDT
To: >


This is an automatically generated message.

tod...@yahoo-inc.com is no longer with Yahoo! Inc.

Your message will not be forwarded.

If you have a sales inquiry, please email 
yahoosa...@yahoo-inc.com and someone will 
follow up with you shortly.

If you require assistance with a legal matter, please send a message to 
legal-noti...@yahoo-inc.com

Thank you!



Re: Architecture recommendations for a tricky use case

2016-09-29 Thread Michael Segel
Ok… so what’s the tricky part? 
Spark Streaming isn’t real time so if you don’t mind a slight delay in 
processing… it would work.

The drawback is that you now have a long running Spark Job (assuming under 
YARN) and that could become a problem in terms of security and resources. 
(How well does Yarn handle long running jobs these days in a secured Cluster? 
Steve L. may have some insight… ) 

Raw HDFS would become a problem because Apache HDFS is still WORM (write
once, read many). (Do you want to write your own compaction code? Or use
Hive 1.x+?)

HBase? Depending on your admin… stability could be a problem. 
Cassandra? That would be a separate cluster and that in itself could be a 
problem… 

YMMV so you need to address the pros/cons of each tool specific to your 
environment and skill level. 

HTH

-Mike

> On Sep 29, 2016, at 8:54 AM, Ali Akhtar  wrote:
> 
> I have a somewhat tricky use case, and I'm looking for ideas.
> 
> I have 5-6 Kafka producers, reading various APIs, and writing their raw data 
> into Kafka.
> 
> I need to:
> 
> - Do ETL on the data, and standardize it.
> 
> - Store the standardized data somewhere (HBase / Cassandra / Raw HDFS / 
> ElasticSearch / Postgres)
> 
> - Query this data to generate reports / analytics (There will be a web UI 
> which will be the front-end to the data, and will show the reports)
> 
> Java is being used as the backend language for everything (backend of the web 
> UI, as well as the ETL layer)
> 
> I'm considering:
> 
> - Using raw Kafka consumers, or Spark Streaming, as the ETL layer (receive 
> raw data from Kafka, standardize & store it)
> 
> - Using Cassandra, HBase, or raw HDFS, for storing the standardized data, and 
> to allow queries
> 
> - In the backend of the web UI, I could either use Spark to run queries 
> across the data (mostly filters), or directly run queries against Cassandra / 
> HBase
> 
> I'd appreciate some thoughts / suggestions on which of these alternatives I 
> should go with (e.g, using raw Kafka consumers vs Spark for ETL, which 
> persistent data store to use, and how to query that data store in the backend 
> of the web UI, for displaying the reports).
> 
> 
> Thanks.



Re: Treadting NaN fields in Spark

2016-09-29 Thread Michael Segel

On Sep 29, 2016, at 10:29 AM, Mich Talebzadeh wrote:

Good points :) Did it take "-" as a negative number, e.g. -123456?

Yeah… you have to go down a level and start to remember that you’re dealing 
with a stream or buffer of bytes below any casting.

At this moment in time this is what the code does


  1.  The CSV is imported into HDFS as is. No cleaning of rogue columns is done
      at shell level.
  2.  The Spark program applies the following filter:
      val rs = df2.filter($"Open" !== "-").filter($"Volume".cast("Integer") > 0)

So my first line of defence is to check for !== "-" which is a dash, commonly
used for not available. The next filter is for volume column > 0 (there were
trades on this stock), otherwise the calculation could skew the results.  Note
that a single filter combining !== with && (AND) will not work.


You can’t rely on the ‘-‘ to represent NaN or NULL.

The issue is that you’re going from a loose typing to a stronger typing (String 
to Double).
So pretty much any byte buffer could be interpreted as a String, but if the
String value is too long to be a Double, you will fail the NaN test. (Or it's a
NULL value/string.)
As to filtering… you would probably want to filter on volume being == 0.  (It's
possible to actually have a negative volume.)
Or you could set the opening, low, high to the close if the volume is 0 
regardless of the values in those columns.

Note: This would be a transformation of the data and should be done during 
ingestion so you’re doing it only once.

Or you could just remove the rows since no trades occurred and then either
reflect it in your graph as gaps or let the graph interpolate over them.


scala> val rs = df2.filter($"Open" !== "-" && $"Volume".cast("Integer") > 0)
:40: error: value && is not a member of String
   val rs = df2.filter($"Open" !== "-" && $"Volume".cast("Integer") > 0)

Will throw an error.

But this equality === works!

scala> val rs = df2.filter($"Open" === "-" && $"Volume".cast("Integer") > 0)
rs: org.apache.spark.sql.Dataset[columns] = [Stock: string, Ticker: string ... 
6 more fields]
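
A note on why the first form fails: in Scala, an operator that ends in = but does not start with = (other than <=, >= and !=) is parsed with assignment precedence, the lowest there is. So $"Open" !== "-" && ... groups as $"Open" !== ("-" && ...), which is why the compiler reports that && is not a member of String. Spark 2.0 added =!= on Column, which has the same precedence as ===, and deprecated !== for this reason. A minimal sketch, assuming Spark 2.x with a local SparkSession; the three sample rows are made up and only stand in for df2:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object NotEqualFilter {
  // Keep rows whose Open is not the "-" placeholder and whose Volume is positive.
  // =!= binds tighter than &&, so no extra parentheses are needed.
  def tradedRows(spark: SparkSession, df: DataFrame): DataFrame = {
    import spark.implicits._
    df.filter($"Open" =!= "-" && $"Volume".cast("int") > 0)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("not-equal-filter")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample rows standing in for df2 from the thread.
    val df2 = Seq(("-", "0"), ("40.56", "1200"), ("41.00", "0")).toDF("Open", "Volume")

    tradedRows(spark, df2).show()
    spark.stop()
  }
}
```

On versions where only !== is available, wrapping each comparison in its own parentheses, as in ($"Open" !== "-") && ($"Volume".cast("int") > 0), should avoid the precedence problem as well.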


Another alternative is to check for all digits here

 scala> def isAllPostiveNumber (price: String) = price forall Character.isDigit
isAllPostiveNumber: (price: String)Boolean

Not really a good idea. You’re walking through each byte in a stream and checking
to see if it's a digit. What if it's a NULL string? What do you set the value to?
This doesn’t scale well…

Again why not remove the rows where the volume of trades is 0?

Returns Boolean true or false, but it does not work unless someone can tell me
what is wrong with the code below!

scala> val rs = df2.filter(isAllPostiveNumber("Open") => true)
:1: error: not a legal formal parameter.
Note: Tuples cannot be directly destructured in method or function parameters.
  Either create a single parameter accepting the Tuple1,
  or consider a pattern matching anonymous function: `{ case (param1, 
param1) => ... }
val rs = df2.filter(isAllPostiveNumber("Open") => true)
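
On the compile error above: isAllPostiveNumber("Open") is applied to the literal string "Open" rather than to the column, and an expression of the form x => true is not a valid argument to filter. One possible fix is to wrap the check in a UDF and apply it to the column. The sketch below assumes Spark 2.x; the names isAllDigits, allDigits and digitRows, and the null/empty guard, are additions for illustration. Note that a digits-only test also rejects decimal prices such as 40.56, so it may be too strict for the Open column.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, udf}

object DigitsFilter {
  // Plain Scala predicate: true only for non-null, non-empty, all-digit strings.
  def isAllDigits(s: String): Boolean =
    s != null && s.nonEmpty && s.forall(_.isDigit)

  // Wrap the predicate so it can be applied to a Column.
  private val allDigits = udf(isAllDigits _)

  // Keep only rows whose Open column consists solely of digits.
  def digitRows(df: DataFrame): DataFrame =
    df.filter(allDigits(col("Open")))
}
```

Usage would look like val rs = DigitsFilter.digitRows(df2). The null guard addresses the question raised above about NULL strings: such rows are simply filtered out rather than assigned a value.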


Thanks

Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com




On 29 September 2016 at 13:45, Michael Segel wrote:
Hi,

Just a few thoughts, so take it for what it's worth…

Databases have static schemas and will reject a row’s column on insert.

In your case… you have one data set where you have a column which is supposed 
to be a number but you have it as a string.
You want to convert this to a double in your final data set.


It looks like your problem is that your original data set that you ingested 
used a ‘-‘ (dash) to represent missing data, rather than a NULL value.
In fact, looking at the rows… you seem to have a stock that didn’t trade for a 
given day. (All have Volume as 0. ) Why do you need this?  Wouldn’t you want to 
represent this as null or no row for a given date?

The reason your ‘-‘ check failed with isnan() is that ‘-‘ could actually be
represented as a number.

If you replaced the ‘-‘ with a String that is wider than the width of a double 
… the isnan should flag the row.

(I still need more coffee, so I could be wrong) ;-)

HTH

-Mike

On Sep 28, 2016, at 5:56 AM, Mich Talebzadeh wrote:


This is an issue in most databases. Specifically if a field is NaN.. --> (NaN, 
standing for not a number, is a numeric data type value 

Re: Architecture recommendations for a tricky use case

2016-09-29 Thread Ali Akhtar
The business use case is to read a user's data from a variety of different
services through their APIs, and then allow the user to query that data
on a per-service basis, as well as in aggregate across all services.

The way I'm considering doing it, is to do some basic ETL (drop all the
unnecessary fields, rename some fields into something more manageable, etc)
and then store the data in Cassandra / Postgres.

Then, when the user wants to view a particular report, query the respective
table in Cassandra / Postgres. (select .. from data where user = ? and date
between  and  and some_field = ?)

How will Spark Streaming help w/ aggregation? Couldn't the data be queried
from Cassandra / Postgres via the Kafka consumer and aggregated that way?

On Thu, Sep 29, 2016 at 8:43 PM, Cody Koeninger  wrote:

> No, direct stream in and of itself won't ensure an end-to-end
> guarantee, because it doesn't know anything about your output actions.
>
> You still need to do some work.  The point is having easy access to
> offsets for batches on a per-partition basis makes it easier to do
> that work, especially in conjunction with aggregation.
>
> On Thu, Sep 29, 2016 at 10:40 AM, Deepak Sharma 
> wrote:
> > If you use spark direct streams , it ensure end to end guarantee for
> > messages.
> >
> >
> > On Thu, Sep 29, 2016 at 9:05 PM, Ali Akhtar 
> wrote:
> >>
> >> My concern with Postgres / Cassandra is only scalability. I will look
> >> further into Postgres horizontal scaling, thanks.
> >>
> >> Writes could be idempotent if done as upserts, otherwise updates will be
> >> idempotent but not inserts.
> >>
> >> Data should not be lost. The system should be as fault tolerant as
> >> possible.
> >>
> >> What's the advantage of using Spark for reading Kafka instead of direct
> >> Kafka consumers?
> >>
> >> On Thu, Sep 29, 2016 at 8:28 PM, Cody Koeninger 
> >> wrote:
> >>>
> >>> I wouldn't give up the flexibility and maturity of a relational
> >>> database, unless you have a very specific use case.  I'm not trashing
> >>> cassandra, I've used cassandra, but if all I know is that you're doing
> >>> analytics, I wouldn't want to give up the ability to easily do ad-hoc
> >>> aggregations without a lot of forethought.  If you're worried about
> >>> scaling, there are several options for horizontally scaling Postgres
> >>> in particular.  One of the current best from what I've worked with is
> >>> Citus.
> >>>
> >>> On Thu, Sep 29, 2016 at 10:15 AM, Deepak Sharma  >
> >>> wrote:
> >>> > Hi Cody
> >>> > Spark direct stream is just fine for this use case.
> >>> > But why postgres and not cassandra?
> >>> > Is there anything specific here that i may not be aware?
> >>> >
> >>> > Thanks
> >>> > Deepak
> >>> >
> >>> > On Thu, Sep 29, 2016 at 8:41 PM, Cody Koeninger 
> >>> > wrote:
> >>> >>
> >>> >> How are you going to handle etl failures?  Do you care about lost /
> >>> >> duplicated data?  Are your writes idempotent?
> >>> >>
> >>> >> Absent any other information about the problem, I'd stay away from
> >>> >> cassandra/flume/hdfs/hbase/whatever, and use a spark direct stream
> >>> >> feeding postgres.
> >>> >>
> >>> >> On Thu, Sep 29, 2016 at 10:04 AM, Ali Akhtar 
> >>> >> wrote:
> >>> >> > Is there an advantage to that vs directly consuming from Kafka?
> >>> >> > Nothing
> >>> >> > is
> >>> >> > being done to the data except some light ETL and then storing it
> in
> >>> >> > Cassandra
> >>> >> >
> >>> >> > On Thu, Sep 29, 2016 at 7:58 PM, Deepak Sharma
> >>> >> > 
> >>> >> > wrote:
> >>> >> >>
> >>> >> >> Its better you use spark's direct stream to ingest from kafka.
> >>> >> >>
> >>> >> >> On Thu, Sep 29, 2016 at 8:24 PM, Ali Akhtar <
> ali.rac...@gmail.com>
> >>> >> >> wrote:
> >>> >> >>>
> >>> >> >>> I don't think I need a different speed storage and batch
> storage.
> >>> >> >>> Just
> >>> >> >>> taking in raw data from Kafka, standardizing, and storing it
> >>> >> >>> somewhere
> >>> >> >>> where
> >>> >> >>> the web UI can query it, seems like it will be enough.
> >>> >> >>>
> >>> >> >>> I'm thinking about:
> >>> >> >>>
> >>> >> >>> - Reading data from Kafka via Spark Streaming
> >>> >> >>> - Standardizing, then storing it in Cassandra
> >>> >> >>> - Querying Cassandra from the web ui
> >>> >> >>>
> >>> >> >>> That seems like it will work. My question now is whether to use
> >>> >> >>> Spark
> >>> >> >>> Streaming to read Kafka, or use Kafka consumers directly.
> >>> >> >>>
> >>> >> >>>
> >>> >> >>> On Thu, Sep 29, 2016 at 7:41 PM, Mich Talebzadeh
> >>> >> >>>  wrote:
> >>> >> 
> >>> >>  - Spark Streaming to read data from Kafka
> >>> >>  - Storing the data on HDFS using Flume
> >>> >> 
> >>> >>  You don't need Spark streaming to read data from Kafka and
> store
> >>> >>  on
> >>> >>  HDFS. It is a 

Re: Architecture recommendations for a tricky use case

2016-09-29 Thread Cody Koeninger
No, direct stream in and of itself won't ensure an end-to-end
guarantee, because it doesn't know anything about your output actions.

You still need to do some work.  The point is having easy access to
offsets for batches on a per-partition basis makes it easier to do
that work, especially in conjunction with aggregation.
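
As a rough illustration of the point about per-partition offsets, the sketch below reads the offset ranges for each batch and hands them to the output step together with a small aggregate. It assumes the spark-streaming-kafka-0-10 integration (the 0.8 integration exposes a HasOffsetRanges trait of the same name in the org.apache.spark.streaming.kafka package). The save callback and the aggregateWithOffsets helper are hypothetical; what save does with the two values, ideally one transaction or an idempotent write, is the work that still has to be done.

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

object OffsetAwareOutput {
  // `save` is a hypothetical callback that should store the aggregated counts
  // and the offsets together (for example in one database transaction).
  def aggregateWithOffsets(
      stream: InputDStream[ConsumerRecord[String, String]],
      save: (Array[(String, Long)], Array[OffsetRange]) => Unit): Unit = {
    stream.foreachRDD { rdd =>
      // Must be read from the RDD produced by the direct stream itself,
      // before any transformation that changes partitioning.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // Small per-batch aggregate: number of messages per topic.
      val counts = rdd.map(record => (record.topic, 1L)).reduceByKey(_ + _).collect()

      // Runs on the driver, once per batch.
      save(counts, offsetRanges)
    }
  }
}
```

Each OffsetRange carries topic, partition, fromOffset and untilOffset, which is enough to record exactly which slice of each partition a stored aggregate covers.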

On Thu, Sep 29, 2016 at 10:40 AM, Deepak Sharma  wrote:
> If you use spark direct streams , it ensure end to end guarantee for
> messages.
>
>
> On Thu, Sep 29, 2016 at 9:05 PM, Ali Akhtar  wrote:
>>
>> My concern with Postgres / Cassandra is only scalability. I will look
>> further into Postgres horizontal scaling, thanks.
>>
>> Writes could be idempotent if done as upserts, otherwise updates will be
>> idempotent but not inserts.
>>
>> Data should not be lost. The system should be as fault tolerant as
>> possible.
>>
>> What's the advantage of using Spark for reading Kafka instead of direct
>> Kafka consumers?
>>
>> On Thu, Sep 29, 2016 at 8:28 PM, Cody Koeninger 
>> wrote:
>>>
>>> I wouldn't give up the flexibility and maturity of a relational
>>> database, unless you have a very specific use case.  I'm not trashing
>>> cassandra, I've used cassandra, but if all I know is that you're doing
>>> analytics, I wouldn't want to give up the ability to easily do ad-hoc
>>> aggregations without a lot of forethought.  If you're worried about
>>> scaling, there are several options for horizontally scaling Postgres
>>> in particular.  One of the current best from what I've worked with is
>>> Citus.
>>>
>>> On Thu, Sep 29, 2016 at 10:15 AM, Deepak Sharma 
>>> wrote:
>>> > Hi Cody
>>> > Spark direct stream is just fine for this use case.
>>> > But why postgres and not cassandra?
>>> > Is there anything specific here that i may not be aware?
>>> >
>>> > Thanks
>>> > Deepak
>>> >
>>> > On Thu, Sep 29, 2016 at 8:41 PM, Cody Koeninger 
>>> > wrote:
>>> >>
>>> >> How are you going to handle etl failures?  Do you care about lost /
>>> >> duplicated data?  Are your writes idempotent?
>>> >>
>>> >> Absent any other information about the problem, I'd stay away from
>>> >> cassandra/flume/hdfs/hbase/whatever, and use a spark direct stream
>>> >> feeding postgres.
>>> >>
>>> >> On Thu, Sep 29, 2016 at 10:04 AM, Ali Akhtar 
>>> >> wrote:
>>> >> > Is there an advantage to that vs directly consuming from Kafka?
>>> >> > Nothing
>>> >> > is
>>> >> > being done to the data except some light ETL and then storing it in
>>> >> > Cassandra
>>> >> >
>>> >> > On Thu, Sep 29, 2016 at 7:58 PM, Deepak Sharma
>>> >> > 
>>> >> > wrote:
>>> >> >>
>>> >> >> Its better you use spark's direct stream to ingest from kafka.
>>> >> >>
>>> >> >> On Thu, Sep 29, 2016 at 8:24 PM, Ali Akhtar 
>>> >> >> wrote:
>>> >> >>>
>>> >> >>> I don't think I need a different speed storage and batch storage.
>>> >> >>> Just
>>> >> >>> taking in raw data from Kafka, standardizing, and storing it
>>> >> >>> somewhere
>>> >> >>> where
>>> >> >>> the web UI can query it, seems like it will be enough.
>>> >> >>>
>>> >> >>> I'm thinking about:
>>> >> >>>
>>> >> >>> - Reading data from Kafka via Spark Streaming
>>> >> >>> - Standardizing, then storing it in Cassandra
>>> >> >>> - Querying Cassandra from the web ui
>>> >> >>>
>>> >> >>> That seems like it will work. My question now is whether to use
>>> >> >>> Spark
>>> >> >>> Streaming to read Kafka, or use Kafka consumers directly.
>>> >> >>>
>>> >> >>>
>>> >> >>> On Thu, Sep 29, 2016 at 7:41 PM, Mich Talebzadeh
>>> >> >>>  wrote:
>>> >> 
>>> >>  - Spark Streaming to read data from Kafka
>>> >>  - Storing the data on HDFS using Flume
>>> >> 
>>> >>  You don't need Spark streaming to read data from Kafka and store
>>> >>  on
>>> >>  HDFS. It is a waste of resources.
>>> >> 
>>> >>  Couple Flume to use Kafka as source and HDFS as sink directly
>>> >> 
>>> >>  KafkaAgent.sources = kafka-sources
>>> >>  KafkaAgent.sinks.hdfs-sinks.type = hdfs
>>> >> 
>>> >>  That will be for your batch layer. To analyse you can directly
>>> >>  read
>>> >>  from
>>> >>  hdfs files with Spark or simply store data in a database of your
>>> >>  choice via
>>> >>  cron or something. Do not mix your batch layer with speed layer.
>>> >> 
>>> >>  Your speed layer will ingest the same data directly from Kafka
>>> >>  into
>>> >>  spark streaming and that will be  online or near real time
>>> >>  (defined
>>> >>  by your
>>> >>  window).
>>> >> 
>>> >>  Then you have a a serving layer to present data from both speed
>>> >>  (the
>>> >>  one from SS) and batch layer.
>>> >> 
>>> >>  HTH
>>> >> 
>>> >> 
>>> >> 
>>> >> 
>>> >>  Dr Mich Talebzadeh
>>> >> 
>>> >> 
>>> >> 
>>> >> 

Re: Architecture recommendations for a tricky use case

2016-09-29 Thread Mich Talebzadeh
Hi Ali,

What is the business use case for this?

Dr Mich Talebzadeh



LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com





On 29 September 2016 at 16:40, Deepak Sharma  wrote:

> If you use spark direct streams , it ensure end to end guarantee for
> messages.
>
>
> On Thu, Sep 29, 2016 at 9:05 PM, Ali Akhtar  wrote:
>
>> My concern with Postgres / Cassandra is only scalability. I will look
>> further into Postgres horizontal scaling, thanks.
>>
>> Writes could be idempotent if done as upserts, otherwise updates will be
>> idempotent but not inserts.
>>
>> Data should not be lost. The system should be as fault tolerant as
>> possible.
>>
>> What's the advantage of using Spark for reading Kafka instead of direct
>> Kafka consumers?
>>
>> On Thu, Sep 29, 2016 at 8:28 PM, Cody Koeninger 
>> wrote:
>>
>>> I wouldn't give up the flexibility and maturity of a relational
>>> database, unless you have a very specific use case.  I'm not trashing
>>> cassandra, I've used cassandra, but if all I know is that you're doing
>>> analytics, I wouldn't want to give up the ability to easily do ad-hoc
>>> aggregations without a lot of forethought.  If you're worried about
>>> scaling, there are several options for horizontally scaling Postgres
>>> in particular.  One of the current best from what I've worked with is
>>> Citus.
>>>
>>> On Thu, Sep 29, 2016 at 10:15 AM, Deepak Sharma 
>>> wrote:
>>> > Hi Cody
>>> > Spark direct stream is just fine for this use case.
>>> > But why postgres and not cassandra?
>>> > Is there anything specific here that i may not be aware?
>>> >
>>> > Thanks
>>> > Deepak
>>> >
>>> > On Thu, Sep 29, 2016 at 8:41 PM, Cody Koeninger 
>>> wrote:
>>> >>
>>> >> How are you going to handle etl failures?  Do you care about lost /
>>> >> duplicated data?  Are your writes idempotent?
>>> >>
>>> >> Absent any other information about the problem, I'd stay away from
>>> >> cassandra/flume/hdfs/hbase/whatever, and use a spark direct stream
>>> >> feeding postgres.
>>> >>
>>> >> On Thu, Sep 29, 2016 at 10:04 AM, Ali Akhtar 
>>> wrote:
>>> >> > Is there an advantage to that vs directly consuming from Kafka?
>>> Nothing
>>> >> > is
>>> >> > being done to the data except some light ETL and then storing it in
>>> >> > Cassandra
>>> >> >
>>> >> > On Thu, Sep 29, 2016 at 7:58 PM, Deepak Sharma <
>>> deepakmc...@gmail.com>
>>> >> > wrote:
>>> >> >>
>>> >> >> Its better you use spark's direct stream to ingest from kafka.
>>> >> >>
>>> >> >> On Thu, Sep 29, 2016 at 8:24 PM, Ali Akhtar 
>>> >> >> wrote:
>>> >> >>>
>>> >> >>> I don't think I need a different speed storage and batch storage.
>>> Just
>>> >> >>> taking in raw data from Kafka, standardizing, and storing it
>>> somewhere
>>> >> >>> where
>>> >> >>> the web UI can query it, seems like it will be enough.
>>> >> >>>
>>> >> >>> I'm thinking about:
>>> >> >>>
>>> >> >>> - Reading data from Kafka via Spark Streaming
>>> >> >>> - Standardizing, then storing it in Cassandra
>>> >> >>> - Querying Cassandra from the web ui
>>> >> >>>
>>> >> >>> That seems like it will work. My question now is whether to use
>>> Spark
>>> >> >>> Streaming to read Kafka, or use Kafka consumers directly.
>>> >> >>>
>>> >> >>>
>>> >> >>> On Thu, Sep 29, 2016 at 7:41 PM, Mich Talebzadeh
>>> >> >>>  wrote:
>>> >> 
>>> >>  - Spark Streaming to read data from Kafka
>>> >>  - Storing the data on HDFS using Flume
>>> >> 
>>> >>  You don't need Spark streaming to read data from Kafka and store
>>> on
>>> >>  HDFS. It is a waste of resources.
>>> >> 
>>> >>  Couple Flume to use Kafka as source and HDFS as sink directly
>>> >> 
>>> >>  KafkaAgent.sources = kafka-sources
>>> >>  KafkaAgent.sinks.hdfs-sinks.type = hdfs
>>> >> 
>>> >>  That will be for your batch layer. To analyse you can directly
>>> read
>>> >>  from
>>> >>  hdfs files with Spark or simply store data in a database of your
>>> >>  choice via
>>> >>  cron or something. Do not mix your batch layer with speed layer.
>>> >> 
>>> >>  Your speed layer will ingest the same data directly from Kafka
>>> into
>>> >>  spark streaming and that will be  online or near real time
>>> (defined
>>> >>  by your
>>> >>  window).
>>> >> 
>>> >>  Then you have a a serving layer to present data 

Re: Architecture recommendations for a tricky use case

2016-09-29 Thread Deepak Sharma
If you use Spark direct streams, they ensure an end-to-end guarantee for
messages.


On Thu, Sep 29, 2016 at 9:05 PM, Ali Akhtar  wrote:

> My concern with Postgres / Cassandra is only scalability. I will look
> further into Postgres horizontal scaling, thanks.
>
> Writes could be idempotent if done as upserts, otherwise updates will be
> idempotent but not inserts.
>
> Data should not be lost. The system should be as fault tolerant as
> possible.
>
> What's the advantage of using Spark for reading Kafka instead of direct
> Kafka consumers?
>
> On Thu, Sep 29, 2016 at 8:28 PM, Cody Koeninger 
> wrote:
>
>> I wouldn't give up the flexibility and maturity of a relational
>> database, unless you have a very specific use case.  I'm not trashing
>> cassandra, I've used cassandra, but if all I know is that you're doing
>> analytics, I wouldn't want to give up the ability to easily do ad-hoc
>> aggregations without a lot of forethought.  If you're worried about
>> scaling, there are several options for horizontally scaling Postgres
>> in particular.  One of the current best from what I've worked with is
>> Citus.
>>
>> On Thu, Sep 29, 2016 at 10:15 AM, Deepak Sharma 
>> wrote:
>> > Hi Cody
>> > Spark direct stream is just fine for this use case.
>> > But why postgres and not cassandra?
>> > Is there anything specific here that i may not be aware?
>> >
>> > Thanks
>> > Deepak
>> >
>> > On Thu, Sep 29, 2016 at 8:41 PM, Cody Koeninger 
>> wrote:
>> >>
>> >> How are you going to handle etl failures?  Do you care about lost /
>> >> duplicated data?  Are your writes idempotent?
>> >>
>> >> Absent any other information about the problem, I'd stay away from
>> >> cassandra/flume/hdfs/hbase/whatever, and use a spark direct stream
>> >> feeding postgres.
>> >>
>> >> On Thu, Sep 29, 2016 at 10:04 AM, Ali Akhtar 
>> wrote:
>> >> > Is there an advantage to that vs directly consuming from Kafka?
>> Nothing
>> >> > is
>> >> > being done to the data except some light ETL and then storing it in
>> >> > Cassandra
>> >> >
>> >> > On Thu, Sep 29, 2016 at 7:58 PM, Deepak Sharma <
>> deepakmc...@gmail.com>
>> >> > wrote:
>> >> >>
>> >> >> Its better you use spark's direct stream to ingest from kafka.
>> >> >>
>> >> >> On Thu, Sep 29, 2016 at 8:24 PM, Ali Akhtar 
>> >> >> wrote:
>> >> >>>
>> >> >>> I don't think I need a different speed storage and batch storage.
>> Just
>> >> >>> taking in raw data from Kafka, standardizing, and storing it
>> somewhere
>> >> >>> where
>> >> >>> the web UI can query it, seems like it will be enough.
>> >> >>>
>> >> >>> I'm thinking about:
>> >> >>>
>> >> >>> - Reading data from Kafka via Spark Streaming
>> >> >>> - Standardizing, then storing it in Cassandra
>> >> >>> - Querying Cassandra from the web ui
>> >> >>>
>> >> >>> That seems like it will work. My question now is whether to use
>> Spark
>> >> >>> Streaming to read Kafka, or use Kafka consumers directly.
>> >> >>>
>> >> >>>
>> >> >>> On Thu, Sep 29, 2016 at 7:41 PM, Mich Talebzadeh
>> >> >>>  wrote:
>> >> 
>> >>  - Spark Streaming to read data from Kafka
>> >>  - Storing the data on HDFS using Flume
>> >> 
>> >>  You don't need Spark streaming to read data from Kafka and store
>> on
>> >>  HDFS. It is a waste of resources.
>> >> 
>> >>  Couple Flume to use Kafka as source and HDFS as sink directly
>> >> 
>> >>  KafkaAgent.sources = kafka-sources
>> >>  KafkaAgent.sinks.hdfs-sinks.type = hdfs
>> >> 
>> >>  That will be for your batch layer. To analyse you can directly
>> read
>> >>  from
>> >>  hdfs files with Spark or simply store data in a database of your
>> >>  choice via
>> >>  cron or something. Do not mix your batch layer with speed layer.
>> >> 
>> >>  Your speed layer will ingest the same data directly from Kafka
>> into
>> >>  spark streaming and that will be  online or near real time
>> (defined
>> >>  by your
>> >>  window).
>> >> 
>> >>  Then you have a a serving layer to present data from both speed
>> (the
>> >>  one from SS) and batch layer.
>> >> 
>> >>  HTH
>> >> 
>> >> 
>> >> 
>> >> 
>> >>  Dr Mich Talebzadeh
>> >> 
>> >> 
>> >> 
>> >>  LinkedIn
>> >> 
>> >>  https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJ
>> d6zP6AcPCCdOABUrV8Pw
>> >> 
>> >> 
>> >> 
>> >>  http://talebzadehmich.wordpress.com
>> >> 
>> >> 
>> >>  Disclaimer: Use it at your own risk. Any and all responsibility
>> for
>> >>  any
>> >>  loss, damage or destruction of data or any other property which
>> may
>> >>  arise
>> >>  from relying on this email's technical content is explicitly
>> >>  disclaimed. The
>> >>  author will in no case be liable for any monetary damages arising
>> >>  from such

Re: Architecture recommendations for a tricky use case

2016-09-29 Thread Mich Talebzadeh
Yes, but these writes from Spark still have to go through JDBC, correct?

Having said that, I don't see how doing this through Spark Streaming to
Postgres is going to be faster than source -> Kafka -> Flume (via ZooKeeper)
-> HDFS.

I believe there is direct streaming from Kafka to Hive as well, and from
Flume to HBase.

I would have thought that if one wanted to do real-time analytics with Spark
Streaming (SS), then that would be a good fit with a real-time dashboard.

What is not so clear is the business use case for this.

HTH




Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 29 September 2016 at 16:28, Cody Koeninger  wrote:

> I wouldn't give up the flexibility and maturity of a relational
> database, unless you have a very specific use case.  I'm not trashing
> cassandra, I've used cassandra, but if all I know is that you're doing
> analytics, I wouldn't want to give up the ability to easily do ad-hoc
> aggregations without a lot of forethought.  If you're worried about
> scaling, there are several options for horizontally scaling Postgres
> in particular.  One of the current best from what I've worked with is
> Citus.
>
> On Thu, Sep 29, 2016 at 10:15 AM, Deepak Sharma 
> wrote:
> > Hi Cody
> > Spark direct stream is just fine for this use case.
> > But why postgres and not cassandra?
> > Is there anything specific here that i may not be aware?
> >
> > Thanks
> > Deepak
> >
> > On Thu, Sep 29, 2016 at 8:41 PM, Cody Koeninger 
> wrote:
> >>
> >> How are you going to handle etl failures?  Do you care about lost /
> >> duplicated data?  Are your writes idempotent?
> >>
> >> Absent any other information about the problem, I'd stay away from
> >> cassandra/flume/hdfs/hbase/whatever, and use a spark direct stream
> >> feeding postgres.
> >>
> >> On Thu, Sep 29, 2016 at 10:04 AM, Ali Akhtar 
> wrote:
> >> > Is there an advantage to that vs directly consuming from Kafka?
> Nothing
> >> > is
> >> > being done to the data except some light ETL and then storing it in
> >> > Cassandra
> >> >
> >> > On Thu, Sep 29, 2016 at 7:58 PM, Deepak Sharma  >
> >> > wrote:
> >> >>
> >> >> Its better you use spark's direct stream to ingest from kafka.
> >> >>
> >> >> On Thu, Sep 29, 2016 at 8:24 PM, Ali Akhtar 
> >> >> wrote:
> >> >>>
> >> >>> I don't think I need a different speed storage and batch storage.
> Just
> >> >>> taking in raw data from Kafka, standardizing, and storing it
> somewhere
> >> >>> where
> >> >>> the web UI can query it, seems like it will be enough.
> >> >>>
> >> >>> I'm thinking about:
> >> >>>
> >> >>> - Reading data from Kafka via Spark Streaming
> >> >>> - Standardizing, then storing it in Cassandra
> >> >>> - Querying Cassandra from the web ui
> >> >>>
> >> >>> That seems like it will work. My question now is whether to use
> Spark
> >> >>> Streaming to read Kafka, or use Kafka consumers directly.
> >> >>>
> >> >>>
> >> >>> On Thu, Sep 29, 2016 at 7:41 PM, Mich Talebzadeh
> >> >>>  wrote:
> >> 
> >>  - Spark Streaming to read data from Kafka
> >>  - Storing the data on HDFS using Flume
> >> 
> >>  You don't need Spark streaming to read data from Kafka and store on
> >>  HDFS. It is a waste of resources.
> >> 
> >>  Couple Flume to use Kafka as source and HDFS as sink directly
> >> 
> >>  KafkaAgent.sources = kafka-sources
> >>  KafkaAgent.sinks.hdfs-sinks.type = hdfs
> >> 
> >>  That will be for your batch layer. To analyse you can directly read
> >>  from
> >>  hdfs files with Spark or simply store data in a database of your
> >>  choice via
> >>  cron or something. Do not mix your batch layer with speed layer.
> >> 
> >>  Your speed layer will ingest the same data directly from Kafka into
> >>  spark streaming and that will be  online or near real time (defined
> >>  by your
> >>  window).
> >> 
> >>  Then you have a a serving layer to present data from both speed
> (the
> >>  one from SS) and batch layer.
> >> 
> >>  HTH
> >> 
> >> 
> >> 
> >> 
> >>  Dr Mich Talebzadeh
> >> 
> >> 
> >> 
> >>  LinkedIn
> >> 
> >>  https://www.linkedin.com/profile/view?id=
> AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >> 
> >> 
> >> 
> >>  http://talebzadehmich.wordpress.com
> >> 
> >> 

Re: Architecture recommendations for a tricky use case

2016-09-29 Thread Cody Koeninger
If you're doing any kind of pre-aggregation during ETL, spark direct
stream will let you more easily get the delivery semantics you need,
especially if you're using a transactional data store.

If you're literally just copying individual uniquely keyed items from
kafka to a key-value store, use kafka consumers, sure.
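
One way to read the point about delivery semantics and a transactional store is sketched below. It is only an in-memory model in plain Scala, not Spark or Kafka code: `Batch`, `TxStore` and `DeliveryDemo` are hypothetical names, and the "transaction" is just a synchronized method. The idea it illustrates is that pre-aggregated results and the offset range they came from are committed together, so a replayed batch is rejected instead of being counted twice.

```scala
// In-memory stand-in for a transactional store (hypothetical; not a Spark or Kafka API).
// Aggregates and the last committed offset are updated together, so a replayed
// batch is detected and skipped instead of being counted twice.
final case class Batch(fromOffset: Long, untilOffset: Long, events: Seq[(String, Long)])

final class TxStore {
  private var counts: Map[String, Long] = Map.empty
  private var committedOffset: Long = 0L

  def snapshot: Map[String, Long] = counts
  def offset: Long = committedOffset

  /** Applies the batch only if it starts exactly where the last commit ended. */
  def commit(batch: Batch): Boolean = synchronized {
    if (batch.fromOffset != committedOffset) false
    else {
      // Pre-aggregate per key, then merge into the stored totals.
      val perKey = batch.events.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }
      counts = perKey.foldLeft(counts) { case (acc, (k, v)) =>
        acc.updated(k, acc.getOrElse(k, 0L) + v)
      }
      committedOffset = batch.untilOffset
      true
    }
  }
}

object DeliveryDemo {
  def run(): (Map[String, Long], Long) = {
    val store = new TxStore
    val first = Batch(0L, 3L, Seq("a" -> 1L, "b" -> 2L, "a" -> 4L))
    store.commit(first)
    store.commit(first) // replay after a simulated failure: rejected, totals unchanged
    store.commit(Batch(3L, 4L, Seq("b" -> 10L)))
    (store.snapshot, store.offset)
  }
}
```

With a real database the same shape would be one transaction that writes both the aggregates and the offsets; with uniquely keyed items and no aggregation, the replay is harmless anyway, which is the second case in the message.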

On Thu, Sep 29, 2016 at 10:35 AM, Ali Akhtar  wrote:
> My concern with Postgres / Cassandra is only scalability. I will look
> further into Postgres horizontal scaling, thanks.
>
> Writes could be idempotent if done as upserts, otherwise updates will be
> idempotent but not inserts.
>
> Data should not be lost. The system should be as fault tolerant as possible.
>
> What's the advantage of using Spark for reading Kafka instead of direct
> Kafka consumers?
>
> On Thu, Sep 29, 2016 at 8:28 PM, Cody Koeninger  wrote:
>>
>> I wouldn't give up the flexibility and maturity of a relational
>> database, unless you have a very specific use case.  I'm not trashing
>> cassandra, I've used cassandra, but if all I know is that you're doing
>> analytics, I wouldn't want to give up the ability to easily do ad-hoc
>> aggregations without a lot of forethought.  If you're worried about
>> scaling, there are several options for horizontally scaling Postgres
>> in particular.  One of the current best from what I've worked with is
>> Citus.
>>
>> On Thu, Sep 29, 2016 at 10:15 AM, Deepak Sharma 
>> wrote:
>> > Hi Cody
>> > Spark direct stream is just fine for this use case.
>> > But why postgres and not cassandra?
>> > Is there anything specific here that i may not be aware?
>> >
>> > Thanks
>> > Deepak
>> >
>> > On Thu, Sep 29, 2016 at 8:41 PM, Cody Koeninger 
>> > wrote:
>> >>
>> >> How are you going to handle etl failures?  Do you care about lost /
>> >> duplicated data?  Are your writes idempotent?
>> >>
>> >> Absent any other information about the problem, I'd stay away from
>> >> cassandra/flume/hdfs/hbase/whatever, and use a spark direct stream
>> >> feeding postgres.
>> >>
>> >> On Thu, Sep 29, 2016 at 10:04 AM, Ali Akhtar 
>> >> wrote:
>> >> > Is there an advantage to that vs directly consuming from Kafka?
>> >> > Nothing
>> >> > is
>> >> > being done to the data except some light ETL and then storing it in
>> >> > Cassandra
>> >> >
>> >> > On Thu, Sep 29, 2016 at 7:58 PM, Deepak Sharma
>> >> > 
>> >> > wrote:
>> >> >>
>> >> >> Its better you use spark's direct stream to ingest from kafka.
>> >> >>
>> >> >> On Thu, Sep 29, 2016 at 8:24 PM, Ali Akhtar 
>> >> >> wrote:
>> >> >>>
>> >> >>> I don't think I need a different speed storage and batch storage.
>> >> >>> Just
>> >> >>> taking in raw data from Kafka, standardizing, and storing it
>> >> >>> somewhere
>> >> >>> where
>> >> >>> the web UI can query it, seems like it will be enough.
>> >> >>>
>> >> >>> I'm thinking about:
>> >> >>>
>> >> >>> - Reading data from Kafka via Spark Streaming
>> >> >>> - Standardizing, then storing it in Cassandra
>> >> >>> - Querying Cassandra from the web ui
>> >> >>>
>> >> >>> That seems like it will work. My question now is whether to use
>> >> >>> Spark
>> >> >>> Streaming to read Kafka, or use Kafka consumers directly.
>> >> >>>
>> >> >>>
>> >> >>> On Thu, Sep 29, 2016 at 7:41 PM, Mich Talebzadeh
>> >> >>>  wrote:
>> >> 
>> >>  - Spark Streaming to read data from Kafka
>> >>  - Storing the data on HDFS using Flume
>> >> 
>> >>  You don't need Spark streaming to read data from Kafka and store
>> >>  on
>> >>  HDFS. It is a waste of resources.
>> >> 
>> >>  Couple Flume to use Kafka as source and HDFS as sink directly
>> >> 
>> >>  KafkaAgent.sources = kafka-sources
>> >>  KafkaAgent.sinks.hdfs-sinks.type = hdfs
>> >> 
>> >>  That will be for your batch layer. To analyse you can directly
>> >>  read
>> >>  from
>> >>  hdfs files with Spark or simply store data in a database of your
>> >>  choice via
>> >>  cron or something. Do not mix your batch layer with speed layer.
>> >> 
>> >>  Your speed layer will ingest the same data directly from Kafka
>> >>  into
>> >>  spark streaming and that will be  online or near real time
>> >>  (defined
>> >>  by your
>> >>  window).
>> >> 
>> >>  Then you have a a serving layer to present data from both speed
>> >>  (the
>> >>  one from SS) and batch layer.
>> >> 
>> >>  HTH
>> >> 
>> >> 
>> >> 
>> >> 
>> >>  Dr Mich Talebzadeh
>> >> 
>> >> 
>> >> 
>> >>  LinkedIn
>> >> 
>> >> 
>> >>  https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> >> 
>> >> 
>> >> 
>> >>  http://talebzadehmich.wordpress.com
>> >> 
>> >> 
>> >>  Disclaimer: Use it at your own risk. Any and all responsibility

Re: Architecture recommendations for a tricky use case

2016-09-29 Thread Ali Akhtar
My concern with Postgres / Cassandra is only scalability. I will look
further into Postgres horizontal scaling, thanks.

Writes could be idempotent if done as upserts, otherwise updates will be
idempotent but not inserts.
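
The difference between upserts and plain inserts can be shown with a small plain-Scala model. This is a sketch under assumptions: the "tables" are an immutable `Map` and `Vector`, and `Row`, `upsert` and `insert` are illustrative names, not a database API.

```scala
object IdempotenceDemo {
  final case class Row(id: Int, value: String)

  // Upsert: keyed by id, so re-applying the same row overwrites it with identical data.
  def upsert(table: Map[Int, Row], row: Row): Map[Int, Row] = table.updated(row.id, row)

  // Plain insert into a table without a unique key: every call appends a new row.
  def insert(table: Vector[Row], row: Row): Vector[Row] = table :+ row

  val batch: Seq[Row] = Seq(Row(1, "x"), Row(2, "y"))

  // Apply the batch twice, as would happen if a failed job were retried.
  val upserted: Map[Int, Row] = (batch ++ batch).foldLeft(Map.empty[Int, Row])(upsert)
  val inserted: Vector[Row] = (batch ++ batch).foldLeft(Vector.empty[Row])(insert)
}
```

After the retry the upserted table still holds two rows, while the inserted one holds four.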

Data should not be lost. The system should be as fault tolerant as possible.

What's the advantage of using Spark for reading Kafka instead of direct
Kafka consumers?

On Thu, Sep 29, 2016 at 8:28 PM, Cody Koeninger  wrote:

> I wouldn't give up the flexibility and maturity of a relational
> database, unless you have a very specific use case.  I'm not trashing
> cassandra, I've used cassandra, but if all I know is that you're doing
> analytics, I wouldn't want to give up the ability to easily do ad-hoc
> aggregations without a lot of forethought.  If you're worried about
> scaling, there are several options for horizontally scaling Postgres
> in particular.  One of the current best from what I've worked with is
> Citus.
>
> On Thu, Sep 29, 2016 at 10:15 AM, Deepak Sharma 
> wrote:
> > Hi Cody
> > Spark direct stream is just fine for this use case.
> > But why postgres and not cassandra?
> > Is there anything specific here that i may not be aware?
> >
> > Thanks
> > Deepak
> >
> > On Thu, Sep 29, 2016 at 8:41 PM, Cody Koeninger 
> wrote:
> >>
> >> How are you going to handle etl failures?  Do you care about lost /
> >> duplicated data?  Are your writes idempotent?
> >>
> >> Absent any other information about the problem, I'd stay away from
> >> cassandra/flume/hdfs/hbase/whatever, and use a spark direct stream
> >> feeding postgres.
> >>
> >> On Thu, Sep 29, 2016 at 10:04 AM, Ali Akhtar 
> wrote:
> >> > Is there an advantage to that vs directly consuming from Kafka?
> Nothing
> >> > is
> >> > being done to the data except some light ETL and then storing it in
> >> > Cassandra
> >> >
> >> > On Thu, Sep 29, 2016 at 7:58 PM, Deepak Sharma  >
> >> > wrote:
> >> >>
> >> >> Its better you use spark's direct stream to ingest from kafka.
> >> >>
> >> >> On Thu, Sep 29, 2016 at 8:24 PM, Ali Akhtar 
> >> >> wrote:
> >> >>>
> >> >>> I don't think I need a different speed storage and batch storage.
> Just
> >> >>> taking in raw data from Kafka, standardizing, and storing it
> somewhere
> >> >>> where
> >> >>> the web UI can query it, seems like it will be enough.
> >> >>>
> >> >>> I'm thinking about:
> >> >>>
> >> >>> - Reading data from Kafka via Spark Streaming
> >> >>> - Standardizing, then storing it in Cassandra
> >> >>> - Querying Cassandra from the web ui
> >> >>>
> >> >>> That seems like it will work. My question now is whether to use
> Spark
> >> >>> Streaming to read Kafka, or use Kafka consumers directly.
> >> >>>
> >> >>>
> >> >>> On Thu, Sep 29, 2016 at 7:41 PM, Mich Talebzadeh
> >> >>>  wrote:
> >> 
> >>  - Spark Streaming to read data from Kafka
> >>  - Storing the data on HDFS using Flume
> >> 
> >>  You don't need Spark streaming to read data from Kafka and store on
> >>  HDFS. It is a waste of resources.
> >> 
> >>  Couple Flume to use Kafka as source and HDFS as sink directly
> >> 
> >>  KafkaAgent.sources = kafka-sources
> >>  KafkaAgent.sinks.hdfs-sinks.type = hdfs
> >> 
> >>  That will be for your batch layer. To analyse you can directly read
> >>  from
> >>  hdfs files with Spark or simply store data in a database of your
> >>  choice via
> >>  cron or something. Do not mix your batch layer with speed layer.
> >> 
> >>  Your speed layer will ingest the same data directly from Kafka into
> >>  spark streaming and that will be  online or near real time (defined
> >>  by your
> >>  window).
> >> 
> >>  Then you have a a serving layer to present data from both speed
> (the
> >>  one from SS) and batch layer.
> >> 
> >>  HTH
> >> 
> >> 
> >> 
> >> 
> >>  Dr Mich Talebzadeh
> >> 
> >> 
> >> 
> >>  LinkedIn
> >> 
> >>  https://www.linkedin.com/profile/view?id=
> AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >> 
> >> 
> >> 
> >>  http://talebzadehmich.wordpress.com
> >> 
> >> 
> >>  Disclaimer: Use it at your own risk. Any and all responsibility for
> >>  any
> >>  loss, damage or destruction of data or any other property which may
> >>  arise
> >>  from relying on this email's technical content is explicitly
> >>  disclaimed. The
> >>  author will in no case be liable for any monetary damages arising
> >>  from such
> >>  loss, damage or destruction.
> >> 
> >> 
> >> 
> >> 
> >>  On 29 September 2016 at 15:15, Ali Akhtar 
> >>  wrote:
> >> >
> >> > The web UI is actually the speed layer, it needs to be able to
> query
> >> > the data online, and show the results in 

Re: Treating NaN fields in Spark

2016-09-29 Thread Mich Talebzadeh
Good points :) Would it take "-" as a negative number, as in -123456?

At this moment in time this is what the code does:

   1. The csv is imported into HDFS as is. No cleaning of rogue columns is
   done at shell level.
   2. The Spark program does the following filtering:

      val rs = df2.filter($"Open" !== "-").filter($"Volume".cast("Integer") > 0)

So my first line of defence is to check for !== "-", which is a dash,
commonly used for "not available". The next filter is for the Volume column
> 0 (there were trades on this stock); otherwise the calculation could skew
the results.  Note that a single filter combining !== with && will not work:

scala> val rs = df2.filter($"Open" !== "-" && $"Volume".cast("Integer") > 0)
<console>:40: error: value && is not a member of String
   val rs = df2.filter($"Open" !== "-" && $"Volume".cast("Integer") > 0)

This throws an error, but the same expression with the equality operator
=== compiles:

scala> val rs = df2.filter($"Open" === "-" && $"Volume".cast("Integer") > 0)
rs: org.apache.spark.sql.Dataset[columns] = [Stock: string, Ticker: string
... 6 more fields]
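
A likely explanation for the difference is Scala's operator precedence rather than anything in the data. An operator that ends in `=` but does not start with `=` (and is not `<=`, `>=` or `!=`) is treated as an assignment operator and gets the lowest precedence, so `!==` lets `&&` bind first and the compiler looks for `&&` on the String "-". The sketch below shows the grouping with a tiny stand-in class; `Expr` is hypothetical and is not Spark's `Column`.

```scala
object PrecedenceDemo {
  // Tiny stand-in for a column expression (hypothetical; not Spark's Column class).
  // Each operator just records how the compiler grouped its operands.
  final case class Expr(text: String) {
    def !==(other: Expr): Expr = Expr(s"($text !== ${other.text})")
    def =!=(other: Expr): Expr = Expr(s"($text =!= ${other.text})")
    def ===(other: Expr): Expr = Expr(s"($text === ${other.text})")
    def &&(other: Expr): Expr  = Expr(s"($text && ${other.text})")
  }

  val open: Expr = Expr("open")
  val dash: Expr = Expr("dash")
  val vol: Expr  = Expr("volOk")

  // `!==` ends in '=' without starting with '=', so it has the lowest
  // (assignment-operator) precedence and the && is grouped first.
  val bangEq: String = (open !== dash && vol).text

  // `=!=` and `===` start with '=', so they bind tighter than &&.
  val eqBangEq: String = (open =!= dash && vol).text
  val tripleEq: String = (open === dash && vol).text
}
```

In Spark 2.0 `Column` provides `=!=` for this reason and `!==` is deprecated, so the combined filter can be written as `df2.filter($"Open" =!= "-" && $"Volume".cast("Integer") > 0)`; adding parentheses around the `!==` comparison should also work.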


Another alternative is to check that the value is all digits:

scala> def isAllPostiveNumber (price: String) = price forall Character.isDigit
isAllPostiveNumber: (price: String)Boolean

It returns Boolean true or false, but it does not work in the filter below.
Can someone tell me what is wrong with it?

scala> val rs = df2.filter(isAllPostiveNumber("Open") => true)
<console>:1: error: not a legal formal parameter.
Note: Tuples cannot be directly destructured in method or function parameters.
  Either create a single parameter accepting the Tuple1,
  or consider a pattern matching anonymous function: `{ case (param1, param1) => ... }
val rs = df2.filter(isAllPostiveNumber("Open") => true)
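
Two things seem to go wrong in that call. First, `isAllPostiveNumber("Open") => true` is parsed as an anonymous function whose parameter list is `isAllPostiveNumber("Open")`, which is not a legal parameter. Second, even without the `=> true`, the function would be applied to the literal string "Open" rather than to the values in the column. An all-digits check would also reject valid prices such as 40.56. The sketch below is a plain-Scala alternative; `PriceCheck` and `isPositiveNumber` are illustrative names, not part of the original code.

```scala
import scala.util.Try

object PriceCheck {
  // True when the string parses as a number greater than zero.
  // Unlike an all-digits check, this accepts decimals such as "40.56".
  def isPositiveNumber(price: String): Boolean =
    price != null && Try(price.trim.toDouble).toOption.exists(_ > 0)

  val samples: Seq[String] = Seq("40.56", "-", "", "0", "123", null)

  // Keeps only the entries that parse as positive numbers.
  val kept: Seq[String] = samples.filter(isPositiveNumber)
}
```

To apply such a function to a DataFrame column it has to be wrapped as a user-defined function, for example `val isPositive = udf(PriceCheck.isPositiveNumber _)` with `org.apache.spark.sql.functions.udf`, followed by `df2.filter(isPositive($"Open"))`. That part is untested here and depends on the Spark version in use.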


Thanks












Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 29 September 2016 at 13:45, Michael Segel 
wrote:

> Hi,
>
> Just a few thoughts so take it for what its worth…
>
> Databases have static schemas and will reject a row’s column on insert.
>
> In your case… you have one data set where you have a column which is
> supposed to be a number but you have it as a string.
> You want to convert this to a double in your final data set.
>
>
> It looks like your problem is that your original data set that you
> ingested used a ‘-‘ (dash) to represent missing data, rather than a NULL
> value.
> In fact, looking at the rows… you seem to have a stock that didn’t trade
> for a given day. (All have Volume as 0. ) Why do you need this?  Wouldn’t
> you want to represent this as null or no row for a given date?
>
> The reason your ‘-‘ check failed when isnan() is that ‘-‘ actually could
> be represented as a number.
>
> If you replaced the ‘-‘ with a String that is wider than the width of a
> double … the isnan should flag the row.
>
> (I still need more coffee, so I could be wrong) ;-)
>
> HTH
>
> -Mike
>
> On Sep 28, 2016, at 5:56 AM, Mich Talebzadeh 
> wrote:
>
>
> This is an issue in most databases. Specifically if a field is NaN.. --> (
> *NaN*, standing for not a number, is a numeric data type value
> representing an undefined or unrepresentable value, especially in
> floating-point calculations)
>
> There is a method called isnan() in Spark that is supposed to handle this
> scenario . However, it does not return correct values! For example I
> defined column "Open" as String  (it should be Float) and it has the
> following 7 rogue entries out of 1272 rows in a csv
>
> df2.filter( $"OPen" === 
> "-").select((changeToDate("TradeDate").as("TradeDate")),
> 'Open, 'High, 'Low, 'Close, 'Volume).show
>
> +----------+----+----+---+-----+------+
> | TradeDate|Open|High|Low|Close|Volume|
> +----------+----+----+---+-----+------+
> |2011-12-23|   -|   -|  -|40.56|     0|
> |2011-04-21|   -|   -|  -|45.85|     0|
> |2010-12-30|   -|   -|  -|38.10|     0|
> |2010-12-23|   -|   -|  -|38.36|     0|
> |2008-04-30|   -|   -|  -|32.39|     0|
> |2008-04-29|   -|   -|  -|33.05|     0|
> |2008-04-28|   -|   -|  -|32.60|     0|
> +----------+----+----+---+-----+------+
>
> However, the following does not work!
>
>  df2.filter(isnan($"Open")).show
> +-----+------+---------+----+----+---+-----+------+
> |Stock|Ticker|TradeDate|Open|High|Low|Close|Volume|
> +-----+------+---------+----+----+---+-----+------+
> +-----+------+---------+----+----+---+-----+------+
>
> Any suggestions?
>
> Thanks
>

Re: Architecture recommendations for a tricky use case

2016-09-29 Thread Cody Koeninger
I wouldn't give up the flexibility and maturity of a relational
database, unless you have a very specific use case.  I'm not trashing
cassandra, I've used cassandra, but if all I know is that you're doing
analytics, I wouldn't want to give up the ability to easily do ad-hoc
aggregations without a lot of forethought.  If you're worried about
scaling, there are several options for horizontally scaling Postgres
in particular.  One of the current best from what I've worked with is
Citus.

On Thu, Sep 29, 2016 at 10:15 AM, Deepak Sharma  wrote:
> Hi Cody
> Spark direct stream is just fine for this use case.
> But why postgres and not cassandra?
> Is there anything specific here that i may not be aware?
>
> Thanks
> Deepak
>
> On Thu, Sep 29, 2016 at 8:41 PM, Cody Koeninger  wrote:
>>
>> How are you going to handle etl failures?  Do you care about lost /
>> duplicated data?  Are your writes idempotent?
>>
>> Absent any other information about the problem, I'd stay away from
>> cassandra/flume/hdfs/hbase/whatever, and use a spark direct stream
>> feeding postgres.
>>
>> On Thu, Sep 29, 2016 at 10:04 AM, Ali Akhtar  wrote:
>> > Is there an advantage to that vs directly consuming from Kafka? Nothing
>> > is
>> > being done to the data except some light ETL and then storing it in
>> > Cassandra
>> >
>> > On Thu, Sep 29, 2016 at 7:58 PM, Deepak Sharma 
>> > wrote:
>> >>
>> >> Its better you use spark's direct stream to ingest from kafka.
>> >>
>> >> On Thu, Sep 29, 2016 at 8:24 PM, Ali Akhtar 
>> >> wrote:
>> >>>
>> >>> I don't think I need a different speed storage and batch storage. Just
>> >>> taking in raw data from Kafka, standardizing, and storing it somewhere
>> >>> where
>> >>> the web UI can query it, seems like it will be enough.
>> >>>
>> >>> I'm thinking about:
>> >>>
>> >>> - Reading data from Kafka via Spark Streaming
>> >>> - Standardizing, then storing it in Cassandra
>> >>> - Querying Cassandra from the web ui
>> >>>
>> >>> That seems like it will work. My question now is whether to use Spark
>> >>> Streaming to read Kafka, or use Kafka consumers directly.
>> >>>
>> >>>
>> >>> On Thu, Sep 29, 2016 at 7:41 PM, Mich Talebzadeh
>> >>>  wrote:
>> 
>>  - Spark Streaming to read data from Kafka
>>  - Storing the data on HDFS using Flume
>> 
>>  You don't need Spark streaming to read data from Kafka and store on
>>  HDFS. It is a waste of resources.
>> 
>>  Couple Flume to use Kafka as source and HDFS as sink directly
>> 
>>  KafkaAgent.sources = kafka-sources
>>  KafkaAgent.sinks.hdfs-sinks.type = hdfs
>> 
>>  That will be for your batch layer. To analyse you can directly read
>>  from
>>  hdfs files with Spark or simply store data in a database of your
>>  choice via
>>  cron or something. Do not mix your batch layer with speed layer.
>> 
>>  Your speed layer will ingest the same data directly from Kafka into
>>  spark streaming and that will be  online or near real time (defined
>>  by your
>>  window).
>> 
>>  Then you have a a serving layer to present data from both speed  (the
>>  one from SS) and batch layer.
>> 
>>  HTH
>> 
>> 
>> 
>> 
>>  Dr Mich Talebzadeh
>> 
>> 
>> 
>>  LinkedIn
>> 
>>  https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> 
>> 
>> 
>>  http://talebzadehmich.wordpress.com
>> 
>> 
>>  Disclaimer: Use it at your own risk. Any and all responsibility for
>>  any
>>  loss, damage or destruction of data or any other property which may
>>  arise
>>  from relying on this email's technical content is explicitly
>>  disclaimed. The
>>  author will in no case be liable for any monetary damages arising
>>  from such
>>  loss, damage or destruction.
>> 
>> 
>> 
>> 
>>  On 29 September 2016 at 15:15, Ali Akhtar 
>>  wrote:
>> >
>> > The web UI is actually the speed layer, it needs to be able to query
>> > the data online, and show the results in real-time.
>> >
>> > It also needs a custom front-end, so a system like Tableau can't be
>> > used, it must have a custom backend + front-end.
>> >
>> > Thanks for the recommendation of Flume. Do you think this will work:
>> >
>> > - Spark Streaming to read data from Kafka
>> > - Storing the data on HDFS using Flume
>> > - Using Spark to query the data in the backend of the web UI?
>> >
>> >
>> >
>> > On Thu, Sep 29, 2016 at 7:08 PM, Mich Talebzadeh
>> >  wrote:
>> >>
>> >> You need a batch layer and a speed layer. Data from Kafka can be
>> >> stored on HDFS using flume.
>> >>
>> >> -  Query this data to generate reports 

Re: Treating NaN fields in Spark

2016-09-29 Thread Peter Figliozzi
"isnan" ends up using a case class, subclass of UnaryExpression, called
"IsNaN" which evaluates each row of the column like this:

   - *False* if the value is Null
   - Check the "Expression.Type" (apparently a Spark thing, not a Scala
   thing.. still learning here)
   - DoubleType:  cast to Double and retrieve .isNaN
   - FloatType: cast to Float and retrieve .isNaN
   - Casting done by value.asInstanceOf[T]

What's interesting is the "inputTypes" for this class are only DoubleType
and FloatType.  Unfortunately, I haven't figured out how the code would
handle a String.  Maybe someone could tell us how these Expressions work?

In any case, we're not getting *True* back unless the value x cast to a
Double actually returns Double.NaN.  Strings cast to Double return errors
(not Double.NaN) and the '-' character cast to Double returns 45 (!).
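
The plain Scala behaviour described in the last paragraph can be checked outside Spark. The sketch below only covers Scala's own conversions; it does not show what Spark's IsNaN expression does with a String column, which the message leaves open. `NaNCastDemo` is an illustrative name.

```scala
import scala.util.Try

object NaNCastDemo {
  // A Char converts to its character code: '-' is code point 45.
  val dashChar: Double = '-'.toDouble

  // A String is parsed instead; "-" is not a valid number, so parsing throws.
  val dashString: Try[Double] = Try("-".toDouble)

  // Only a value that really is NaN reports isNaN == true.
  val realNaN: Boolean = Double.NaN.isNaN
  val dashCharIsNaN: Boolean = dashChar.isNaN
}
```

Neither conversion of a dash yields NaN, which is consistent with `isnan` never flagging the rows that contain "-".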

On Thu, Sep 29, 2016 at 7:45 AM, Michael Segel 
wrote:

> Hi,
>
> Just a few thoughts so take it for what its worth…
>
> Databases have static schemas and will reject a row’s column on insert.
>
> In your case… you have one data set where you have a column which is
> supposed to be a number but you have it as a string.
> You want to convert this to a double in your final data set.
>
>
> It looks like your problem is that your original data set that you
> ingested used a ‘-‘ (dash) to represent missing data, rather than a NULL
> value.
> In fact, looking at the rows… you seem to have a stock that didn’t trade
> for a given day. (All have Volume as 0. ) Why do you need this?  Wouldn’t
> you want to represent this as null or no row for a given date?
>
> The reason your '-' check failed with isnan() is that '-' actually could
> be represented as a number.
>
> If you replaced the '-' with a String that is wider than the width of a
> double … the isnan should flag the row.
>
> (I still need more coffee, so I could be wrong) ;-)
>
> HTH
>
> -Mike
>
> On Sep 28, 2016, at 5:56 AM, Mich Talebzadeh 
> wrote:
>
>
> This is an issue in most databases, specifically if a field is NaN. (*NaN*,
> standing for "not a number", is a numeric data type value representing an
> undefined or unrepresentable value, especially in floating-point
> calculations.)
>
> There is a method called isnan() in Spark that is supposed to handle this
> scenario. However, it does not return correct values! For example, I
> defined column "Open" as String (it should be Float) and it has the
> following 7 rogue entries out of 1272 rows in a CSV:
>
> df2.filter($"Open" === "-")
>   .select(changeToDate("TradeDate").as("TradeDate"),
>           'Open, 'High, 'Low, 'Close, 'Volume)
>   .show
>
> +----------+----+----+---+-----+------+
> | TradeDate|Open|High|Low|Close|Volume|
> +----------+----+----+---+-----+------+
> |2011-12-23|   -|   -|  -|40.56|     0|
> |2011-04-21|   -|   -|  -|45.85|     0|
> |2010-12-30|   -|   -|  -|38.10|     0|
> |2010-12-23|   -|   -|  -|38.36|     0|
> |2008-04-30|   -|   -|  -|32.39|     0|
> |2008-04-29|   -|   -|  -|33.05|     0|
> |2008-04-28|   -|   -|  -|32.60|     0|
> +----------+----+----+---+-----+------+
>
> However, the following does not work!
>
>  df2.filter(isnan($"Open")).show
> +-----+------+---------+----+----+---+-----+------+
> |Stock|Ticker|TradeDate|Open|High|Low|Close|Volume|
> +-----+------+---------+----+----+---+-----+------+
> +-----+------+---------+----+----+---+-----+------+
>
> Any suggestions?
>
> Thanks
>
>
> Dr Mich Talebzadeh
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
> http://talebzadehmich.wordpress.com
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
>


Re: Architecture recommendations for a tricky use case

2016-09-29 Thread Deepak Sharma
Hi Cody,
Spark direct stream is just fine for this use case.
But why Postgres and not Cassandra?
Is there anything specific here that I may not be aware of?

Thanks
Deepak

On Thu, Sep 29, 2016 at 8:41 PM, Cody Koeninger  wrote:

> How are you going to handle etl failures?  Do you care about lost /
> duplicated data?  Are your writes idempotent?
>
> Absent any other information about the problem, I'd stay away from
> cassandra/flume/hdfs/hbase/whatever, and use a spark direct stream
> feeding postgres.
>
> On Thu, Sep 29, 2016 at 10:04 AM, Ali Akhtar  wrote:
> > Is there an advantage to that vs directly consuming from Kafka? Nothing
> is
> > being done to the data except some light ETL and then storing it in
> > Cassandra
> >
> > On Thu, Sep 29, 2016 at 7:58 PM, Deepak Sharma 
> > wrote:
> >>
> >> Its better you use spark's direct stream to ingest from kafka.
> >>
> >> On Thu, Sep 29, 2016 at 8:24 PM, Ali Akhtar 
> wrote:
> >>>
> >>> I don't think I need a different speed storage and batch storage. Just
> >>> taking in raw data from Kafka, standardizing, and storing it somewhere
> where
> >>> the web UI can query it, seems like it will be enough.
> >>>
> >>> I'm thinking about:
> >>>
> >>> - Reading data from Kafka via Spark Streaming
> >>> - Standardizing, then storing it in Cassandra
> >>> - Querying Cassandra from the web ui
> >>>
> >>> That seems like it will work. My question now is whether to use Spark
> >>> Streaming to read Kafka, or use Kafka consumers directly.
> >>>
> >>>
> >>> On Thu, Sep 29, 2016 at 7:41 PM, Mich Talebzadeh
> >>>  wrote:
> 
>  - Spark Streaming to read data from Kafka
>  - Storing the data on HDFS using Flume
> 
>  You don't need Spark streaming to read data from Kafka and store on
>  HDFS. It is a waste of resources.
> 
>  Couple Flume to use Kafka as source and HDFS as sink directly
> 
>  KafkaAgent.sources = kafka-sources
>  KafkaAgent.sinks.hdfs-sinks.type = hdfs
> 
>  That will be for your batch layer. To analyse you can directly read
> from
>  hdfs files with Spark or simply store data in a database of your
> choice via
>  cron or something. Do not mix your batch layer with speed layer.
> 
>  Your speed layer will ingest the same data directly from Kafka into
>  spark streaming and that will be  online or near real time (defined
> by your
>  window).
> 
>  Then you have a a serving layer to present data from both speed  (the
>  one from SS) and batch layer.
> 
>  HTH
> 
>  On 29 September 2016 at 15:15, Ali Akhtar 
> wrote:
> >
> > The web UI is actually the speed layer, it needs to be able to query
> > the data online, and show the results in real-time.
> >
> > It also needs a custom front-end, so a system like Tableau can't be
> > used, it must have a custom backend + front-end.
> >
> > Thanks for the recommendation of Flume. Do you think this will work:
> >
> > - Spark Streaming to read data from Kafka
> > - Storing the data on HDFS using Flume
> > - Using Spark to query the data in the backend of the web UI?
> >
> >
> >
> > On Thu, Sep 29, 2016 at 7:08 PM, Mich Talebzadeh
> >  wrote:
> >>
> >> You need a batch layer and a speed layer. Data from Kafka can be
> >> stored on HDFS using flume.
> >>
> >> -  Query this data to generate reports / analytics (There will be a
> >> web UI which will be the front-end to the data, and will show the
> reports)
> >>
> >> This is basically batch layer and you need something like Tableau or
> >> Zeppelin to query data
> >>
> >> You will also need spark streaming to query data online for speed
> >> layer. That data could be stored in some transient fabric like
> ignite or
> >> even druid.
> >>
> >> HTH

Re: Architecture recommendations for a tricky use case

2016-09-29 Thread Cody Koeninger
How are you going to handle ETL failures?  Do you care about lost /
duplicated data?  Are your writes idempotent?

Absent any other information about the problem, I'd stay away from
cassandra/flume/hdfs/hbase/whatever, and use a spark direct stream
feeding postgres.
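
One way to read the idempotency question: if a batch is replayed after a
failure, the store should end up in the same state as if it had been written
once. A minimal sketch, assuming records carry their Kafka coordinates and using
Python's built-in sqlite3 as a stand-in for Postgres. The events table, its
column names, and write_batch are hypothetical.

```python
import sqlite3

def write_batch(conn, batch):
    """Insert records keyed by (topic, part_id, msg_offset); replays are ignored."""
    with conn:  # one transaction per batch: all rows commit, or none do
        conn.executemany(
            "INSERT OR IGNORE INTO events (topic, part_id, msg_offset, payload) "
            "VALUES (?, ?, ?, ?)",
            batch,
        )

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE events ("
    " topic TEXT, part_id INTEGER, msg_offset INTEGER, payload TEXT,"
    " PRIMARY KEY (topic, part_id, msg_offset))"
)

batch = [("raw", 0, 100, "a"), ("raw", 0, 101, "b")]
write_batch(conn, batch)
write_batch(conn, batch)  # simulated retry of the same batch after a failure

row_count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
print(row_count)
```

In Postgres the corresponding statement would be INSERT ... ON CONFLICT DO
NOTHING against the same kind of unique key; whether that fits depends on
whether a replayed record is guaranteed to be identical to the original.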

On Thu, Sep 29, 2016 at 10:04 AM, Ali Akhtar  wrote:
> Is there an advantage to that vs directly consuming from Kafka? Nothing is
> being done to the data except some light ETL and then storing it in
> Cassandra
>
> On Thu, Sep 29, 2016 at 7:58 PM, Deepak Sharma 
> wrote:
>>
>> Its better you use spark's direct stream to ingest from kafka.
>>
>> On Thu, Sep 29, 2016 at 8:24 PM, Ali Akhtar  wrote:
>>>
>>> I don't think I need a different speed storage and batch storage. Just
>>> taking in raw data from Kafka, standardizing, and storing it somewhere where
>>> the web UI can query it, seems like it will be enough.
>>>
>>> I'm thinking about:
>>>
>>> - Reading data from Kafka via Spark Streaming
>>> - Standardizing, then storing it in Cassandra
>>> - Querying Cassandra from the web ui
>>>
>>> That seems like it will work. My question now is whether to use Spark
>>> Streaming to read Kafka, or use Kafka consumers directly.
>>>
>>>
>>> On Thu, Sep 29, 2016 at 7:41 PM, Mich Talebzadeh
>>>  wrote:

 - Spark Streaming to read data from Kafka
 - Storing the data on HDFS using Flume

 You don't need Spark streaming to read data from Kafka and store on
 HDFS. It is a waste of resources.

 Couple Flume to use Kafka as source and HDFS as sink directly

 KafkaAgent.sources = kafka-sources
 KafkaAgent.sinks.hdfs-sinks.type = hdfs

 That will be for your batch layer. To analyse you can directly read from
 hdfs files with Spark or simply store data in a database of your choice via
 cron or something. Do not mix your batch layer with speed layer.

 Your speed layer will ingest the same data directly from Kafka into
 spark streaming and that will be  online or near real time (defined by your
 window).

 Then you have a a serving layer to present data from both speed  (the
 one from SS) and batch layer.

 HTH




 On 29 September 2016 at 15:15, Ali Akhtar  wrote:
>
> The web UI is actually the speed layer, it needs to be able to query
> the data online, and show the results in real-time.
>
> It also needs a custom front-end, so a system like Tableau can't be
> used, it must have a custom backend + front-end.
>
> Thanks for the recommendation of Flume. Do you think this will work:
>
> - Spark Streaming to read data from Kafka
> - Storing the data on HDFS using Flume
> - Using Spark to query the data in the backend of the web UI?
>
>
>
> On Thu, Sep 29, 2016 at 7:08 PM, Mich Talebzadeh
>  wrote:
>>
>> You need a batch layer and a speed layer. Data from Kafka can be
>> stored on HDFS using flume.
>>
>> -  Query this data to generate reports / analytics (There will be a
>> web UI which will be the front-end to the data, and will show the 
>> reports)
>>
>> This is basically batch layer and you need something like Tableau or
>> Zeppelin to query data
>>
>> You will also need spark streaming to query data online for speed
>> layer. That data could be stored in some transient fabric like ignite or
>> even druid.
>>
>> HTH
>>
>> On 29 September 2016 at 15:01, Ali Akhtar 
>> wrote:
>>>
>>> It needs to be able to scale 

Re: udf of aggregation in pyspark dataframe ?

2016-09-29 Thread peng yu
By the way, I am using Spark 1.6.1.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/udf-of-aggregation-in-pyspark-dataframe-tp27811p27812.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Architecture recommendations for a tricky use case

2016-09-29 Thread Ali Akhtar
Is there an advantage to that vs directly consuming from Kafka? Nothing is
being done to the data except some light ETL and then storing it in
Cassandra

On Thu, Sep 29, 2016 at 7:58 PM, Deepak Sharma 
wrote:

> Its better you use spark's direct stream to ingest from kafka.
>
> On Thu, Sep 29, 2016 at 8:24 PM, Ali Akhtar  wrote:
>
>> I don't think I need a different speed storage and batch storage. Just
>> taking in raw data from Kafka, standardizing, and storing it somewhere
>> where the web UI can query it, seems like it will be enough.
>>
>> I'm thinking about:
>>
>> - Reading data from Kafka via Spark Streaming
>> - Standardizing, then storing it in Cassandra
>> - Querying Cassandra from the web ui
>>
>> That seems like it will work. My question now is whether to use Spark
>> Streaming to read Kafka, or use Kafka consumers directly.
>>
>>
>> On Thu, Sep 29, 2016 at 7:41 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> - Spark Streaming to read data from Kafka
>>> - Storing the data on HDFS using Flume
>>>
>>> You don't need Spark streaming to read data from Kafka and store on
>>> HDFS. It is a waste of resources.
>>>
>>> Couple Flume to use Kafka as source and HDFS as sink directly
>>>
>>> KafkaAgent.sources = kafka-sources
>>> KafkaAgent.sinks.hdfs-sinks.type = hdfs
>>>
>>> That will be for your batch layer. To analyse you can directly read from
>>> hdfs files with Spark or simply store data in a database of your choice via
>>> cron or something. Do not mix your batch layer with speed layer.
>>>
>>> Your speed layer will ingest the same data directly from Kafka into
>>> spark streaming and that will be  online or near real time (defined by your
>>> window).
>>>
>>> Then you have a a serving layer to present data from both speed  (the
>>> one from SS) and batch layer.
>>>
>>> HTH
>>>
>>> On 29 September 2016 at 15:15, Ali Akhtar  wrote:
>>>
 The web UI is actually the speed layer, it needs to be able to query
 the data online, and show the results in real-time.

 It also needs a custom front-end, so a system like Tableau can't be
 used, it must have a custom backend + front-end.

 Thanks for the recommendation of Flume. Do you think this will work:

 - Spark Streaming to read data from Kafka
 - Storing the data on HDFS using Flume
 - Using Spark to query the data in the backend of the web UI?



 On Thu, Sep 29, 2016 at 7:08 PM, Mich Talebzadeh <
 mich.talebza...@gmail.com> wrote:

> You need a batch layer and a speed layer. Data from Kafka can be
> stored on HDFS using flume.
>
> -  Query this data to generate reports / analytics (There will be a
> web UI which will be the front-end to the data, and will show the reports)
>
> This is basically batch layer and you need something like Tableau or
> Zeppelin to query data
>
> You will also need spark streaming to query data online for speed
> layer. That data could be stored in some transient fabric like ignite or
> even druid.
>
> HTH
>
> On 29 September 2016 at 15:01, Ali Akhtar 
> wrote:
>
>> It needs to be able to scale to a very large amount of data, yes.
>>
>> On Thu, Sep 29, 2016 at 7:00 PM, Deepak Sharma wrote:
>>
>>> What is the message inflow ?
>>> If it's really high , definitely spark will be of great use .
>>>
>>> Thanks
>>> Deepak
>>>
>>> On Sep 29, 2016 19:24, 

udf of aggregation in pyspark dataframe ?

2016-09-29 Thread peng yu
Hi,

Is there a way to write a UDF in PySpark that supports agg()?


I searched all over the docs and the internet, and tested it out. Some say yes,
some say no.

When I try the code examples that say yes, Spark just complains:

AnalysisException: u"expression 'pythonUDF' is neither present in the group
by, nor is it an aggregate function. Add to group by or wrap in first() (or
first_value) if you don't care which value you get.;"
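
As far as I know, Python UDFs in Spark 1.6 are applied row by row, which would
explain the AnalysisException when one is used inside agg(). A common workaround
is to group first and then apply an ordinary Python function to each group's
values; on an RDD of (key, value) pairs this has the shape
rdd.groupByKey().mapValues(fn). The sketch below shows the same pattern in plain
Python so it runs without a cluster. The median function, group_then_apply, and
the sample rows are made up for illustration.

```python
from collections import defaultdict

def median(values):
    """Custom aggregate: middle value of the group (mean of the two middles if even)."""
    ordered = sorted(values)
    mid = len(ordered) // 2
    if len(ordered) % 2:
        return ordered[mid]
    return (ordered[mid - 1] + ordered[mid]) / 2.0

def group_then_apply(rows, fn):
    """Collect the values for each key, then apply fn once per group."""
    groups = defaultdict(list)
    for key, value in rows:
        groups[key].append(value)
    return {key: fn(values) for key, values in groups.items()}

rows = [("a", 1), ("a", 5), ("a", 3), ("b", 2), ("b", 4)]
result = group_then_apply(rows, median)
print(result)
```

Note that this collects every value of a group in one place before aggregating,
so it only suits groups that fit in memory on a single worker.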



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/udf-of-aggregation-in-pyspark-dataframe-tp27811.html



Re: Architecture recommendations for a tricky use case

2016-09-29 Thread Deepak Sharma
It's better to use Spark's direct stream to ingest from Kafka.

On Thu, Sep 29, 2016 at 8:24 PM, Ali Akhtar  wrote:

> I don't think I need a different speed storage and batch storage. Just
> taking in raw data from Kafka, standardizing, and storing it somewhere
> where the web UI can query it, seems like it will be enough.
>
> I'm thinking about:
>
> - Reading data from Kafka via Spark Streaming
> - Standardizing, then storing it in Cassandra
> - Querying Cassandra from the web ui
>
> That seems like it will work. My question now is whether to use Spark
> Streaming to read Kafka, or use Kafka consumers directly.
>
>
> On Thu, Sep 29, 2016 at 7:41 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> - Spark Streaming to read data from Kafka
>> - Storing the data on HDFS using Flume
>>
>> You don't need Spark streaming to read data from Kafka and store on HDFS.
>> It is a waste of resources.
>>
>> Couple Flume to use Kafka as source and HDFS as sink directly
>>
>> KafkaAgent.sources = kafka-sources
>> KafkaAgent.sinks.hdfs-sinks.type = hdfs
>>
>> That will be for your batch layer. To analyse you can directly read from
>> hdfs files with Spark or simply store data in a database of your choice via
>> cron or something. Do not mix your batch layer with speed layer.
>>
>> Your speed layer will ingest the same data directly from Kafka into spark
>> streaming and that will be  online or near real time (defined by your
>> window).
>>
>> Then you have a a serving layer to present data from both speed  (the one
>> from SS) and batch layer.
>>
>> HTH
>>
>> On 29 September 2016 at 15:15, Ali Akhtar  wrote:
>>
>>> The web UI is actually the speed layer, it needs to be able to query the
>>> data online, and show the results in real-time.
>>>
>>> It also needs a custom front-end, so a system like Tableau can't be
>>> used, it must have a custom backend + front-end.
>>>
>>> Thanks for the recommendation of Flume. Do you think this will work:
>>>
>>> - Spark Streaming to read data from Kafka
>>> - Storing the data on HDFS using Flume
>>> - Using Spark to query the data in the backend of the web UI?
>>>
>>>
>>>
>>> On Thu, Sep 29, 2016 at 7:08 PM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 You need a batch layer and a speed layer. Data from Kafka can be stored
 on HDFS using flume.

 -  Query this data to generate reports / analytics (There will be a web
 UI which will be the front-end to the data, and will show the reports)

 This is basically batch layer and you need something like Tableau or
 Zeppelin to query data

 You will also need spark streaming to query data online for speed
 layer. That data could be stored in some transient fabric like ignite or
 even druid.

 HTH








 On 29 September 2016 at 15:01, Ali Akhtar  wrote:

> It needs to be able to scale to a very large amount of data, yes.
>
> On Thu, Sep 29, 2016 at 7:00 PM, Deepak Sharma 
> wrote:
>
>> What is the message inflow ?
>> If it's really high , definitely spark will be of great use .
>>
>> Thanks
>> Deepak
>>
>> On Sep 29, 2016 19:24, "Ali Akhtar"  wrote:
>>
>>> I have a somewhat tricky use case, and I'm looking for ideas.
>>>
>>> I have 5-6 Kafka producers, reading various APIs, and writing their
>>> raw data into Kafka.
>>>
>>> I need to:
>>>
>>> - Do ETL on the data, and standardize it.
>>>
>>> - Store the standardized data somewhere (HBase / 

Re: Architecture recommendations for a tricky use case

2016-09-29 Thread Ali Akhtar
I don't think I need a different speed storage and batch storage. Just
taking in raw data from Kafka, standardizing, and storing it somewhere
where the web UI can query it, seems like it will be enough.

I'm thinking about:

- Reading data from Kafka via Spark Streaming
- Standardizing, then storing it in Cassandra
- Querying Cassandra from the web UI

That seems like it will work. My question now is whether to use Spark
Streaming to read Kafka, or use Kafka consumers directly.
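
For the "standardizing" step, here is a minimal sketch of what the light ETL
could look like, whichever consumer ends up calling it. The two source names and
every field name below are hypothetical; the real API payloads will differ.

```python
import json

# Per-source mapping from raw field names to the common schema (hypothetical names).
FIELD_MAPS = {
    "api_a": {"id": "id", "ts": "timestamp", "val": "value"},
    "api_b": {"uuid": "id", "time": "timestamp", "amount": "value"},
}

def standardize(source, raw_json):
    """Parse one raw message and rename its fields to the common schema."""
    raw = json.loads(raw_json)
    mapping = FIELD_MAPS[source]
    record = {common: raw[original] for original, common in mapping.items()}
    record["source"] = source
    return record

a = standardize("api_a", '{"id": "1", "ts": 1475156640, "val": 2.5}')
b = standardize("api_b", '{"uuid": "2", "time": 1475156700, "amount": 4}')
print(a)
print(b)
```

Because the function takes one message and returns one record, it could be
called from a plain Kafka consumer loop or from a map over a Spark Streaming
batch without changes.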


On Thu, Sep 29, 2016 at 7:41 PM, Mich Talebzadeh 
wrote:

> - Spark Streaming to read data from Kafka
> - Storing the data on HDFS using Flume
>
> You don't need Spark streaming to read data from Kafka and store on HDFS.
> It is a waste of resources.
>
> Couple Flume to use Kafka as source and HDFS as sink directly
>
> KafkaAgent.sources = kafka-sources
> KafkaAgent.sinks.hdfs-sinks.type = hdfs
>
> That will be for your batch layer. To analyse you can directly read from
> hdfs files with Spark or simply store data in a database of your choice via
> cron or something. Do not mix your batch layer with speed layer.
>
> Your speed layer will ingest the same data directly from Kafka into spark
> streaming and that will be  online or near real time (defined by your
> window).
>
> Then you have a a serving layer to present data from both speed  (the one
> from SS) and batch layer.
>
> HTH
>
> On 29 September 2016 at 15:15, Ali Akhtar  wrote:
>
>> The web UI is actually the speed layer, it needs to be able to query the
>> data online, and show the results in real-time.
>>
>> It also needs a custom front-end, so a system like Tableau can't be used,
>> it must have a custom backend + front-end.
>>
>> Thanks for the recommendation of Flume. Do you think this will work:
>>
>> - Spark Streaming to read data from Kafka
>> - Storing the data on HDFS using Flume
>> - Using Spark to query the data in the backend of the web UI?
>>
>>
>>
>> On Thu, Sep 29, 2016 at 7:08 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> You need a batch layer and a speed layer. Data from Kafka can be stored
>>> on HDFS using flume.
>>>
>>> -  Query this data to generate reports / analytics (There will be a web
>>> UI which will be the front-end to the data, and will show the reports)
>>>
>>> This is basically batch layer and you need something like Tableau or
>>> Zeppelin to query data
>>>
>>> You will also need spark streaming to query data online for speed layer.
>>> That data could be stored in some transient fabric like ignite or even
>>> druid.
>>>
>>> HTH
>>>
>>> On 29 September 2016 at 15:01, Ali Akhtar  wrote:
>>>
 It needs to be able to scale to a very large amount of data, yes.

 On Thu, Sep 29, 2016 at 7:00 PM, Deepak Sharma 
 wrote:

> What is the message inflow ?
> If it's really high , definitely spark will be of great use .
>
> Thanks
> Deepak
>
> On Sep 29, 2016 19:24, "Ali Akhtar"  wrote:
>
>> I have a somewhat tricky use case, and I'm looking for ideas.
>>
>> I have 5-6 Kafka producers, reading various APIs, and writing their
>> raw data into Kafka.
>>
>> I need to:
>>
>> - Do ETL on the data, and standardize it.
>>
>> - Store the standardized data somewhere (HBase / Cassandra / Raw HDFS
>> / ElasticSearch / Postgres)
>>
>> - Query this data to generate reports / analytics (There will be a
>> web UI which will be the front-end to the data, and will show the 
>> reports)
>>
>> Java is being used as the backend language for everything (backend 

Re: Architecture recommendations for a tricky use case

2016-09-29 Thread Deepak Sharma
Since the inflow is huge, Flume would also need to be run with multiple
channels in a distributed fashion.
In that case, the resource utilization will be high as well.

Thanks
Deepak

On Thu, Sep 29, 2016 at 8:11 PM, Mich Talebzadeh 
wrote:

> - Spark Streaming to read data from Kafka
> - Storing the data on HDFS using Flume
>
> You don't need Spark streaming to read data from Kafka and store on HDFS.
> It is a waste of resources.
>
> Couple Flume to use Kafka as source and HDFS as sink directly
>
> KafkaAgent.sources = kafka-sources
> KafkaAgent.sinks.hdfs-sinks.type = hdfs
>
> That will be for your batch layer. To analyse you can directly read from
> hdfs files with Spark or simply store data in a database of your choice via
> cron or something. Do not mix your batch layer with speed layer.
>
> Your speed layer will ingest the same data directly from Kafka into spark
> streaming and that will be  online or near real time (defined by your
> window).
>
> Then you have a a serving layer to present data from both speed  (the one
> from SS) and batch layer.
>
> HTH
>
> On 29 September 2016 at 15:15, Ali Akhtar  wrote:
>
>> The web UI is actually the speed layer, it needs to be able to query the
>> data online, and show the results in real-time.
>>
>> It also needs a custom front-end, so a system like Tableau can't be used,
>> it must have a custom backend + front-end.
>>
>> Thanks for the recommendation of Flume. Do you think this will work:
>>
>> - Spark Streaming to read data from Kafka
>> - Storing the data on HDFS using Flume
>> - Using Spark to query the data in the backend of the web UI?
>>
>>
>>
>> On Thu, Sep 29, 2016 at 7:08 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> You need a batch layer and a speed layer. Data from Kafka can be stored
>>> on HDFS using flume.
>>>
>>> -  Query this data to generate reports / analytics (There will be a web
>>> UI which will be the front-end to the data, and will show the reports)
>>>
>>> This is basically batch layer and you need something like Tableau or
>>> Zeppelin to query data
>>>
>>> You will also need spark streaming to query data online for speed layer.
>>> That data could be stored in some transient fabric like ignite or even
>>> druid.
>>>
>>> HTH
>>>
>>> On 29 September 2016 at 15:01, Ali Akhtar  wrote:
>>>
 It needs to be able to scale to a very large amount of data, yes.

 On Thu, Sep 29, 2016 at 7:00 PM, Deepak Sharma 
 wrote:

> What is the message inflow ?
> If it's really high , definitely spark will be of great use .
>
> Thanks
> Deepak
>
> On Sep 29, 2016 19:24, "Ali Akhtar"  wrote:
>
>> I have a somewhat tricky use case, and I'm looking for ideas.
>>
>> I have 5-6 Kafka producers, reading various APIs, and writing their
>> raw data into Kafka.
>>
>> I need to:
>>
>> - Do ETL on the data, and standardize it.
>>
>> - Store the standardized data somewhere (HBase / Cassandra / Raw HDFS
>> / ElasticSearch / Postgres)
>>
>> - Query this data to generate reports / analytics (There will be a
>> web UI which will be the front-end to the data, and will show the 
>> reports)
>>
>> Java is being used as the backend language for everything (backend of
>> the web UI, as well as the ETL layer)
>>
>> I'm considering:
>>
>> - Using raw Kafka consumers, or Spark Streaming, as the ETL layer
>> (receive raw data from Kafka, standardize & store it)
>>
>> - Using Cassandra, HBase, or raw HDFS, for storing the 

Re: Architecture recommendations for a tricky use case

2016-09-29 Thread Mich Talebzadeh
- Spark Streaming to read data from Kafka
- Storing the data on HDFS using Flume

You don't need Spark Streaming to read data from Kafka and store it on HDFS.
It is a waste of resources.

Couple Flume to use Kafka as the source and HDFS as the sink directly:

KafkaAgent.sources = kafka-sources
KafkaAgent.sinks.hdfs-sinks.type = hdfs

That will be for your batch layer. To analyse, you can directly read from
HDFS files with Spark or simply store data in a database of your choice via
cron or something. Do not mix your batch layer with your speed layer.

Your speed layer will ingest the same data directly from Kafka into Spark
Streaming, and that will be online or near real time (defined by your
window).

Then you have a serving layer to present data from both the speed layer (the
one from SS) and the batch layer.
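
A toy sketch of the serving-layer idea, assuming both layers expose counts per
key and that the speed view only covers events newer than the last batch run.
The names batch_view, speed_view, and serve are illustrative and not part of any
framework.

```python
def serve(key, batch_view, speed_view):
    """Answer a query by adding the recent increment to the batch result."""
    return batch_view.get(key, 0) + speed_view.get(key, 0)

# Precomputed by the batch layer (for example from files on HDFS) up to the last run.
batch_view = {"clicks": 1000, "signups": 40}

# Maintained by the speed layer for events that arrived since that run.
speed_view = {"clicks": 12, "logins": 3}

totals = {k: serve(k, batch_view, speed_view) for k in ("clicks", "signups", "logins")}
print(totals)
```

The point of keeping the two views separate is that the speed view can be
thrown away and rebuilt each time the batch layer catches up.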

HTH




Dr Mich Talebzadeh




On 29 September 2016 at 15:15, Ali Akhtar  wrote:

> The web UI is actually the speed layer, it needs to be able to query the
> data online, and show the results in real-time.
>
> It also needs a custom front-end, so a system like Tableau can't be used,
> it must have a custom backend + front-end.
>
> Thanks for the recommendation of Flume. Do you think this will work:
>
> - Spark Streaming to read data from Kafka
> - Storing the data on HDFS using Flume
> - Using Spark to query the data in the backend of the web UI?
>
>
>
> On Thu, Sep 29, 2016 at 7:08 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> You need a batch layer and a speed layer. Data from Kafka can be stored
>> on HDFS using flume.
>>
>> -  Query this data to generate reports / analytics (There will be a web
>> UI which will be the front-end to the data, and will show the reports)
>>
>> This is basically batch layer and you need something like Tableau or
>> Zeppelin to query data
>>
>> You will also need spark streaming to query data online for speed layer.
>> That data could be stored in some transient fabric like ignite or even
>> druid.
>>
>> HTH
>>
>>
>> On 29 September 2016 at 15:01, Ali Akhtar  wrote:
>>
>>> It needs to be able to scale to a very large amount of data, yes.
>>>
>>> On Thu, Sep 29, 2016 at 7:00 PM, Deepak Sharma 
>>> wrote:
>>>
>>>> What is the message inflow ?
>>>> If it's really high , definitely spark will be of great use .
>>>>
>>>> Thanks
>>>> Deepak
>>>>
>>>> On Sep 29, 2016 19:24, "Ali Akhtar" wrote:
>>>>
>>>>> I have a somewhat tricky use case, and I'm looking for ideas.
>>>>>
>>>>> I have 5-6 Kafka producers, reading various APIs, and writing their
>>>>> raw data into Kafka.
>>>>>
>>>>> I need to:
>>>>>
>>>>> - Do ETL on the data, and standardize it.
>>>>>
>>>>> - Store the standardized data somewhere (HBase / Cassandra / Raw HDFS
>>>>> / ElasticSearch / Postgres)
>>>>>
>>>>> - Query this data to generate reports / analytics (There will be a web
>>>>> UI which will be the front-end to the data, and will show the reports)
>>>>>
>>>>> Java is being used as the backend language for everything (backend of
>>>>> the web UI, as well as the ETL layer)
>>>>>
>>>>> I'm considering:
>>>>>
>>>>> - Using raw Kafka consumers, or Spark Streaming, as the ETL layer
>>>>> (receive raw data from Kafka, standardize & store it)
>>>>>
>>>>> - Using Cassandra, HBase, or raw HDFS, for storing the standardized
>>>>> data, and to allow queries
>>>>>
>>>>> - In the backend of the web UI, I could either use Spark to run
>>>>> queries across the data (mostly filters), or directly run queries against
>>>>> Cassandra / HBase
>>>>>
>>>>> I'd appreciate some thoughts / suggestions on which of these
>>>>> alternatives I should go with (e.g, using raw Kafka consumers vs Spark for
>>>>> ETL, which persistent data store to use, and how to query that data store
>>>>>

Re: Architecture recommendations for a tricky use case

2016-09-29 Thread Deepak Sharma
For the UI, you need a DB such as Cassandra that is designed around
queries.
Ingest the data into Spark Streaming (speed layer) and write it to HDFS (for
the batch layer).
Now you have data at rest as well as in motion (real time).
From Spark Streaming itself, do further processing and write the final
result to Cassandra or another NoSQL DB.
The UI can then pick up the data from the DB.

Thanks
Deepak

On Thu, Sep 29, 2016 at 8:00 PM, Alonso Isidoro Roman 
wrote:

> "Using Spark to query the data in the backend of the web UI?"
>
> Dont do that. I would recommend that spark streaming process stores data
> into some nosql or sql database and the web ui to query data from that
> database.
>
> Alonso Isidoro Roman
> about.me/alonso.isidoro.roman
>
>
> 2016-09-29 16:15 GMT+02:00 Ali Akhtar :
>
>> The web UI is actually the speed layer, it needs to be able to query the
>> data online, and show the results in real-time.
>>
>> It also needs a custom front-end, so a system like Tableau can't be used,
>> it must have a custom backend + front-end.
>>
>> Thanks for the recommendation of Flume. Do you think this will work:
>>
>> - Spark Streaming to read data from Kafka
>> - Storing the data on HDFS using Flume
>> - Using Spark to query the data in the backend of the web UI?
>>
>>
>>
>> On Thu, Sep 29, 2016 at 7:08 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> You need a batch layer and a speed layer. Data from Kafka can be stored
>>> on HDFS using flume.
>>>
>>> -  Query this data to generate reports / analytics (There will be a web
>>> UI which will be the front-end to the data, and will show the reports)
>>>
>>> This is basically batch layer and you need something like Tableau or
>>> Zeppelin to query data
>>>
>>> You will also need spark streaming to query data online for speed layer.
>>> That data could be stored in some transient fabric like ignite or even
>>> druid.
>>>
>>> HTH
>>>
>>>
>>> On 29 September 2016 at 15:01, Ali Akhtar  wrote:
>>>
>>>> It needs to be able to scale to a very large amount of data, yes.
>>>>
>>>> On Thu, Sep 29, 2016 at 7:00 PM, Deepak Sharma
>>>> wrote:
>>>>
>>>>> What is the message inflow ?
>>>>> If it's really high , definitely spark will be of great use .
>>>>>
>>>>> Thanks
>>>>> Deepak
>>>>>
>>>>> On Sep 29, 2016 19:24, "Ali Akhtar" wrote:
>>>>>
>>>>>> I have a somewhat tricky use case, and I'm looking for ideas.
>>>>>>
>>>>>> I have 5-6 Kafka producers, reading various APIs, and writing their
>>>>>> raw data into Kafka.
>>>>>>
>>>>>> I need to:
>>>>>>
>>>>>> - Do ETL on the data, and standardize it.
>>>>>>
>>>>>> - Store the standardized data somewhere (HBase / Cassandra / Raw HDFS
>>>>>> / ElasticSearch / Postgres)
>>>>>>
>>>>>> - Query this data to generate reports / analytics (There will be a
>>>>>> web UI which will be the front-end to the data, and will show the
>>>>>> reports)
>>>>>>
>>>>>> Java is being used as the backend language for everything (backend of
>>>>>> the web UI, as well as the ETL layer)
>>>>>>
>>>>>> I'm considering:
>>>>>>
>>>>>> - Using raw Kafka consumers, or Spark Streaming, as the ETL layer
>>>>>> (receive raw data from Kafka, standardize & store it)
>>>>>>
>>>>>> - Using Cassandra, HBase, or raw HDFS, for storing the standardized
>>>>>> data, and to allow queries
>>>>>>
>>>>>> - In the backend of the web UI, I could either use Spark to run
>>>>>> queries across the data (mostly filters), or directly run queries against
>>>>>> Cassandra / HBase
>>>>>>
>>>>>> I'd appreciate some thoughts / suggestions on which of these
>>>>>> alternatives I should go with (e.g, using raw Kafka consumers vs Spark
>>>>>> for
>>>>>> ETL, which persistent data store to use, and how to query that data store
>>>>>> in the backend of the web UI, for displaying the reports).
>>>>>>
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>
>>>>
>>>
>>
>


-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


Re: Architecture recommendations for a tricky use case

2016-09-29 Thread Alonso Isidoro Roman
"Using Spark to query the data in the backend of the web UI?"

Don't do that. I would recommend that the Spark Streaming process store the
data in some NoSQL or SQL database, and that the web UI query the data from
that database.
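
A minimal sketch of that pattern, using Python's built-in sqlite3 purely as a stand-in for whichever NoSQL or SQL store is chosen. The table name, columns and sample values are hypothetical; the only point is that the streaming side writes aggregated rows and the web backend runs plain filtered queries, with no Spark job on the request path.

```python
import sqlite3

# In-memory database as a stand-in for the real serving store (assumption).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE report ("
    " source TEXT, day TEXT, events INTEGER,"
    " PRIMARY KEY (source, day))"
)


def store_batch(conn, rows):
    """Upsert aggregated rows, as a streaming job might after each micro-batch."""
    conn.executemany(
        "INSERT OR REPLACE INTO report (source, day, events) VALUES (?, ?, ?)",
        rows,
    )
    conn.commit()


def query_report(conn, source):
    """What the web backend would run: a simple filtered, ordered query."""
    cur = conn.execute(
        "SELECT day, events FROM report WHERE source = ? ORDER BY day",
        (source,),
    )
    return cur.fetchall()


# Two hypothetical micro-batches; the second one corrects an earlier count.
store_batch(conn, [("api_a", "2016-09-28", 120), ("api_b", "2016-09-28", 45)])
store_batch(conn, [("api_a", "2016-09-29", 98), ("api_a", "2016-09-28", 125)])

print(query_report(conn, "api_a"))
```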

Alonso Isidoro Roman
about.me/alonso.isidoro.roman


2016-09-29 16:15 GMT+02:00 Ali Akhtar :

> The web UI is actually the speed layer, it needs to be able to query the
> data online, and show the results in real-time.
>
> It also needs a custom front-end, so a system like Tableau can't be used,
> it must have a custom backend + front-end.
>
> Thanks for the recommendation of Flume. Do you think this will work:
>
> - Spark Streaming to read data from Kafka
> - Storing the data on HDFS using Flume
> - Using Spark to query the data in the backend of the web UI?
>
>
>
> On Thu, Sep 29, 2016 at 7:08 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> You need a batch layer and a speed layer. Data from Kafka can be stored
>> on HDFS using flume.
>>
>> -  Query this data to generate reports / analytics (There will be a web
>> UI which will be the front-end to the data, and will show the reports)
>>
>> This is basically batch layer and you need something like Tableau or
>> Zeppelin to query data
>>
>> You will also need spark streaming to query data online for speed layer.
>> That data could be stored in some transient fabric like ignite or even
>> druid.
>>
>> HTH
>>
>>
>> On 29 September 2016 at 15:01, Ali Akhtar  wrote:
>>
>>> It needs to be able to scale to a very large amount of data, yes.
>>>
>>> On Thu, Sep 29, 2016 at 7:00 PM, Deepak Sharma 
>>> wrote:
>>>
>>>> What is the message inflow ?
>>>> If it's really high , definitely spark will be of great use .
>>>>
>>>> Thanks
>>>> Deepak
>>>>
>>>> On Sep 29, 2016 19:24, "Ali Akhtar" wrote:
>>>>
>>>>> I have a somewhat tricky use case, and I'm looking for ideas.
>>>>>
>>>>> I have 5-6 Kafka producers, reading various APIs, and writing their
>>>>> raw data into Kafka.
>>>>>
>>>>> I need to:
>>>>>
>>>>> - Do ETL on the data, and standardize it.
>>>>>
>>>>> - Store the standardized data somewhere (HBase / Cassandra / Raw HDFS
>>>>> / ElasticSearch / Postgres)
>>>>>
>>>>> - Query this data to generate reports / analytics (There will be a web
>>>>> UI which will be the front-end to the data, and will show the reports)
>>>>>
>>>>> Java is being used as the backend language for everything (backend of
>>>>> the web UI, as well as the ETL layer)
>>>>>
>>>>> I'm considering:
>>>>>
>>>>> - Using raw Kafka consumers, or Spark Streaming, as the ETL layer
>>>>> (receive raw data from Kafka, standardize & store it)
>>>>>
>>>>> - Using Cassandra, HBase, or raw HDFS, for storing the standardized
>>>>> data, and to allow queries
>>>>>
>>>>> - In the backend of the web UI, I could either use Spark to run
>>>>> queries across the data (mostly filters), or directly run queries against
>>>>> Cassandra / HBase
>>>>>
>>>>> I'd appreciate some thoughts / suggestions on which of these
>>>>> alternatives I should go with (e.g, using raw Kafka consumers vs Spark for
>>>>> ETL, which persistent data store to use, and how to query that data store
>>>>> in the backend of the web UI, for displaying the reports).
>>>>>
>>>>>
>>>>> Thanks.
>>>>>
>>>>
>>>
>>
>


Re: Architecture recommendations for a tricky use case

2016-09-29 Thread Ali Akhtar
The web UI is actually the speed layer: it needs to be able to query the
data online and show the results in real time.

It also needs a custom front-end, so a system like Tableau can't be used,
it must have a custom backend + front-end.

Thanks for the recommendation of Flume. Do you think this will work:

- Spark Streaming to read data from Kafka
- Storing the data on HDFS using Flume
- Using Spark to query the data in the backend of the web UI?



On Thu, Sep 29, 2016 at 7:08 PM, Mich Talebzadeh 
wrote:

> You need a batch layer and a speed layer. Data from Kafka can be stored on
> HDFS using flume.
>
> -  Query this data to generate reports / analytics (There will be a web UI
> which will be the front-end to the data, and will show the reports)
>
> This is basically batch layer and you need something like Tableau or
> Zeppelin to query data
>
> You will also need spark streaming to query data online for speed layer.
> That data could be stored in some transient fabric like ignite or even
> druid.
>
> HTH
>
>
> On 29 September 2016 at 15:01, Ali Akhtar  wrote:
>
>> It needs to be able to scale to a very large amount of data, yes.
>>
>> On Thu, Sep 29, 2016 at 7:00 PM, Deepak Sharma 
>> wrote:
>>
>>> What is the message inflow ?
>>> If it's really high , definitely spark will be of great use .
>>>
>>> Thanks
>>> Deepak
>>>
>>> On Sep 29, 2016 19:24, "Ali Akhtar"  wrote:
>>>
>>>> I have a somewhat tricky use case, and I'm looking for ideas.
>>>>
>>>> I have 5-6 Kafka producers, reading various APIs, and writing their raw
>>>> data into Kafka.
>>>>
>>>> I need to:
>>>>
>>>> - Do ETL on the data, and standardize it.
>>>>
>>>> - Store the standardized data somewhere (HBase / Cassandra / Raw HDFS /
>>>> ElasticSearch / Postgres)
>>>>
>>>> - Query this data to generate reports / analytics (There will be a web
>>>> UI which will be the front-end to the data, and will show the reports)
>>>>
>>>> Java is being used as the backend language for everything (backend of
>>>> the web UI, as well as the ETL layer)
>>>>
>>>> I'm considering:
>>>>
>>>> - Using raw Kafka consumers, or Spark Streaming, as the ETL layer
>>>> (receive raw data from Kafka, standardize & store it)
>>>>
>>>> - Using Cassandra, HBase, or raw HDFS, for storing the standardized
>>>> data, and to allow queries
>>>>
>>>> - In the backend of the web UI, I could either use Spark to run queries
>>>> across the data (mostly filters), or directly run queries against Cassandra
>>>> / HBase
>>>>
>>>> I'd appreciate some thoughts / suggestions on which of these
>>>> alternatives I should go with (e.g, using raw Kafka consumers vs Spark for
>>>> ETL, which persistent data store to use, and how to query that data store
>>>> in the backend of the web UI, for displaying the reports).
>>>>
>>>>
>>>> Thanks.
>>>>
>>>
>>
>


Re: Architecture recommendations for a tricky use case

2016-09-29 Thread Mich Talebzadeh
You need a batch layer and a speed layer. Data from Kafka can be stored on
HDFS using flume.

-  Query this data to generate reports / analytics (There will be a web UI
which will be the front-end to the data, and will show the reports)

This is basically the batch layer, and you need something like Tableau or
Zeppelin to query the data.

You will also need Spark Streaming to query data online for the speed layer.
That data could be stored in some transient fabric like Ignite or even
Druid.

HTH








Dr Mich Talebzadeh






On 29 September 2016 at 15:01, Ali Akhtar  wrote:

> It needs to be able to scale to a very large amount of data, yes.
>
> On Thu, Sep 29, 2016 at 7:00 PM, Deepak Sharma 
> wrote:
>
>> What is the message inflow ?
>> If it's really high , definitely spark will be of great use .
>>
>> Thanks
>> Deepak
>>
>> On Sep 29, 2016 19:24, "Ali Akhtar"  wrote:
>>
>>> I have a somewhat tricky use case, and I'm looking for ideas.
>>>
>>> I have 5-6 Kafka producers, reading various APIs, and writing their raw
>>> data into Kafka.
>>>
>>> I need to:
>>>
>>> - Do ETL on the data, and standardize it.
>>>
>>> - Store the standardized data somewhere (HBase / Cassandra / Raw HDFS /
>>> ElasticSearch / Postgres)
>>>
>>> - Query this data to generate reports / analytics (There will be a web
>>> UI which will be the front-end to the data, and will show the reports)
>>>
>>> Java is being used as the backend language for everything (backend of
>>> the web UI, as well as the ETL layer)
>>>
>>> I'm considering:
>>>
>>> - Using raw Kafka consumers, or Spark Streaming, as the ETL layer
>>> (receive raw data from Kafka, standardize & store it)
>>>
>>> - Using Cassandra, HBase, or raw HDFS, for storing the standardized
>>> data, and to allow queries
>>>
>>> - In the backend of the web UI, I could either use Spark to run queries
>>> across the data (mostly filters), or directly run queries against Cassandra
>>> / HBase
>>>
>>> I'd appreciate some thoughts / suggestions on which of these
>>> alternatives I should go with (e.g, using raw Kafka consumers vs Spark for
>>> ETL, which persistent data store to use, and how to query that data store
>>> in the backend of the web UI, for displaying the reports).
>>>
>>>
>>> Thanks.
>>>
>>
>


Re: Architecture recommendations for a tricky use case

2016-09-29 Thread Ali Akhtar
It needs to be able to scale to a very large amount of data, yes.

On Thu, Sep 29, 2016 at 7:00 PM, Deepak Sharma 
wrote:

> What is the message inflow ?
> If it's really high , definitely spark will be of great use .
>
> Thanks
> Deepak
>
> On Sep 29, 2016 19:24, "Ali Akhtar"  wrote:
>
>> I have a somewhat tricky use case, and I'm looking for ideas.
>>
>> I have 5-6 Kafka producers, reading various APIs, and writing their raw
>> data into Kafka.
>>
>> I need to:
>>
>> - Do ETL on the data, and standardize it.
>>
>> - Store the standardized data somewhere (HBase / Cassandra / Raw HDFS /
>> ElasticSearch / Postgres)
>>
>> - Query this data to generate reports / analytics (There will be a web UI
>> which will be the front-end to the data, and will show the reports)
>>
>> Java is being used as the backend language for everything (backend of the
>> web UI, as well as the ETL layer)
>>
>> I'm considering:
>>
>> - Using raw Kafka consumers, or Spark Streaming, as the ETL layer
>> (receive raw data from Kafka, standardize & store it)
>>
>> - Using Cassandra, HBase, or raw HDFS, for storing the standardized data,
>> and to allow queries
>>
>> - In the backend of the web UI, I could either use Spark to run queries
>> across the data (mostly filters), or directly run queries against Cassandra
>> / HBase
>>
>> I'd appreciate some thoughts / suggestions on which of these alternatives
>> I should go with (e.g, using raw Kafka consumers vs Spark for ETL, which
>> persistent data store to use, and how to query that data store in the
>> backend of the web UI, for displaying the reports).
>>
>>
>> Thanks.
>>
>


Re: Architecture recommendations for a tricky use case

2016-09-29 Thread Deepak Sharma
What is the message inflow?
If it's really high, Spark will definitely be of great use.

Thanks
Deepak

On Sep 29, 2016 19:24, "Ali Akhtar"  wrote:

> I have a somewhat tricky use case, and I'm looking for ideas.
>
> I have 5-6 Kafka producers, reading various APIs, and writing their raw
> data into Kafka.
>
> I need to:
>
> - Do ETL on the data, and standardize it.
>
> - Store the standardized data somewhere (HBase / Cassandra / Raw HDFS /
> ElasticSearch / Postgres)
>
> - Query this data to generate reports / analytics (There will be a web UI
> which will be the front-end to the data, and will show the reports)
>
> Java is being used as the backend language for everything (backend of the
> web UI, as well as the ETL layer)
>
> I'm considering:
>
> - Using raw Kafka consumers, or Spark Streaming, as the ETL layer (receive
> raw data from Kafka, standardize & store it)
>
> - Using Cassandra, HBase, or raw HDFS, for storing the standardized data,
> and to allow queries
>
> - In the backend of the web UI, I could either use Spark to run queries
> across the data (mostly filters), or directly run queries against Cassandra
> / HBase
>
> I'd appreciate some thoughts / suggestions on which of these alternatives
> I should go with (e.g, using raw Kafka consumers vs Spark for ETL, which
> persistent data store to use, and how to query that data store in the
> backend of the web UI, for displaying the reports).
>
>
> Thanks.
>


configure spark with openblas, thanks

2016-09-29 Thread TheGeorge1918 .
Hi all,


I'm trying to properly configure OpenBLAS in Spark ML. I use CentOS 7,
Hadoop 2.7.2, Spark 2.0 and Python 2.7. (I use PySpark to build the ML pipeline.)

At first I got the following warnings:

WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS

To get rid of the second WARN, I recompiled Spark from source using

./build/mvn -Pyarn -Pnetlib-lgpl -Phadoop-$HADOOP_MAJOR_VERSION -Dhadoop.version=$HADOOP_VERSION -DskipTests clean package

Now the second WARN has disappeared. To configure OpenBLAS, I first installed
OpenBLAS and ran

ln -s /opt/OpenBLAS/lib/libopenblas.so libblas.so
ln -s /opt/OpenBLAS/lib/libopenblas.so liblapack.so

Still, the warning is there.

I also tried to add

ln -s /opt/OpenBLAS/lib/libopenblas.so libblas.so.3
ln -s /opt/OpenBLAS/lib/libopenblas.so liblapack.so.3


No luck. It still doesn't work.
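
One thing that can be checked independently of Spark is whether the dynamic loader resolves these library names at all. The sketch below uses only Python's standard ctypes module; it assumes (not confirmed in this thread) that a native BLAS backend can only be picked up if the loader can find the library, so symlinks created in an arbitrary working directory may not be enough. The helper name probe is made up for illustration.

```python
import ctypes
import ctypes.util


def probe(name):
    """Return (resolved_name, loadable) for a shared library short name."""
    resolved = ctypes.util.find_library(name)
    if resolved is None:
        # The loader's search path does not contain this library.
        return None, False
    try:
        ctypes.CDLL(resolved)
        return resolved, True
    except OSError:
        # Found by name, but loading failed (e.g. missing dependencies).
        return resolved, False


for lib in ("blas", "lapack", "openblas"):
    print(lib, probe(lib))
```

If "blas" or "lapack" comes back as (None, False), the symlinks are probably not in a directory the loader searches.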


Last, I tried to configure numpy to use OpenBLAS. Here is the info:

lapack_opt_info:
    libraries = ['openblas', 'openblas']
    library_dirs = ['/opt/OpenBLAS/lib']
    define_macros = [('HAVE_CBLAS', None)]
    language = c
blas_opt_info:
    libraries = ['openblas', 'openblas']
    library_dirs = ['/opt/OpenBLAS/lib']
    define_macros = [('HAVE_CBLAS', None)]
    language = c
openblas_info:
    libraries = ['openblas', 'openblas']
    library_dirs = ['/opt/OpenBLAS/lib']
    define_macros = [('HAVE_CBLAS', None)]
    language = c
openblas_lapack_info:
    libraries = ['openblas', 'openblas']
    library_dirs = ['/opt/OpenBLAS/lib']
    define_macros = [('HAVE_CBLAS', None)]
    language = c
blas_mkl_info:
    NOT AVAILABLE

I still get the WARN

WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS



Did I miss something? Thanks a lot.


Best

Xuan


Architecture recommendations for a tricky use case

2016-09-29 Thread Ali Akhtar
I have a somewhat tricky use case, and I'm looking for ideas.

I have 5-6 Kafka producers, reading various APIs, and writing their raw
data into Kafka.

I need to:

- Do ETL on the data, and standardize it.

- Store the standardized data somewhere (HBase / Cassandra / Raw HDFS /
ElasticSearch / Postgres)

- Query this data to generate reports / analytics (There will be a web UI
which will be the front-end to the data, and will show the reports)

Java is being used as the backend language for everything (backend of the
web UI, as well as the ETL layer)

I'm considering:

- Using raw Kafka consumers, or Spark Streaming, as the ETL layer (receive
raw data from Kafka, standardize & store it)

- Using Cassandra, HBase, or raw HDFS, for storing the standardized data,
and to allow queries

- In the backend of the web UI, I could either use Spark to run queries
across the data (mostly filters), or directly run queries against Cassandra
/ HBase

I'd appreciate some thoughts / suggestions on which of these alternatives I
should go with (e.g, using raw Kafka consumers vs Spark for ETL, which
persistent data store to use, and how to query that data store in the
backend of the web UI, for displaying the reports).
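
To make the "standardize" step concrete, here is a rough sketch of what I mean, written in Python only for brevity (the real ETL layer would be Java). The two source formats, the field names and the target schema are all made up for illustration.

```python
from datetime import datetime, timezone


def from_api_a(raw):
    """Hypothetical API A: epoch seconds, flat 'uid', amount in cents."""
    ts = datetime.fromtimestamp(raw["ts"], tz=timezone.utc)
    return {
        "user_id": str(raw["uid"]),
        "timestamp": ts.isoformat(),
        "amount": raw["amount_cents"] / 100.0,
    }


def from_api_b(raw):
    """Hypothetical API B: ISO-8601 string, nested user, amount as text."""
    ts = datetime.strptime(raw["time"], "%Y-%m-%dT%H:%M:%SZ")
    return {
        "user_id": str(raw["user"]["id"]),
        "timestamp": ts.replace(tzinfo=timezone.utc).isoformat(),
        "amount": float(raw["amount"]),
    }


# One normalizer per producer; each maps raw records to the common schema.
NORMALIZERS = {"api_a": from_api_a, "api_b": from_api_b}


def standardize(source, raw):
    """Dispatch a raw record to the normalizer for its source."""
    return NORMALIZERS[source](raw)


print(standardize("api_a", {"uid": 7, "ts": 0, "amount_cents": 1250}))
print(standardize("api_b", {"user": {"id": 9},
                            "time": "2016-09-29T14:24:00Z",
                            "amount": "3.50"}))
```

Whichever option is chosen for the ETL layer, the same per-source mapping would apply to each record read from Kafka.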


Thanks.


RE: building runnable distribution from source

2016-09-29 Thread Mendelson, Assaf
Thanks, that solved it.
If there is a developer here, it would be useful if this error were reported
as an error instead of INFO (especially since it causes core to fail rather
than an R package).
Thanks,
Assaf.

-----Original Message-----
From: Ding Fei [mailto:ding...@stars.org.cn] 
Sent: Thursday, September 29, 2016 1:20 PM
To: Mendelson, Assaf
Cc: user@spark.apache.org
Subject: Re: building runnable distribution from source

Check that your R is properly installed:

> Cannot find 'R_HOME'. Please specify 'R_HOME' or make sure R is
> properly installed.
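
A quick pre-flight check before building with -Psparkr could look like the sketch below. It only uses the Python standard library; the helper name r_status is made up, and it assumes the build looks for R through the R_HOME variable or the PATH, as the message above suggests.

```python
import os
import shutil


def r_status(env=None):
    """Report whether R_HOME is set and whether an 'R' executable is on PATH."""
    env = os.environ if env is None else env
    return {
        "R_HOME": env.get("R_HOME"),
        "R_on_path": shutil.which("R", path=env.get("PATH")),
    }


print(r_status())
```

If both values come back as None, either install R or drop -Psparkr from the make-distribution.sh command.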



On Thu, 2016-09-29 at 01:08 -0700, AssafMendelson wrote:
> Hi,
> 
> I am trying to compile the latest branch of spark in order to try out 
> some code I wanted to contribute.
> 
> 
> I was looking at the instructions to build from 
> http://spark.apache.org/docs/latest/building-spark.html
> 
> So at first I did:
> 
> ./build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests 
> clean package
> 
> This worked without a problem and compiled.
> 
>  
> 
> I then did
> 
> ./dev/make-distribution.sh --name custom-spark --tgz -e -Psparkr
> -Phadoop-2.4 -Phive -Phive-thriftserver -Pyarn
> 
> Which failed.
> 
> (I added the -e because the first run, without it, suggested adding 
> this to get more information.)
> 
> If I look at the compilation itself, it provides no messages for Spark 
> Project Core:
> 
>  
> 
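> [INFO] Building Spark Project Core 2.1.0-SNAPSHOT
> [INFO] ------------------------------------------------------------------------
> [INFO]
> [INFO] ------------------------------------------------------------------------
> [INFO] Building Spark Project YARN Shuffle Service 2.1.0-SNAPSHOT
> [INFO] ------------------------------------------------------------------------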
> 
>  
> 
> However, when I reach the summary I find that core has failed to 
> compile.
> 
> Below is the messages from the end of the compilation but I can’t find 
> any direct error.
> 
> I tried to google this but found no solution. Could anyone point me to 
> how to fix this?
> 
>  
> 
>  
> 
> [INFO] --- maven-compiler-plugin:3.5.1:compile (default-compile) @ spark-core_2.11 ---
> [INFO] Changes detected - recompiling the module!
> [INFO] Compiling 74 source files to /home/mendea3/git/spark/core/target/scala-2.11/classes
> [INFO]
> [INFO] --- exec-maven-plugin:1.4.0:exec (sparkr-pkg) @ spark-core_2.11 ---
> Cannot find 'R_HOME'. Please specify 'R_HOME' or make sure R is properly installed.
> [INFO] ------------------------------------------------------------------------
> [INFO] Reactor Summary:
> [INFO]
> [INFO] Spark Project Parent POM .......................... SUCCESS [  4.165 s]
> [INFO] Spark Project Tags ................................ SUCCESS [  5.163 s]
> [INFO] Spark Project Sketch .............................. SUCCESS [  7.393 s]
> [INFO] Spark Project Networking .......................... SUCCESS [ 18.929 s]
> [INFO] Spark Project Shuffle Streaming Service ........... SUCCESS [ 10.528 s]
> [INFO] Spark Project Unsafe .............................. SUCCESS [ 14.453 s]
> [INFO] Spark Project Launcher ............................ SUCCESS [ 15.198 s]
> [INFO] Spark Project Core ................................ FAILURE [ 57.641 s]
> [INFO] Spark Project ML Local Library .................... SUCCESS [ 10.561 s]
> [INFO] Spark Project GraphX .............................. SKIPPED
> [INFO] Spark Project Streaming ........................... SKIPPED
> [INFO] Spark Project Catalyst ............................ SKIPPED
> [INFO] Spark Project SQL ................................. SKIPPED
> [INFO] Spark Project ML Library .......................... SKIPPED
> [INFO] Spark Project Tools ............................... SUCCESS [  4.188 s]
> [INFO] Spark Project Hive ................................ SKIPPED
> [INFO] Spark Project REPL ................................ SKIPPED
> [INFO] Spark Project YARN Shuffle Service ................ SUCCESS [ 16.128 s]
> [INFO] Spark Project YARN ................................ SKIPPED
> [INFO] Spark Project Hive Thrift Server .................. SKIPPED
> [INFO] Spark Project Assembly ............................ SKIPPED
> [INFO] Spark Project External Flume Sink ................. SUCCESS [  9.855 s]
> [INFO] Spark Project External Flume ...................... SKIPPED
> [INFO] Spark Project External Flume Assembly ............. SKIPPED
> [INFO] Spark Integration for Kafka 0.8 ................... SKIPPED
> [INFO] Spark Project Examples ............................ SKIPPED
> [INFO] Spark Project External Kafka Assembly ............. SKIPPED
> [INFO] Spark Integration for Kafka 0.10 .................. SKIPPED
> 
> [INFO] Spark Integration for Kafka 0.10 

mapWithState() without data checkpointing

2016-09-29 Thread Alexey Kharlamov
Hello!

I would like to avoid data checkpointing when processing a DStream. Basically, 
we do not care if the intermediate data are lost. 

Is there a way to achieve that? Is there an extension point or class embedding 
all associated activities?

Thanks!

Sincerely yours,
—
Alexey Kharlamov
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Treadting NaN fields in Spark

2016-09-29 Thread Michael Segel
Hi,

Just a few thoughts, so take them for what they're worth…

Databases have static schemas and will reject a row’s column on insert.

In your case… you have one data set where you have a column which is supposed 
to be a number but you have it as a string.
You want to convert this to a double in your final data set.


It looks like your problem is that the original data set you ingested 
used a '-' (dash) to represent missing data, rather than a NULL value.
In fact, looking at the rows, you seem to have a stock that didn't trade on a 
given day (all have Volume as 0). Why do you need this? Wouldn't you want to 
represent this as null, or as no row for the given date?

The reason your '-' check failed with isnan() is that '-' actually could be 
represented as a number.

If you replaced the '-' with a String that is wider than the width of a double, 
the isnan should flag the row.
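
Another way to look at it, sketched below in plain Python rather than Spark: a '-' placeholder is a non-numeric string, which is not the same thing as a floating-point NaN, so a NaN-only test would not flag it. This is an illustration under that assumption, not a statement about how Spark's isnan() is implemented; the helper names are made up.

```python
import math


def is_nan_only(text):
    """True only for values that parse to a floating-point NaN."""
    try:
        return math.isnan(float(text))
    except ValueError:
        # Not a number at all (e.g. '-'), which is different from NaN.
        return False


def parse_price(text):
    """Return a float, or None when the field is missing or not numeric."""
    try:
        value = float(text)
    except ValueError:
        return None
    return None if math.isnan(value) else value


rows = ["40.56", "-", "NaN", "38.10"]
print([is_nan_only(r) for r in rows])
print([parse_price(r) for r in rows])
```

With a helper like parse_price, both the dash and a real NaN end up as missing values, which matches the suggestion above to represent such rows as null.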

(I still need more coffee, so I could be wrong) ;-)

HTH

-Mike

On Sep 28, 2016, at 5:56 AM, Mich Talebzadeh wrote:


This is an issue in most databases. Specifically if a field is NaN.. --> (NaN, 
standing for not a number, is a numeric data type value representing an 
undefined or unrepresentable value, especially in floating-point calculations)

There is a method called isnan() in Spark that is supposed to handle this 
scenario . However, it does not return correct values! For example I defined 
column "Open" as String  (it should be Float) and it has the following 7 rogue 
entries out of 1272 rows in a csv

df2.filter($"OPen" === "-")
  .select((changeToDate("TradeDate").as("TradeDate")), 'Open, 'High, 'Low, 'Close, 'Volume)
  .show

+----------+----+----+---+-----+------+
| TradeDate|Open|High|Low|Close|Volume|
+----------+----+----+---+-----+------+
|2011-12-23|   -|   -|  -|40.56|     0|
|2011-04-21|   -|   -|  -|45.85|     0|
|2010-12-30|   -|   -|  -|38.10|     0|
|2010-12-23|   -|   -|  -|38.36|     0|
|2008-04-30|   -|   -|  -|32.39|     0|
|2008-04-29|   -|   -|  -|33.05|     0|
|2008-04-28|   -|   -|  -|32.60|     0|
+----------+----+----+---+-----+------+

However, the following does not work!

df2.filter(isnan($"Open")).show
+-----+------+---------+----+----+---+-----+------+
|Stock|Ticker|TradeDate|Open|High|Low|Close|Volume|
+-----+------+---------+----+----+---+-----+------+
+-----+------+---------+----+----+---+-----+------+

Any suggestions?

Thanks


Dr Mich Talebzadeh









Re: S3 DirectParquetOutputCommitter + PartitionBy + SaveMode.Append

2016-09-29 Thread joffe.tal
You can use partition explicitly by adding "/=" to
the end of the path you are writing to and then use overwrite.

BTW in Spark 2.0 you just need to use:

sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
and use s3a://

and you can work with regular output committer (actually
DirectParquetOutputCommitter is no longer available in Spark 2.0)

so if you are planning on upgrading this could be another motivation
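
A minimal sketch of the approach described above, assuming Spark 2.x and a Hive-style partition directory of the form column=value. The column name day, the sample rows, and the local temporary directory (standing in for an s3a:// path) are all hypothetical.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("partition-overwrite").getOrCreate()
import spark.implicits._

// Committer setting quoted in the message above (relevant when writing to s3a://).
spark.sparkContext.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")

// Hypothetical data with a partition column named "day".
val events = Seq(("a", 1, "2016-09-28"), ("b", 2, "2016-09-29")).toDF("id", "value", "day")

// Local temp directory as a stand-in; with S3 this would be an s3a:// URI.
val tableRoot = Files.createTempDirectory("events").toString
val day = "2016-09-29"

// Write only that day's rows into its own partition directory and overwrite it,
// leaving any other partition directories under tableRoot untouched.
events.filter($"day" === day)
  .drop("day")
  .write
  .mode(SaveMode.Overwrite)
  .parquet(s"$tableRoot/day=$day")

// Reading from the table root rediscovers "day" as a partition column.
val readBack = spark.read.parquet(tableRoot)
readBack.show()
```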



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/S3-DirectParquetOutputCommitter-PartitionBy-SaveMode-Append-tp26398p27810.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Hive Rejection

2016-09-29 Thread Michael Segel
Correct me if I’m wrong, but isn’t Hive schema-on-read rather than schema-on-write?
So the write itself shouldn’t fail.


On Sep 29, 2016, at 1:25 AM, Mostafa Alaa Mohamed wrote:

Dears,
I want to ask
• What will happen if there are rejected rows when inserting a 
DataFrame into Hive?
o   A rejection would occur, for example, when the table requires an integer 
in a column and the DataFrame contains a string.
o   Is there duplicate rejection from a restriction on the table itself?
• How can we specify the rejection directory?
If this is not available, do you recommend opening a JIRA issue?

Best Regards,
Mostafa Alaa Mohamed,
Technical Expert Big Data,
M: +971506450787
Email: mohamedamost...@etisalat.ae


The content of this email together with any attachments, statements and 
opinions expressed herein contains information that is private and confidential 
are intended for the named addressee(s) only. If you are not the addressee of 
this email you may not copy, forward, disclose or otherwise use it or any part 
of it in any form whatsoever. If you have received this message in error please 
notify postmas...@etisalat.ae by email 
immediately and delete the message without making any copies.



Re: building runnable distribution from source

2016-09-29 Thread Michael Segel
You may want to replace the Hadoop 2.4 profile (-Phadoop-2.4) with a later release.


spark listener do not get fail status

2016-09-29 Thread Aseem Bansal
Hi

I am submitting a job via the Spark API, but I never get a failed status, even
when the job throws an exception or exits via System.exit(-1).

How do I indicate via SparkListener API that my job failed?
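
A minimal sketch of one way to observe failed jobs from inside the driver, assuming the listener is registered on the same SparkContext that runs the job. FailureTrackingListener and failedJobs are hypothetical names. A driver that calls System.exit(-1) terminates the JVM, so a listener inside it may never see the event; that case is not covered here.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.util.Try
import org.apache.spark.scheduler.{JobSucceeded, SparkListener, SparkListenerJobEnd}
import org.apache.spark.sql.SparkSession

// Counts jobs that ended with any result other than JobSucceeded.
class FailureTrackingListener extends SparkListener {
  val failedJobs = new AtomicInteger(0)
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = jobEnd.jobResult match {
    case JobSucceeded => ()
    case _            => failedJobs.incrementAndGet()
  }
}

val spark = SparkSession.builder().master("local[2]").appName("listener-demo").getOrCreate()
val sc = spark.sparkContext
val listener = new FailureTrackingListener
sc.addSparkListener(listener)

// A job whose task throws; the resulting driver-side exception is captured by Try.
val outcome = Try(sc.parallelize(1 to 4).map(i => if (i == 3) sys.error("boom") else i).count())

// Listener events are delivered asynchronously; stopping the context drains the bus.
spark.stop()
println(s"job failed: ${outcome.isFailure}, failed jobs seen by listener: ${listener.failedJobs.get}")
```

The match on JobSucceeded with a catch-all branch is used because the failure result type is not part of the public API.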


Re: Large-scale matrix inverse in Spark

2016-09-29 Thread Robineast
The paper you mention references a Spark-based LU decomposition approach. AFAIK 
there is no current implementation in Spark, but there is an open JIRA 
(https://issues.apache.org/jira/browse/SPARK-8514) that covers this. It seems to 
have gone quiet, though.
---
Robin East
Spark GraphX in Action Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/books/spark-graphx-in-action

> On 27 Sep 2016, at 03:05, Cooper [via Apache Spark User List] wrote:
> 
> How is the problem of large-scale matrix inversion approached in Apache Spark?
> 
> This linear algebra operation is obviously the very base of a lot of other 
> algorithms (regression, classification, etc). However, I have not been able 
> to find a Spark API for a parallel implementation of matrix inversion. Can you 
> please clarify how this operation is approached in the Spark internals?
> 
> Here is a paper on 
> the parallelized matrix inversion in Spark; however, I am trying to use 
> existing code instead of implementing one from scratch, if available.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Large-scale-matrix-inverse-in-Spark-tp27796p27809.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: building runnable distribution from source

2016-09-29 Thread Ding Fei
Check that your R is properly installed:

> Cannot find 'R_HOME'. Please specify 'R_HOME' or make sure R is properly installed.




spark sql on json

2016-09-29 Thread Hitesh Goyal
Hi team,

I have a JSON document and want to run Spark SQL queries on it.
Could you please send me an example app written in Java so that I can 
run Spark SQL queries on my data?
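
A minimal sketch of querying JSON with Spark SQL, written in Scala to match the other snippets in this archive and assuming Spark 2.x. The Java API offers the same read().json(...), createOrReplaceTempView(...) and sql(...) calls. The file contents and the view name people are hypothetical.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("json-sql").getOrCreate()

// Hypothetical input: one JSON object per line, the layout read.json expects by default.
val jsonPath = Files.createTempFile("people", ".json")
Files.write(jsonPath, """{"name":"Ann","age":34}
{"name":"Bob","age":19}""".getBytes(StandardCharsets.UTF_8))

// The schema (name: string, age: long) is inferred from the data.
val people = spark.read.json(jsonPath.toString)

// Register the DataFrame under a name that SQL queries can refer to.
people.createOrReplaceTempView("people")

val adults = spark.sql("SELECT name FROM people WHERE age >= 21")
adults.show()
```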

Regards,
Hitesh Goyal
Simpli5d Technologies
Cont No.: 9996588220



Re: Using Spark as a Maven dependency but with Hadoop 2.6

2016-09-29 Thread Sean Owen
No, I think that's what dependencyManagement (or equivalent) is definitely for.
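
For sbt builds, dependencyOverrides plays a similar role to Maven's dependencyManagement. A build.sbt sketch, assuming the goal is Spark 2.0.0 with a Hadoop 2.6 client; the exact version 2.6.0 is an assumed example, and other Hadoop modules may need the same treatment:

```scala
// build.sbt (sketch): versions below are assumptions for illustration
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.0"

// Force the transitively resolved Hadoop client to the desired release
dependencyOverrides += "org.apache.hadoop" % "hadoop-client" % "2.6.0"
```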

On Thu, Sep 29, 2016 at 5:37 AM, Olivier Girardot wrote:
> I know that the code itself would not be the same, but it would be useful to
> at least have the pom/build.sbt transitive dependencies different when
> fetching the artifact with a specific classifier, don't you think?
> For now I've overridden them myself using the dependency versions defined in
> the pom.xml of Spark.
> So it's not a blocking issue; it may be useful to document it, but a blog
> post would be sufficient I think.
>




Re: Using Spark as a Maven dependency but with Hadoop 2.6

2016-09-29 Thread Olivier Girardot
I know that the code itself would not be the same, but it would be useful to at
least have the pom/build.sbt transitive dependencies different when fetching the
artifact with a specific classifier, don't you think? For now I've overridden
them myself using the dependency versions defined in the pom.xml of Spark. So
it's not a blocking issue; it may be useful to document it, but a blog post would
be sufficient I think.
 





On Wed, Sep 28, 2016 7:21 PM, Sean Owen so...@cloudera.com wrote:
I guess I'm claiming the artifacts wouldn't even be different in the first
place, because the Hadoop APIs that are used are all the same across these
versions. That would be the thing that makes you need multiple versions of the
artifact under multiple classifiers.
On Wed, Sep 28, 2016 at 1:16 PM, Olivier Girardot <
o.girar...@lateral-thoughts.com> wrote:
OK, don't you think it could be published with just different classifiers:
hadoop-2.6
hadoop-2.4
hadoop-2.2 being the current default.
So for now, I should just override Spark 2.0.0's dependencies with the ones
defined in the pom profile.

 





On Thu, Sep 22, 2016 11:17 AM, Sean Owen so...@cloudera.com wrote:
There can be just one published version of the Spark artifacts and they have to
depend on something, though in truth they'd be binary-compatible with anything
2.2+. So you merely manage the dependency versions up to the desired version in
your .
On Thu, Sep 22, 2016 at 7:05 AM, Olivier Girardot <
o.girar...@lateral-thoughts.com> wrote:
Hi, when we fetch Spark 2.0.0 as a Maven dependency, we automatically end up
with Hadoop 2.2 as a transitive dependency. I know multiple profiles are used to
generate the different tar.gz bundles that we can download. Are there, by any
chance, publications of Spark 2.0.0 with different classifiers for the
different versions of Hadoop?
Thanks for your time!
Olivier Girardot

 


Olivier Girardot| Associé
o.girar...@lateral-thoughts.com
+33 6 24 09 17 94
 



Re: spark persistence doubt

2016-09-29 Thread Bedrytski Aliaksandr
Hi,

the 4th step should contain "transformrdd2", right?

considering that transformations are lined-up and executed only when
there is an action (also known as lazy execution), I would say that
adding persist() to the step 1 would not do any good (and may even be
harmful as you may lose the optimisations given by lining up the 3 steps
in one operation).

If a second action is executed on any of the transformations,
persisting the farthest common transformation would be a good idea.
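
A minimal sketch of that second case, assuming two actions that share rdd1. The data, the accumulator and the variable names are hypothetical; the accumulator is only there to show how often the shared transformation runs.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder().master("local[*]").appName("persist-demo").getOrCreate()
val sc = spark.sparkContext

// Counts how many elements pass through the shared upstream transformation.
val evaluations = sc.longAccumulator("rdd1 evaluations")

val rdd1 = sc.parallelize(1 to 100, 4).map { x => evaluations.add(1); x * 2 }

// rdd1 feeds two branches and each branch gets its own action, so persist it once.
rdd1.persist(StorageLevel.MEMORY_ONLY)

val transformedRdd1 = rdd1.filter(_ % 3 == 0)
val transformedRdd2 = rdd1.map(_ + 1)

val n1 = transformedRdd1.count() // first action: computes rdd1 and caches it
val n2 = transformedRdd2.count() // second action: reads rdd1 from the cache

println(s"n1=$n1 n2=$n2 rdd1 element evaluations=${evaluations.value}")
rdd1.unpersist()
```

Without the persist call, the second action would recompute rdd1 from its source, and the accumulator would count every element twice.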

Regards,
--
  Bedrytski Aliaksandr
  sp...@bedryt.ski



On Thu, Sep 29, 2016, at 07:09, Shushant Arora wrote:
> Hi
>
> I have a flow like below
>
> 1.rdd1=some source.transform();
> 2.tranformedrdd1 = rdd1.transform(..);
> 3.transformrdd2 = rdd1.transform(..);
>
> 4.tranformrdd1.action();
>
> Do I need to persist rdd1 to optimise steps 2 and 3? Or, since there
> is no lineage breakage, will it work without persist?
>
> Thanks
>


building runnable distribution from source

2016-09-29 Thread AssafMendelson
Hi,
I am trying to compile the latest branch of spark in order to try out some code 
I wanted to contribute.

I was looking at the instructions to build from 
http://spark.apache.org/docs/latest/building-spark.html
So at first I did:
./build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package
This worked without a problem and compiled.

I then did
./dev/make-distribution.sh --name custom-spark --tgz -e -Psparkr -Phadoop-2.4 -Phive -Phive-thriftserver -Pyarn
which failed.
(I added the -e because the first run, without it, suggested adding this to get 
more information.)
If I look at the compilation itself, it provides no messages for Spark Project 
Core:

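[INFO] Building Spark Project Core 2.1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Spark Project YARN Shuffle Service 2.1.0-SNAPSHOT
[INFO] ---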

However, when I reach the summary I find that core has failed to compile.
Below are the messages from the end of the compilation, but I can't find any 
direct error.
I tried to google this but found no solution. Could anyone point me to how to 
fix this?


[INFO] --- maven-compiler-plugin:3.5.1:compile (default-compile) @ spark-core_2.11 ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 74 source files to /home/mendea3/git/spark/core/target/scala-2.11/classes
[INFO]
[INFO] --- exec-maven-plugin:1.4.0:exec (sparkr-pkg) @ spark-core_2.11 ---
Cannot find 'R_HOME'. Please specify 'R_HOME' or make sure R is properly installed.
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Spark Project Parent POM ........................... SUCCESS [  4.165 s]
[INFO] Spark Project Tags ................................. SUCCESS [  5.163 s]
[INFO] Spark Project Sketch ............................... SUCCESS [  7.393 s]
[INFO] Spark Project Networking ........................... SUCCESS [ 18.929 s]
[INFO] Spark Project Shuffle Streaming Service ............ SUCCESS [ 10.528 s]
[INFO] Spark Project Unsafe ............................... SUCCESS [ 14.453 s]
[INFO] Spark Project Launcher ............................. SUCCESS [ 15.198 s]
[INFO] Spark Project Core ................................. FAILURE [ 57.641 s]
[INFO] Spark Project ML Local Library ..................... SUCCESS [ 10.561 s]
[INFO] Spark Project GraphX ............................... SKIPPED
[INFO] Spark Project Streaming ............................ SKIPPED
[INFO] Spark Project Catalyst ............................. SKIPPED
[INFO] Spark Project SQL .................................. SKIPPED
[INFO] Spark Project ML Library ........................... SKIPPED
[INFO] Spark Project Tools ................................ SUCCESS [  4.188 s]
[INFO] Spark Project Hive ................................. SKIPPED
[INFO] Spark Project REPL ................................. SKIPPED
[INFO] Spark Project YARN Shuffle Service ................. SUCCESS [ 16.128 s]
[INFO] Spark Project YARN ................................. SKIPPED
[INFO] Spark Project Hive Thrift Server ................... SKIPPED
[INFO] Spark Project Assembly ............................. SKIPPED
[INFO] Spark Project External Flume Sink .................. SUCCESS [  9.855 s]
[INFO] Spark Project External Flume ....................... SKIPPED
[INFO] Spark Project External Flume Assembly .............. SKIPPED
[INFO] Spark Integration for Kafka 0.8 .................... SKIPPED
[INFO] Spark Project Examples ............................. SKIPPED
[INFO] Spark Project External Kafka Assembly .............. SKIPPED
[INFO] Spark Integration for Kafka 0.10 ................... SKIPPED
[INFO] Spark Integration for Kafka 0.10 Assembly .......... SKIPPED
[INFO] Spark Project Java 8 Tests ......................... SKIPPED
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 01:52 min (Wall Clock)
[INFO] Finished at: 2016-09-29T10:48:57+03:00
[INFO] Final Memory: 49M/771M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.4.0:exec (sparkr-pkg) on project spark-core_2.11: Command execution failed. Process exited with an error: 1 (Exit value: 1) -> [Help 1]
org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.4.0:exec (sparkr-pkg) on project spark-core_2.11: Command execution failed.
at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:212)
at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
at 

Need help :- org.apache.spark.SparkException :- No such file or directory

2016-09-29 Thread Madabhattula Rajesh Kumar
Hi Team,

I am getting the exception below in my Spark jobs. Please let me know how to fix this
issue.

*Below is my cluster configuration :- *

I am using SparkJobServer to trigger the jobs. Below is my configuration in
SparkJobServer.

   - num-cpu-cores = 4
   - memory-per-node = 4G

I have 4 workers in my cluster.


"result": {
"errorClass": "org.apache.spark.SparkException",
"cause":
"/tmp/spark-31a538f3-9451-4a2d-9123-00feb56c9e91/executor-73be6ffd-cd03-452a-bc99-a44290953d4f/spark-d0630f1f-e3df-4714-af30-4c839f6e3e8a/9400069401471754061530_lock
(No such file or directory)",
"stack": ["java.io.RandomAccessFile.open0(Native Method)",
"java.io.RandomAccessFile.open(RandomAccessFile.java:316)",
"java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)",
"org.apache.spark.util.Utils$.fetchFile(Utils.scala:373)",
"org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:405)",
"org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:397)",
"scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)",
"scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)",
"scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)",
"scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)",
"scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)",
"scala.collection.mutable.HashMap.foreach(HashMap.scala:98)",
"scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)",
"org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:397)",
"org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:193)",
"java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)",
"java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)",
"java.lang.Thread.run(Thread.java:745)"],
"causingClass": "java.io.FileNotFoundException",
"message": "Job aborted due to stage failure: Task 0 in stage 1286.0
failed 4 times, most recent failure: Lost task 0.3 in stage 1286.0 (TID
39149, svcjo-prd911.cisco.com): java.io.FileNotFoundException:
/tmp/spark-31a538f3-9451-4a2d-9123-00feb56c9e91/executor-73be6ffd-cd03-452a-bc99-a44290953d4f/spark-d0630f1f-e3df-4714-af30-4c839f6e3e8a/9400069401471754061530_lock
(No such file or directory)\n\tat java.io.RandomAccessFile.open0(Native
Method)\n\tat
java.io.RandomAccessFile.open(RandomAccessFile.java:316)\n\tat
java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)\n\tat
org.apache.spark.util.Utils$.fetchFile(Utils.scala:373)\n\tat
org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:405)\n\tat
org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:397)\n\tat
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)\n\tat
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)\n\tat
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)\n\tat
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)\n\tat
scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)\n\tat
scala.collection.mutable.HashMap.foreach(HashMap.scala:98)\n\tat
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)\n\tat
org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:397)\n\tat
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:193)\n\tat
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat
java.lang.Thread.run(Thread.java:745)\n\nDriver stacktrace:"
  },

Regards,
Rajesh


Spark Hive Rejection

2016-09-29 Thread Mostafa Alaa Mohamed
Dears,
I want to ask

* What will happen if there are rejected rows when inserting a 
DataFrame into Hive?

o   A rejection would occur, for example, when the table requires an integer 
in a column and the DataFrame contains a string.

o   Is there duplicate rejection from a restriction on the table itself?

* How can we specify the rejection directory?
If this is not available, do you recommend opening a JIRA issue?

Best Regards,
Mostafa Alaa Mohamed,
Technical Expert Big Data,
M: +971506450787
Email: mohamedamost...@etisalat.ae

