Re: Why do checkpoints work the way they do?

2017-08-29 Thread Tathagata Das
Hello, This is an unfortunate design on my part when I was building DStreams :) Fortunately, we learnt from our mistakes and built Structured Streaming the correct way. Checkpointing in Structured Streaming stores only the progress information (offsets, etc.), and the user can change their

Re: Time window on Processing Time

2017-08-29 Thread Tathagata Das
Yes, it can be! There is a SQL function called current_timestamp(), which is self-explanatory. So I believe you should be able to do something like import org.apache.spark.sql.functions._ ds.withColumn("processingTime", current_timestamp()) .groupBy(window("processingTime", "1 minute"))
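
A minimal sketch of that suggestion, assuming a streaming Dataset named ds and spark.implicits._ in scope (names are illustrative):

    import org.apache.spark.sql.functions._

    // Stamp each row with the wall-clock time at processing and window on it.
    val byProcessingTime = ds
      .withColumn("processingTime", current_timestamp())
      .groupBy(window($"processingTime", "1 minute"))
      .count()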

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread Tathagata Das
Aah, I might have misinterpreted. The groupBy + window solution would give the max time for each train over 24 hours (non-overlapping window) of event data (timestamped by activity_timestamp). So the output would look like: Train Dest Window(activity_timestamp) max(Time) 1 HK

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread kant kodali
Can I do subqueries using the Dataset or DataFrame APIs rather than raw SQL, unless that is a last resort? On Tue, Aug 29, 2017 at 8:47 PM, ayan guha wrote: > This is not correct. In SQL land, your query should be like below: > > select * from ( > select Train, DEST, TIME,

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread ayan guha
This is not correct. In SQL land, your query should look like the one below: select * from (select Train, DEST, TIME, row_number() over (partition by train order by time desc) r from TrainTable) x where r = 1. All of these constructs are supported as DataFrame functions. On Wed, Aug 30, 2017 at 1:08 PM, kant kodali
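
For reference, a sketch of the same query through the DataFrame window-function API rather than raw SQL (assumes a DataFrame named trainTable with those columns):

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, row_number}

    // Rank rows within each train by time, newest first, then keep rank 1.
    val w = Window.partitionBy("train").orderBy(col("time").desc)
    val latest = trainTable
      .withColumn("r", row_number().over(w))
      .where("r = 1")
      .drop("r")

Note that analytic window functions like this are not supported on streaming Datasets in Spark 2.x, which is the complication the rest of the thread turns on; the sketch applies to the batch/SQL case.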

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread kant kodali
Yes, in a relational DB one could just do this: SELECT t.Train, t.Dest, r.MaxTime FROM (SELECT Train, MAX(Time) AS MaxTime FROM TrainTable GROUP BY Train) r INNER JOIN TrainTable t ON t.Train = r.Train AND t.Time = r.MaxTime (copied answer from a Stack Overflow question someone

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread Burak Yavuz
That just gives you the max time for each train. If I understood the question correctly, OP wants the whole row with the max time. That's generally solved through joins or subqueries, which would be hard to do in a streaming setting. On Aug 29, 2017 7:29 PM, "ayan guha"

Re: Select entire row based on a logic applied on 2 columns across multiple rows

2017-08-29 Thread purna pradeep
@ayan, thanks for your response. I would like to use functions in this case (calculateIncome), and the reason I need a function is to reuse it in other parts of the application. That's why I'm planning to use mapGroups with the function as an argument, which takes a row iterator, but not sure if this is
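
A rough sketch of that mapGroups idea; the case class, column names, and calculateIncome body are hypothetical placeholders, and it assumes a Dataset[Income] named ds with spark.implicits._ in scope:

    case class Income(id: Int, dept: String, income: Double, joinDate: String)

    // Placeholder for the real, reusable business logic.
    def calculateIncome(rows: Iterator[Income]): Double = rows.map(_.income).sum

    val incomePerId = ds
      .groupByKey(_.id)
      .mapGroups { (id, rows) => (id, calculateIncome(rows)) }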

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread ayan guha
Why wouldn't removing the destination from the window grouping work? Like this: trainTimesDataset .withWatermark("activity_timestamp", "5 days") .groupBy(window(activity_timestamp, "24 hours", "24 hours"), "train") .max("time") On Wed, Aug 30, 2017 at 10:38 AM, kant kodali

Re: Select entire row based on a logic applied on 2 columns across multiple rows

2017-08-29 Thread ayan guha
Hi, the tool you are looking for is a window function. Example: >>> df.show()
+--------+----+---+------+-------------+
|JoinDate|dept| id|income|income_age_ts|
+--------+----+---+------+-------------+
| 4/20/13|  ES|101| 19000|      4/20/17|
| 4/20/13|  OS|101|     1|      10/3/15|
| 4/20/12|

[Upvote] for Apache Spark for 2017 Innovation Award

2017-08-29 Thread Jules Damji
Fellow Spark users, If you think, and believe, deep in your hearts that Apache Spark deserves an innovation award, cast your vote here: https://jaxlondon.com/jax-awards Cheers, Jules Sent from my iPhone Pardon the dumb thumb typos :)

Re: Select entire row based on a logic applied on 2 columns across multiple rows

2017-08-29 Thread purna pradeep
Please click on the unnamed text/html link for a better view. On Tue, Aug 29, 2017 at 8:11 PM purna pradeep wrote: > > -- Forwarded message - > From: Mamillapalli, Purna Pradeep < > purnapradeep.mamillapa...@capitalone.com> > Date: Tue, Aug 29, 2017 at 8:08 PM

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread kant kodali
@Burak, so what would the transformation or query look like for the above example? I don't see flatMapGroupsWithState in the Dataset API in Spark 2.1.1. I may be able to upgrade to 2.2.0 if that makes life easier. On Tue, Aug 29, 2017 at 5:25 PM, Burak Yavuz wrote: > Hey

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread Burak Yavuz
Hey TD, if I understood the question correctly, your solution wouldn't return the exact answer, since it also groups by destination. I would say the easiest solution would be to use flatMapGroupsWithState, where you .groupByKey(_.train) and keep in state the row with the maximum time. On
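
A minimal sketch of that approach, assuming Spark 2.2+, a streaming Dataset[TrainTime] named trainTimesDataset, and spark.implicits._ in scope (the case class is an assumption matching the [train, dest, time] schema discussed in this thread):

    import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

    case class TrainTime(train: Int, dest: String, time: java.sql.Timestamp)

    // Keep, per train, the single whole row with the largest timestamp seen so far.
    val latestPerTrain = trainTimesDataset
      .groupByKey(_.train)
      .flatMapGroupsWithState[TrainTime, TrainTime](
        OutputMode.Update(), GroupStateTimeout.NoTimeout()) {
        (train, rows, state) =>
          val newest = (rows ++ state.getOption.iterator).maxBy(_.time.getTime)
          state.update(newest)
          Iterator(newest)
      }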

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread Tathagata Das
Yes. And in that case, if you only care about the max over the last few days, then you should set a watermark on the timestamp column: trainTimesDataset .withWatermark("activity_timestamp", "5 days") .groupBy(window(activity_timestamp, "24 hours", "24 hours"), "train", "dest")
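
Put together as a complete streaming query, this is roughly what it looks like (a sketch; the console sink and update output mode are illustrative choices, and column references are made explicit):

    import org.apache.spark.sql.functions.{col, window}

    val query = trainTimesDataset
      .withWatermark("activity_timestamp", "5 days")
      .groupBy(window(col("activity_timestamp"), "24 hours", "24 hours"),
               col("train"), col("dest"))
      .max("time")
      .writeStream
      .outputMode("update")
      .format("console")
      .start()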

Re: use WithColumn with external function in a java jar

2017-08-29 Thread purna pradeep
Thanks, I'll check it out. On Mon, Aug 28, 2017 at 10:22 PM Praneeth Gayam wrote: > You can create a UDF which will invoke your Java lib > > def calculateExpense: UserDefinedFunction = udf((pexpense: String, cexpense: String) => new
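
A sketch of wrapping a Java library call in a UDF; the class ExpenseCalculator, the DataFrame df, and the column names are hypothetical placeholders, not the actual jar's API:

    import org.apache.spark.sql.functions.{col, udf}

    // Stand-in for the class that lives in the external Java jar.
    class ExpenseCalculator extends Serializable {
      def calculate(pexpense: String, cexpense: String): Double =
        pexpense.toDouble + cexpense.toDouble   // placeholder logic
    }

    val calculateExpense = udf { (pexpense: String, cexpense: String) =>
      new ExpenseCalculator().calculate(pexpense, cexpense)
    }

    val withTotal = df.withColumn("total_expense",
      calculateExpense(col("pexpense"), col("cexpense")))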

Select entire row based on a logic applied on 2 columns across multiple rows

2017-08-29 Thread purna pradeep
-- Forwarded message -- From: Mamillapalli, Purna Pradeep Date: Tue, Aug 29, 2017 at 8:08 PM Subject: Spark question To: purna pradeep Below is the input DataFrame (in reality this is a very large DataFrame):

Spark2 create hive external table

2017-08-29 Thread antoniosi
Hi, I am trying to connect to the Spark thrift server to create an external table. In my table DDL, I have a table property (TBLPROPERTIES) 'spark.sql.sources.provider' = 'parquet', but I am getting an error "Cannot persistent into hive metastore as table property keys may not start with 'spark.sql.':
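
A hedged workaround sketch, not verified against the exact setup above: let Spark record the provider itself through a USING clause rather than setting spark.sql.sources.provider manually in TBLPROPERTIES. Database, table, columns, and path are placeholders:

    // Issued through a SparkSession; the same statement can be run as plain SQL
    // over the thrift server connection.
    spark.sql("""
      CREATE TABLE my_db.my_external_table (id BIGINT, name STRING)
      USING parquet
      OPTIONS (path '/data/my_external_table')
    """)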

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread kant kodali
Hi, thanks for the response. Since this is a streaming query, in my case I need to hold state for 24 hours, which I forgot to mention in my previous email. Can I do this? trainTimesDataset.groupBy(window(activity_timestamp, "24 hours", "24 hours"), "train", "dest").max("time") On Tue,

Re: Spark 2.0.0 and Hive metastore

2017-08-29 Thread Andrés Ivaldi
Every comment is welcome. If I'm not wrong, it's because we are using the percentile aggregation, which comes with Hive support; apart from that, nothing else. On Tue, Aug 29, 2017 at 11:23 AM, Jean Georges Perrin wrote: > Sorry if my comment is not helping, but... why do you need

Re: How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread Tathagata Das
Say trainTimesDataset is the streaming Dataset with schema [train: Int, dest: String, time: Timestamp]. Scala: trainTimesDataset.groupBy("train", "dest").max("time") SQL: "select train, dest, max(time) from trainTimesView group by train, dest" // after calling

How to select the entire row that has max timestamp for every key in Spark Structured Streaming 2.1.1?

2017-08-29 Thread kant kodali
Hi all, I am wondering what is the easiest and most concise way to express the computation below in Spark Structured Streaming, given that it supports both imperative and declarative styles. I am just trying to select the rows that have the max timestamp for each train. Instead of doing some sort of nested

Re: add arraylist to dataframe

2017-08-29 Thread yohann jardin
Hello Asmath, your list exists inside the driver, but you try to add elements to it from the executors. They are in different processes, on different nodes; they do not communicate just like that. https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions There exists an action
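
A small sketch of the difference (assumes a SparkSession named spark):

    import scala.collection.mutable.ArrayBuffer

    val df = spark.range(5).toDF("id")

    // Anti-pattern: `points` lives on the driver; the closure is shipped to
    // executors, which mutate their own copies, so this is not a reliable way
    // to collect values (the linked docs explain actions and closures).
    val points = new ArrayBuffer[Long]()
    df.foreach(row => points += row.getLong(0))

    // Instead, transform the data and bring the result back with an action:
    val collected = df.collect().map(_.getLong(0))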

add arraylist to dataframe

2017-08-29 Thread KhajaAsmath Mohammed
Hi, I am initializing an ArrayList before iterating through the map method. I always get the list size as zero after the map operation. How do I add values to a list inside the map method of a DataFrame? Any suggestions? val points = new

Re: Referencing YARN application id, YARN container hostname, Executor ID and YARN attempt for jobs running on Spark EMR 5.7.0 in log statements?

2017-08-29 Thread Mikhailau, Alex
Would I use something like this to get to those VM arguments? val runtimeMxBean = ManagementFactory.getRuntimeMXBean val args = runtimeMxBean.getInputArguments val conf = Conf(args) etc. From: Vadim Semenov Date: Tuesday, August 29, 2017 at 11:49 AM To: "Mikhailau,

Re: Referencing YARN application id, YARN container hostname, Executor ID and YARN attempt for jobs running on Spark EMR 5.7.0 in log statements?

2017-08-29 Thread Vadim Semenov
Each Java process for each of the executors has some environment variables that you can use, for example: > CONTAINER_ID=container_1503994094228_0054_01_13 The executor id gets passed as an argument to the process: > /usr/lib/jvm/java-1.8.0/bin/java … --driver-url
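
For example, a log statement evaluated inside a task can pick the container id up straight from the environment (a sketch; rdd, the logger name, and the fallback value are placeholders):

    import org.slf4j.LoggerFactory

    // When run inside a task (e.g. within mapPartitions), this reads the
    // executor container's environment rather than the driver's.
    rdd.mapPartitions { iter =>
      val log = LoggerFactory.getLogger("executor-context")
      val containerId = sys.env.getOrElse("CONTAINER_ID", "unknown-container")
      log.info(s"Processing a partition in YARN container $containerId")
      iter
    }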

Re: Spark submit OutOfMemory Error in local mode

2017-08-29 Thread muthu
Are you getting the OutOfMemory on the driver or on an executor? A typical cause of OOM in Spark is too few tasks for a job. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-submit-OutOfMemory-Error-in-local-mode-tp29081p29117.html Sent
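
If the OOM is on the executor side and too few tasks is indeed the cause, a first knob to try is raising the parallelism (a sketch; rdd, spark, and the value 200 are placeholders):

    // Split the work into more, smaller partitions.
    val repartitioned = rdd.repartition(200)

    // For DataFrame/SQL shuffles, the equivalent setting is:
    spark.conf.set("spark.sql.shuffle.partitions", "200")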

Re: Kill Spark Application programmatically in Spark standalone cluster

2017-08-29 Thread muthu
I had a similar question in the past and worked around it by having my spark-submit application register with my master application in order to coordinate kill and/or progress of execution. This is a bit kludgy, I suppose, in comparison to a REST-like API available in the Spark standalone cluster.

Spark standalone API...

2017-08-29 Thread muthu
Hello there, I use a Spark standalone cluster (the one whose UI is available at port 8080 when the cluster is started). I am writing to see if there are any REST APIs available to find the number of running applications, number of free cores, and/or executors. I do know of quite extensive APIs
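
One possibility, offered as an assumption to verify locally: the standalone master's status page also serves a JSON view on the same port, e.g.:

    import scala.io.Source

    // Fetch the master's status as JSON (host and port are placeholders).
    val masterState = Source.fromURL("http://spark-master:8080/json").mkString
    // The payload includes cluster-level fields such as workers, cores,
    // coresused and activeapps, which cover running apps and free cores.
    println(masterState)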

setup and cleanup function in spark

2017-08-29 Thread Mohammad Kargar
To implement a setup/cleanup function in Spark we follow the pattern below, as discussed here: rdd.mapPartitions { partition => if (!partition.isEmpty) { // Some setup code here
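
Filled out, the pattern looks roughly like this (a sketch; the connection type, createConnection, and process are hypothetical placeholders):

    rdd.mapPartitions { partition =>
      if (partition.isEmpty) {
        Iterator.empty
      } else {
        val conn = createConnection()            // per-partition setup
        // Materialize before closing, so cleanup does not run while the
        // iterator is still being consumed downstream.
        val results = partition.map(record => process(conn, record)).toList
        conn.close()                             // per-partition cleanup
        results.iterator
      }
    }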

Re: Kafka Consumer Pre Fetch Messages + Async commits

2017-08-29 Thread Julia Wistance
Thanks, Cody, for the reply. My thought was that the time is required anyway to write and commit the offsets to any of the external systems, which are all sync. So why not use a sync commit to Kafka itself to store the offsets? It helps add another dependency on the application side to check if, say,
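
For reference, the offset-commit mechanism the kafka-0-10 integration exposes is asynchronous; the documented pattern looks roughly like this (assumes a direct stream named stream):

    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // ... process the batch ...
      // Commit back to Kafka only after processing succeeds; the commit itself
      // is queued and performed asynchronously by the driver's consumer.
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }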

Re: Slower performance while running Spark Kafka Direct Streaming with Kafka 10 cluster

2017-08-29 Thread Cody Koeninger
I don't see anything obvious. If the slowness is correlated with the errors you're seeing, I'd start looking at what's going on with kafka or your network. On Mon, Aug 28, 2017 at 7:06 PM, swetha kasireddy wrote: > Hi Cody, > > Following is the way that I am

Re: Spark 2.0.0 and Hive metastore

2017-08-29 Thread Jean Georges Perrin
Sorry if my comment is not helping, but... why do you need Hive? Can't you save your aggregation using Parquet, for example? jg > On Aug 29, 2017, at 08:34, Andrés Ivaldi wrote: > > Hello, I'm using Spark API and with Hive support, I don't have a Hive > instance, just

Spark 2.0.0 and Hive metastore

2017-08-29 Thread Andrés Ivaldi
Hello, I'm using the Spark API with Hive support. I don't have a Hive instance; I'm just using Hive for some aggregation functions. The problem is that Hive creates the hive and metastore_db folders in the temp folder, and I want to change that location. Regards. -- Ing. Ivaldi Andres
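
A hedged sketch of the knobs usually involved; the paths are placeholders, and whether the JDO connection URL can be set through the builder rather than hive-site.xml should be verified for the Spark version in use:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("aggregations-with-hive-functions")
      // Moves the spark-warehouse folder out of the working/temp directory.
      .config("spark.sql.warehouse.dir", "/data/spark-warehouse")
      // Points the embedded Derby metastore (metastore_db) at a chosen location.
      .config("javax.jdo.option.ConnectionURL",
        "jdbc:derby:;databaseName=/data/metastore_db;create=true")
      .enableHiveSupport()
      .getOrCreate()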

Re: Collecting Multiple Aggregation query result on one Column as collectAsMap

2017-08-29 Thread Georg Heiler
What about a custom UDAF? Patrick wrote on Mon., 28 Aug 2017 at 20:54: > OK, I see there is a describe() function which does the stat calculation > on a Dataset, similar to StatCounter, but I don't want to restrict my > aggregations to the standard mean, stddev, etc., and
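
For reference, the skeleton of a custom UDAF in Spark 2.x looks like this; the statistic here (a plain sum of squares) is only a stand-in for whatever aggregation is actually needed:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    import org.apache.spark.sql.types._

    class SumOfSquares extends UserDefinedAggregateFunction {
      def inputSchema: StructType = new StructType().add("value", DoubleType)
      def bufferSchema: StructType = new StructType().add("acc", DoubleType)
      def dataType: DataType = DoubleType
      def deterministic: Boolean = true
      def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = 0.0
      def update(buffer: MutableAggregationBuffer, input: Row): Unit =
        if (!input.isNullAt(0)) {
          val v = input.getDouble(0)
          buffer(0) = buffer.getDouble(0) + v * v
        }
      def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
        buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
      def evaluate(buffer: Row): Double = buffer.getDouble(0)
    }

    // Usage: df.groupBy("key").agg(new SumOfSquares()(df("value")))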

Re: Terminating Structured Streaming Applications on Source Failure

2017-08-29 Thread Yuval Itzchakov
I mean the `StreamExecution` generated a proper error message: 2017-08-26 07:05:00,641 ERROR StreamExecution:? - Query [id = 8597ae0b-2183-407f-8300-239a24eb68ab, runId = c1fe627d-bcf4-4462-bbd9-b178ffaca860] terminated with error org.apache.spark.SparkException: Job aborted due to stage

Re: Terminating Structured Streaming Applications on Source Failure

2017-08-29 Thread Tathagata Das
When you say "the application remained alive", do you mean the StreamingQuery stayed alive, or the whole process stayed alive? The StreamingQuery should be terminated immediately. And the stream execution threads are all daemon threads, so it should not affect the termination of the application
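
One way to make the whole process stop when the query dies, sketched under the assumption that exiting on source failure is the desired behavior (query stands for whatever writeStream ... .start() returned, spark for the session):

    try {
      // Blocks until the query stops; a source failure is rethrown here as a
      // StreamingQueryException, so it can terminate main or be handled.
      query.awaitTermination()
    } finally {
      spark.stop()
    }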

Terminating Structured Streaming Applications on Source Failure

2017-08-29 Thread Yuval Itzchakov
I wasn't sure if this would be a proper bug or not. Today, the behavior of Structured Streaming is such that if a source fails with an exception, the `StreamExecution` class halts reading further from the source, but the application remains alive. For applications whose sole purpose is to

unable to import graphframes

2017-08-29 Thread Imran Rajjad
Dear list, I am following the GraphFrames documentation and have started the Scala shell using the following command: D:\spark-2.1.0-bin-hadoop2.7\bin>spark-shell --master local[2] --packages graphframes:graphframes:0.5.0-spark2.1-s_2.10 Ivy Default Cache set to: C:\Users\user\.ivy2\cache The
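
A likely cause, noted here as an assumption rather than a confirmed diagnosis: the artifact suffix s_2.10 targets Scala 2.10, while the Spark 2.1.0 binary distribution ships with Scala 2.11, so the matching package would be graphframes:graphframes:0.5.0-spark2.1-s_2.11.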