Re: Oracle Table not resolved [Spark 2.1.1]

2017-08-28 Thread Imran Rajjad
The JDBC URL is invalid, but strangely it should have thrown an ORA- exception.
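
For reference, a minimal sketch of the thin-driver URL form the quoted code appears to be missing (host, port, and the "ora" SID are just the placeholders from the original post; this is an assumption about the intended connection, not a confirmed fix):

// Oracle thin JDBC URL format: jdbc:oracle:thin:@host:port:SID
val url = "jdbc:oracle:thin:@localhost:1521:ora"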

On Mon, Aug 28, 2017 at 4:55 PM, Naga G  wrote:

> I'm not able to find the database name.
> Is ora the database in the URL below?
>
> Sent from Naga iPad
>
> > On Aug 28, 2017, at 4:06 AM, Imran Rajjad  wrote:
> >
> > Hello,
> >
> > I am trying to retrieve an Oracle table into a Dataset<Row> using the
> following code
> >
> > String url = "jdbc:oracle@localhost:1521:ora";
> >   Dataset<Row> jdbcDF = spark.read()
> >   .format("jdbc")
> >   .option("driver", "oracle.jdbc.driver.OracleDriver")
> >   .option("url", url)
> >   .option("dbtable", "INCIDENTS")
> >   .option("user", "user1")
> >   .option("password", "pass1")
> >   .load();
> >
> >   System.out.println(jdbcDF.count());
> >
> > below is the stack trace
> >
> > java.lang.NullPointerException
> >  at org.apache.spark.sql.execution.datasources.jdbc.
> JDBCRDD$.resolveTable(JDBCRDD.scala:72)
> >  at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.(
> JDBCRelation.scala:113)
> >  at org.apache.spark.sql.execution.datasources.jdbc.
> JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:45)
> >  at org.apache.spark.sql.execution.datasources.
> DataSource.resolveRelation(DataSource.scala:330)
> >  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
> >  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125)
> >  at com.elogic.hazelcast_test.JDBCTest.test(JDBCTest.java:56)
> >  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >  at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
> >  at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> >  at java.lang.reflect.Method.invoke(Method.java:498)
> >  at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(
> FrameworkMethod.java:50)
> >  at org.junit.internal.runners.model.ReflectiveCallable.run(
> ReflectiveCallable.java:12)
> >  at org.junit.runners.model.FrameworkMethod.invokeExplosively(
> FrameworkMethod.java:47)
> >  at org.junit.internal.runners.statements.InvokeMethod.
> evaluate(InvokeMethod.java:17)
> >  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> >  at org.junit.runners.BlockJUnit4ClassRunner.runChild(
> BlockJUnit4ClassRunner.java:78)
> >  at org.junit.runners.BlockJUnit4ClassRunner.runChild(
> BlockJUnit4ClassRunner.java:57)
> >  at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> >  at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> >  at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> >  at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> >  at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> >  at org.junit.internal.runners.statements.RunBefores.
> evaluate(RunBefores.java:26)
> >  at org.junit.internal.runners.statements.RunAfters.evaluate(
> RunAfters.java:27)
> >  at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> >  at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(
> JUnit4TestReference.java:86)
> >  at org.eclipse.jdt.internal.junit.runner.TestExecution.
> run(TestExecution.java:38)
> >  at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.
> runTests(RemoteTestRunner.java:459)
> >  at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.
> runTests(RemoteTestRunner.java:678)
> >  at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.
> run(RemoteTestRunner.java:382)
> >  at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.
> main(RemoteTestRunner.java:192)
> >
> > Apparently the connection is made but the table is not being detected. Any
> ideas what's wrong with the code?
> >
> > regards,
> > Imran
> > --
> > I.R
>



-- 
I.R


Re: use WithColumn with external function in a java jar

2017-08-28 Thread Praneeth Gayam
You can create a UDF which will invoke your java lib

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

def calculateExpense: UserDefinedFunction =
  udf((pexpense: String, cexpense: String) =>
    new MyJava().calculateExpense(pexpense.toDouble, cexpense.toDouble))
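
A usage sketch (assuming the DataFrame from the question is named df and the columns are strings, as described):

import org.apache.spark.sql.functions.col

// Apply the UDF to derive the new "expense" column from the two string columns.
val withExpense = df.withColumn("expense",
  calculateExpense(col("pexpense"), col("cexpense")))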





On Tue, Aug 29, 2017 at 6:53 AM, purna pradeep 
wrote:

> I have data in a DataFrame with below columns
>
> 1)Fileformat is csv
> 2)All below column datatypes are String
>
> employeeid,pexpense,cexpense
>
> Now I need to create a new DataFrame which has a new column called
> `expense`, which is calculated based on the columns `pexpense` and `cexpense`.
>
> The tricky part is that the calculation algorithm is not a **UDF** function
> which I created; it's an external function that needs to be imported
> from a Java library and takes primitive types as arguments - in this case
> `pexpense` and `cexpense` - to calculate the value required for the new column.
>
> The external function signature
>
> public class MyJava
>
> {
>
> public Double calculateExpense(Double pexpense, Double cexpense) {
>// calculation
> }
>
> }
>
> So how can I invoke that external function to create a new calculated
> column? Can I register that external function as a UDF in my Spark
> application?
>
> Stackoverflow reference
>
> https://stackoverflow.com/questions/45928007/use-withcolumn-with-external-
> function
>
>
>
>
>
>


use WithColumn with external function in a java jar

2017-08-28 Thread purna pradeep
I have data in a DataFrame with below columns

1)Fileformat is csv
2)All below column datatypes are String

employeeid,pexpense,cexpense

Now I need to create a new DataFrame which has a new column called `expense`,
which is calculated based on the columns `pexpense` and `cexpense`.

The tricky part is that the calculation algorithm is not a **UDF** function
which I created; it's an external function that needs to be imported
from a Java library and takes primitive types as arguments - in this case
`pexpense` and `cexpense` - to calculate the value required for the new column.

The external function signature

public class MyJava

{

public Double calculateExpense(Double pexpense, Double cexpense) {
   // calculation
}

}

So how can I invoke that external function to create a new calculated
column? Can I register that external function as a UDF in my Spark
application?

Stackoverflow reference

https://stackoverflow.com/questions/45928007/use-withcolumn-with-external-function


Re: Slower performance while running Spark Kafka Direct Streaming with Kafka 10 cluster

2017-08-28 Thread swetha kasireddy
Hi Cody,

Following is the way that I am consuming data for a 60 second batch. Do you
see anything that is wrong with the way the data is getting consumed that
can cause slowness in performance?


val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> kafkaBrokers,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "auto.offset.reset" -> "latest",
  "heartbeat.interval.ms" -> Integer.valueOf(2),
  "session.timeout.ms" -> Integer.valueOf(6),
  "request.timeout.ms" -> Integer.valueOf(9),
  "enable.auto.commit" -> (false: java.lang.Boolean),
  "spark.streaming.kafka.consumer.cache.enabled" -> "false",
  "group.id" -> "test1"
)

  val hubbleStream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
  )

val kafkaStreamRdd = kafkaStream.transform { rdd =>
rdd.map(consumerRecord => (consumerRecord.key(), consumerRecord.value()))
}

On Mon, Aug 28, 2017 at 11:56 AM, swetha kasireddy <
swethakasire...@gmail.com> wrote:

> There is no difference in performance even with Cache being enabled.
>
> On Mon, Aug 28, 2017 at 11:27 AM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> There is no difference in performance even with Cache being disabled.
>>
>> On Mon, Aug 28, 2017 at 7:43 AM, Cody Koeninger 
>> wrote:
>>
>>> So if you can run with cache enabled for some time, does that
>>> significantly affect the performance issue you were seeing?
>>>
>>> Those settings seem reasonable enough.   If preferred locations is
>>> behaving correctly you shouldn't need cached consumers for all 96
>>> partitions on any one executor, so that maxCapacity setting is
>>> probably unnecessary.
>>>
>>> On Fri, Aug 25, 2017 at 7:04 PM, swetha kasireddy
>>>  wrote:
>>> > Because I saw some posts that say that consumer cache  enabled will
>>> have
>>> > concurrentModification exception with reduceByKeyAndWIndow. I see those
>>> > errors as well after running for sometime with cache being enabled.
>>> So, I
>>> > had to disable it. Please see the tickets below.  We have 96
>>> partitions. So
>>> > if I enable cache, would the following settings help to improve
>>> performance?
>>> >
>>> > "spark.streaming.kafka.consumer.cache.maxCapacity" ->
>>> Integer.valueOf(96),
>>> > "spark.streaming.kafka.consumer.cache.maxCapacity" ->
>>> Integer.valueOf(96),
>>> >
>>> > "spark.streaming.kafka.consumer.poll.ms" -> Integer.valueOf(1024),
>>> >
>>> >
>>> > http://markmail.org/message/n4cdxwurlhf44q5x
>>> >
>>> > https://issues.apache.org/jira/browse/SPARK-19185
>>> >
>>> > On Fri, Aug 25, 2017 at 12:28 PM, Cody Koeninger 
>>> wrote:
>>> >>
>>> >> Why are you setting consumer.cache.enabled to false?
>>> >>
>>> >> On Fri, Aug 25, 2017 at 2:19 PM, SRK 
>>> wrote:
>>> >> > Hi,
>>> >> >
>>> >> > What would be the appropriate settings to run Spark with Kafka 10?
>>> My
>>> >> > job
>>> >> > works fine with Spark with Kafka 8 and with Kafka 8 cluster. But its
>>> >> > very
>>> >> > slow with Kafka 10 by using Kafka Direct' experimental APIs for
>>> Kafka 10
>>> >> > . I
>>> >> > see the following error sometimes . Please see the kafka parameters
>>> and
>>> >> > the
>>> >> > consumer strategy for creating the stream below. Any suggestions on
>>> how
>>> >> > to
>>> >> > run this with better performance would be of great help.
>>> >> >
>>> >> > java.lang.AssertionError: assertion failed: Failed to get records
>>> for
>>> >> > test
>>> >> > stream1 72 324027964 after polling for 12
>>> >> >
>>> >> > val kafkaParams = Map[String, Object](
>>> >> >   "bootstrap.servers" -> kafkaBrokers,
>>> >> >   "key.deserializer" -> classOf[StringDeserializer],
>>> >> >   "value.deserializer" -> classOf[StringDeserializer],
>>> >> >   "auto.offset.reset" -> "latest",
>>> >> >   "heartbeat.interval.ms" -> Integer.valueOf(2),
>>> >> >   "session.timeout.ms" -> Integer.valueOf(6),
>>> >> >   "request.timeout.ms" -> Integer.valueOf(9),
>>> >> >   "enable.auto.commit" -> (false: java.lang.Boolean),
>>> >> >   "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>>> >> >   "group.id" -> "test1"
>>> >> > )
>>> >> >
>>> >> >   val hubbleStream = KafkaUtils.createDirectStream[String,
>>> String](
>>> >> > ssc,
>>> >> > LocationStrategies.PreferConsistent,
>>> >> > ConsumerStrategies.Subscribe[String, String](topicsSet,
>>> >> > kafkaParams)
>>> >> >   )
>>> >> >
>>> >> >
>>> >> >
>>> >> >
>>> >> >
>>> >> > --
>>> >> > View this message in context:
>>> >> > http://apache-spark-user-list.1001560.n3.nabble.com/Slower-p
>>> erformance-while-running-Spark-Kafka-Direct-Streaming-with-K
>>> afka-10-cluster-tp29108.html
>>> >> > Sent 

Re: Spark SQL vs HiveQL

2017-08-28 Thread Michael Artz
Thanks for responding BUT I would not be reading from a file if it was
Hive.

 I'm comparing Hive LLAP from a hive table vs Spark SQL from a file.  That
is the question.

Thanks

On Mon, Aug 28, 2017 at 1:58 PM, Imran Rajjad  wrote:

> If reading directly from file then Spark SQL should be your choice
>
>
> On Mon, Aug 28, 2017 at 10:25 PM Michael Artz 
> wrote:
>
>> Just to be clear, I'm referring to having Spark reading from a file, not
>> from a Hive table.  And it will have tungsten engine off heap serialization
>> after 2.1, so if it was a test with like 1.63 it won't be as helpful.
>>
>>
>> On Mon, Aug 28, 2017 at 10:50 AM, Michael Artz 
>> wrote:
>>
>>> Hi,
>>>   There isn't any good source to answer the question of whether Hive as an
>>> SQL-On-Hadoop engine is just as fast as Spark SQL now. I just want to know if
>>> there has been a comparison done lately for HiveQL vs Spark SQL on Spark
>>> versions 2.1 or later.  I have a large ETL process, with many table joins
>>> and some string manipulation. I don't think anyone has done this kind of
>>> testing in a while.  With Hive LLAP being so performant, I am trying to
>>> make the case for using Spark and some of the architects are light on
>>> experience so they are scared of Scala.
>>>
>>> Thanks
>>>
>>>
>>>
>>
>>
>> --
> Sent from Gmail Mobile
>


Re: Help taking last value in each group (aggregates)

2017-08-28 Thread Everett Anderson
I'm still unclear on if orderBy/groupBy + aggregates is a viable approach
or when one could rely on the last or first aggregate functions, but a
working alternative is to use window functions with row_number and a filter
kind of like this:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{row_number, sum}
import spark.implicits._

val reverseOrdering = Seq("a", "b").map(col => df(col).desc)

val windowSpec = Window.partitionBy("group_id").orderBy(reverseOrdering:_*)

df.select("group_id",
  "row_id",
  sum("col_to_sum").over(windowSpec).as("total"),
  row_number().over(windowSpec).as("row_number"))
  .filter("row_number == 1")
  .select($"group_id",
  $"row_id".as("last_row_id"),
  $"total")

Would love to know if there's a better way!
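
Another possible sketch, assuming min/max struct ordering works for the column types involved (columns a and b define the ordering, exactly as in the window version above):

import org.apache.spark.sql.functions.{col, max, struct, sum}

// The max of struct(a, b, row_id) is the row that sorts last by (a, b), and it
// carries row_id along, so a single groupBy/agg pass is enough.
val result = df.groupBy("group_id")
  .agg(
    sum("col_to_sum").as("total"),
    max(struct(col("a"), col("b"), col("row_id"))).as("last"))
  .select(col("group_id"), col("total"), col("last.row_id").as("last_row_id"))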

On Mon, Aug 28, 2017 at 9:19 AM, Everett Anderson  wrote:

> Hi,
>
> I'm struggling a little with some unintuitive behavior with the Scala API.
> (Spark 2.0.2)
>
> I wrote something like
>
> df.orderBy("a", "b")
>   .groupBy("group_id")
>   .agg(sum("col_to_sum").as("total"),
>last("row_id").as("last_row_id"))
>
> and expected a result with a unique group_id column, a column called
> "total" that's the sum of all col_to_sum in each group, and a column called
> "last_row_id" that's the last row_id seen in each group when the groups are
> sorted by columns a and b.
>
> However, the result is actually non-deterministic and changes based on the
> initial sorting and partitioning of df.
>
> I also tried
>
> df.orderBy("group_id", "a", "b")
>   .groupBy("group_id")
>   .agg(sum("col_to_sum").as("total"),
>last("row_id").as("last_row_id"))
>
> thinking the problem might be that the groupBy does another shuffle that
> loses the ordering, but that also doesn't seem to work.
>
> Looking through the code, both the Last and First aggregate functions have this comment:
>
> Even if [[Last]] is used on an already sorted column, if
> we do partial aggregation and final aggregation
> (when mergeExpression
> is used) its result will not be deterministic
> (unless the input table is sorted and has
> a single partition, and we use a single reducer to do the aggregation.).
>
>
> Some questions:
>
>1. What's the best way to take some values from the last row in an
>ordered group while performing some other aggregates over the entire group?
>
>2. Given these comments on last and first, when would these functions
>be useful? It would be rare to bring an entire Spark table to a single
>partition.
>
> Thanks!
>
>


Re: Referencing YARN application id, YARN container hostname, Executor ID and YARN attempt for jobs running on Spark EMR 5.7.0 in log statements?

2017-08-28 Thread Mikhailau, Alex
Thanks, Vadim. The issue is not access to logs. I am able to view them.

I have the CloudWatch Logs agent push logs to Elasticsearch and then into Kibana 
using json-event-layout for log4j output. I would also like to log 
applicationId, executorId, etc. in those log statements for clarity. Is there an 
MDC way with Spark, or some other way to achieve this?

Alex
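
For what it's worth, a minimal sketch of pushing such identifiers into log4j's MDC so a JSON layout can emit them via %X{...} fields (this assumes log4j 1.x on the executors; rdd and the key names are placeholders, and MDC is thread-local, so it has to be set inside the task rather than on the driver):

import org.apache.log4j.MDC
import org.apache.spark.{SparkEnv, TaskContext}

rdd.mapPartitions { iter =>
  // Populate MDC once per task; subsequent log statements on this thread pick it up.
  MDC.put("applicationId", SparkEnv.get.conf.getAppId)
  MDC.put("executorId", SparkEnv.get.executorId)
  MDC.put("taskAttemptId", TaskContext.get.taskAttemptId.toString)
  iter
}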

From: Vadim Semenov 
Date: Monday, August 28, 2017 at 5:18 PM
To: "Mikhailau, Alex" 
Cc: "user@spark.apache.org" 
Subject: Re: Referencing YARN application id, YARN container hostname, Executor 
ID and YARN attempt for jobs running on Spark EMR 5.7.0 in log statements?

When you create an EMR cluster you can specify an S3 path where logs will be
saved after the cluster terminates, something like this:

s3://bucket/j-18ASDKLJLAKSD/containers/application_1494074597524_0001/container_1494074597524_0001_01_01/stderr.gz

http://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-manage-view-web-log-files.html

On Mon, Aug 28, 2017 at 4:43 PM, Mikhailau, Alex 
> wrote:
Does anyone have a working solution for logging YARN application id, YARN 
container hostname, Executor ID and YARN attempt for jobs running on Spark EMR 
5.7.0 in log statements? Are there specific ENV variables available or other 
workflow for doing that?

Thank you

Alex



Re: Referencing YARN application id, YARN container hostname, Executor ID and YARN attempt for jobs running on Spark EMR 5.7.0 in log statements?

2017-08-28 Thread Vadim Semenov
When you create an EMR cluster you can specify an S3 path where logs will be
saved after the cluster terminates, something like this:

s3://bucket/j-18ASDKLJLAKSD/containers/application_1494074597524_0001/container_1494074597524_0001_01_01/stderr.gz

http://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-manage-view-web-log-files.html

On Mon, Aug 28, 2017 at 4:43 PM, Mikhailau, Alex 
wrote:

> Does anyone have a working solution for logging YARN application id, YARN
> container hostname, Executor ID and YARN attempt for jobs running on Spark
> EMR 5.7.0 in log statements? Are there specific ENV variables available or
> other workflow for doing that?
>
>
>
> Thank you
>
>
>
> Alex
>


Referencing YARN application id, YARN container hostname, Executor ID and YARN attempt for jobs running on Spark EMR 5.7.0 in log statements?

2017-08-28 Thread Mikhailau, Alex
Does anyone have a working solution for logging YARN application id, YARN 
container hostname, Executor ID and YARN attempt for jobs running on Spark EMR 
5.7.0 in log statements? Are there specific ENV variables available or other 
workflow for doing that?

Thank you

Alex


[Spark Streaming] Application is stopped after stopping a worker

2017-08-28 Thread Davide.Mandrini
I am running a spark streaming application on a cluster composed by three
nodes, each one with a worker and three executors (so a total of 9
executors). I am using the spark standalone mode.

The application is run with a spark-submit command with option --deploy-mode
client. The submit command is run from one of the nodes, let's call it node
1.

As a fault tolerance test I am stopping the worker on node 2 with the
command sudo service spark-worker stop.

In the logs I can see that the Master keeps trying to run executors on the
shutting-down worker (I can see thousands of tries, all with status FAILED,
within a few seconds), and then the whole application is terminated by Spark.

I tried to get more information about how Spark handles worker failures, but I
was not able to find any useful answer.

In the Spark source code I can see that the worker calls for a driver kill when
we stop the worker (method onStop here:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala).
This might explain why the whole application is stopped eventually.

Is this the expected behavior in case of a worker explicitly stopped?

Is this a case of worker failure or it has to be considered differently (I
am explicitly shutting down the node here)?

Would it be the same behavior if the worker process was killed (and not
explicitly stopped)?

Thank you 
Davide



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Application-is-stopped-after-stopping-a-worker-tp29111.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Slower performance while running Spark Kafka Direct Streaming with Kafka 10 cluster

2017-08-28 Thread swetha kasireddy
There is no difference in performance even with Cache being enabled.

On Mon, Aug 28, 2017 at 11:27 AM, swetha kasireddy <
swethakasire...@gmail.com> wrote:

> There is no difference in performance even with Cache being disabled.
>
> On Mon, Aug 28, 2017 at 7:43 AM, Cody Koeninger 
> wrote:
>
>> So if you can run with cache enabled for some time, does that
>> significantly affect the performance issue you were seeing?
>>
>> Those settings seem reasonable enough.   If preferred locations is
>> behaving correctly you shouldn't need cached consumers for all 96
>> partitions on any one executor, so that maxCapacity setting is
>> probably unnecessary.
>>
>> On Fri, Aug 25, 2017 at 7:04 PM, swetha kasireddy
>>  wrote:
>> > Because I saw some posts that say that consumer cache  enabled will have
>> > concurrentModification exception with reduceByKeyAndWIndow. I see those
>> > errors as well after running for sometime with cache being enabled. So,
>> I
>> > had to disable it. Please see the tickets below.  We have 96
>> partitions. So
>> > if I enable cache, would the following settings help to improve
>> performance?
>> >
>> > "spark.streaming.kafka.consumer.cache.maxCapacity" ->
>> Integer.valueOf(96),
>> > "spark.streaming.kafka.consumer.cache.maxCapacity" ->
>> Integer.valueOf(96),
>> >
>> > "spark.streaming.kafka.consumer.poll.ms" -> Integer.valueOf(1024),
>> >
>> >
>> > http://markmail.org/message/n4cdxwurlhf44q5x
>> >
>> > https://issues.apache.org/jira/browse/SPARK-19185
>> >
>> > On Fri, Aug 25, 2017 at 12:28 PM, Cody Koeninger 
>> wrote:
>> >>
>> >> Why are you setting consumer.cache.enabled to false?
>> >>
>> >> On Fri, Aug 25, 2017 at 2:19 PM, SRK 
>> wrote:
>> >> > Hi,
>> >> >
>> >> > What would be the appropriate settings to run Spark with Kafka 10? My
>> >> > job
>> >> > works fine with Spark with Kafka 8 and with Kafka 8 cluster. But its
>> >> > very
>> >> > slow with Kafka 10 by using Kafka Direct' experimental APIs for
>> Kafka 10
>> >> > . I
>> >> > see the following error sometimes . Please see the kafka parameters
>> and
>> >> > the
>> >> > consumer strategy for creating the stream below. Any suggestions on
>> how
>> >> > to
>> >> > run this with better performance would be of great help.
>> >> >
>> >> > java.lang.AssertionError: assertion failed: Failed to get records for
>> >> > test
>> >> > stream1 72 324027964 after polling for 12
>> >> >
>> >> > val kafkaParams = Map[String, Object](
>> >> >   "bootstrap.servers" -> kafkaBrokers,
>> >> >   "key.deserializer" -> classOf[StringDeserializer],
>> >> >   "value.deserializer" -> classOf[StringDeserializer],
>> >> >   "auto.offset.reset" -> "latest",
>> >> >   "heartbeat.interval.ms" -> Integer.valueOf(2),
>> >> >   "session.timeout.ms" -> Integer.valueOf(6),
>> >> >   "request.timeout.ms" -> Integer.valueOf(9),
>> >> >   "enable.auto.commit" -> (false: java.lang.Boolean),
>> >> >   "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>> >> >   "group.id" -> "test1"
>> >> > )
>> >> >
>> >> >   val hubbleStream = KafkaUtils.createDirectStream[String,
>> String](
>> >> > ssc,
>> >> > LocationStrategies.PreferConsistent,
>> >> > ConsumerStrategies.Subscribe[String, String](topicsSet,
>> >> > kafkaParams)
>> >> >   )
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> > --
>> >> > View this message in context:
>> >> > http://apache-spark-user-list.1001560.n3.nabble.com/Slower-p
>> erformance-while-running-Spark-Kafka-Direct-Streaming-with-
>> Kafka-10-cluster-tp29108.html
>> >> > Sent from the Apache Spark User List mailing list archive at
>> Nabble.com.
>> >> >
>> >> > 
>> -
>> >> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >> >
>> >
>> >
>>
>
>


Re: Collecting Multiple Aggregation query result on one Column as collectAsMap

2017-08-28 Thread Patrick
OK, I see there is a describe() function which does the stat calculation on a
Dataset, similar to StatCounter. However, I don't want to restrict my
aggregations to the standard mean, stddev, etc.; I want to generate some custom
stats, or possibly run only a subset of the predefined stats on a particular
column. If we can write some custom code which does this in one action (job),
that would work for me.
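
A minimal sketch of that idea (df, the column names, and the chosen stats are all placeholders): build one aliased aggregate expression per (column, stat) pair and collect them all in a single action, then key the results by alias.

import org.apache.spark.sql.functions.expr

val columns = Seq("colA", "colB")                 // placeholder column names
val stats   = Seq("min", "max", "avg", "stddev")  // any subset of SQL aggregates

// One aliased expression per (column, stat) pair, all evaluated in one job.
val aggExprs = for { c <- columns; s <- stats } yield expr(s"$s($c)").as(s"${c}_$s")

val row = df.agg(aggExprs.head, aggExprs.tail: _*).first()
val resultMap: Map[String, Any] = row.schema.fieldNames.zip(row.toSeq).toMap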



On Tue, Aug 29, 2017 at 12:02 AM, Georg Heiler 
wrote:

> Rdd only
> Patrick  schrieb am Mo. 28. Aug. 2017 um 20:13:
>
>> Ah, does it work with Dataset API or i need to convert it to RDD first ?
>>
>> On Mon, Aug 28, 2017 at 10:40 PM, Georg Heiler > > wrote:
>>
>>> What about the rdd stat counter? https://spark.apache.org/docs/
>>> 0.6.2/api/core/spark/util/StatCounter.html
>>>
>>> Patrick  schrieb am Mo. 28. Aug. 2017 um 16:47:
>>>
 Hi

 I have two lists:


- List one: contains names of columns on which I want to do
aggregate operations.
- List two: contains the aggregate operations on which I want to
perform on each column eg ( min, max, mean)

 I am trying to use spark 2.0 dataset to achieve this. Spark provides an
 agg() where you can pass a Map  (of column name and
 respective aggregate operation ) as input, however I want to perform
 different aggregation operations on the same column of the data and want to
 collect the result in a Map where key is the aggregate
 operation and Value is the result on the particular column.  If i add
 different agg() to same column, the key gets updated with latest value.

 Also I dont find any collectAsMap() operation that returns map of
 aggregated column name as key and result as value. I get collectAsList()
 but i dont know the order in which those agg() operations are run so how do
 i match which list values corresponds to which agg operation.  I am able to
 see the result using .show() but How can i collect the result in this case 
 ?

 Is it possible to do different aggregation on the same column in one
 Job(i.e only one collect operation) using agg() operation?


 Thanks in advance.


>>


Re: Collecting Multiple Aggregation query result on one Column as collectAsMap

2017-08-28 Thread Vadim Semenov
I didn't tailor it to your needs, but this is what I can offer you, the
idea should be pretty clear

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_list, struct}

val spark: SparkSession

import spark.implicits._

case class Input(
  a: Int,
  b: Long,
  c: Int,
  d: Int,
  e: String
)

case class Output(
  a: Int,
  b: Long,
  data: Seq[(Int, Int, String)]
)

val df = spark.createDataFrame(Seq(
  Input(1, 1, 1, 1, "a"),
  Input(1, 1, 1, 1, "b"),
  Input(1, 1, 1, 1, "c"),
  Input(1, 2, 3, 3, "d")
))

val dfOut = df.groupBy("a", "b")
  .agg(collect_list(struct($"c", $"d", $"e")))
  .queryExecution.toRdd.mapPartitions(_.map(r => {
val a = r.getInt(0)
val b = r.getLong(1)

val list = r.getArray(2)

Output(
  a,
  b,
  (0 until list.numElements()).map(i => {
val struct = list.getStruct(i, 3)
val c = struct.getInt(0)
val d = struct.getInt(1)
val e = struct.getString(2)

(c, d, e)
  })
)
  })).toDF()
dfOut.explain(extended = true)
dfOut.show()


On Mon, Aug 28, 2017 at 10:47 AM, Patrick  wrote:

> Hi
>
> I have two lists:
>
>
>- List one: contains names of columns on which I want to do aggregate
>operations.
>- List two: contains the aggregate operations on which I want to
>perform on each column eg ( min, max, mean)
>
> I am trying to use spark 2.0 dataset to achieve this. Spark provides an
> agg() where you can pass a Map  (of column name and
> respective aggregate operation ) as input, however I want to perform
> different aggregation operations on the same column of the data and want to
> collect the result in a Map where key is the aggregate
> operation and Value is the result on the particular column.  If i add
> different agg() to same column, the key gets updated with latest value.
>
> Also I dont find any collectAsMap() operation that returns map of
> aggregated column name as key and result as value. I get collectAsList()
> but i dont know the order in which those agg() operations are run so how do
> i match which list values corresponds to which agg operation.  I am able to
> see the result using .show() but How can i collect the result in this case ?
>
> Is it possible to do different aggregation on the same column in one
> Job(i.e only one collect operation) using agg() operation?
>
>
> Thanks in advance.
>
>


Re: Collecting Multiple Aggregation query result on one Column as collectAsMap

2017-08-28 Thread Georg Heiler
Rdd only
Patrick  schrieb am Mo. 28. Aug. 2017 um 20:13:

> Ah, does it work with Dataset API or i need to convert it to RDD first ?
>
> On Mon, Aug 28, 2017 at 10:40 PM, Georg Heiler 
> wrote:
>
>> What about the rdd stat counter?
>> https://spark.apache.org/docs/0.6.2/api/core/spark/util/StatCounter.html
>>
>> Patrick  schrieb am Mo. 28. Aug. 2017 um 16:47:
>>
>>> Hi
>>>
>>> I have two lists:
>>>
>>>
>>>- List one: contains names of columns on which I want to do
>>>aggregate operations.
>>>- List two: contains the aggregate operations on which I want to
>>>perform on each column eg ( min, max, mean)
>>>
>>> I am trying to use spark 2.0 dataset to achieve this. Spark provides an
>>> agg() where you can pass a Map  (of column name and
>>> respective aggregate operation ) as input, however I want to perform
>>> different aggregation operations on the same column of the data and want to
>>> collect the result in a Map where key is the aggregate
>>> operation and Value is the result on the particular column.  If i add
>>> different agg() to same column, the key gets updated with latest value.
>>>
>>> Also I dont find any collectAsMap() operation that returns map of
>>> aggregated column name as key and result as value. I get collectAsList()
>>> but i dont know the order in which those agg() operations are run so how do
>>> i match which list values corresponds to which agg operation.  I am able to
>>> see the result using .show() but How can i collect the result in this case ?
>>>
>>> Is it possible to do different aggregation on the same column in one
>>> Job(i.e only one collect operation) using agg() operation?
>>>
>>>
>>> Thanks in advance.
>>>
>>>
>


RE: from_json()

2017-08-28 Thread JG Perrin
Thanks Sam – this might be the solution. I will investigate!

From: Sam Elamin [mailto:hussam.ela...@gmail.com]
Sent: Monday, August 28, 2017 1:14 PM
To: JG Perrin 
Cc: user@spark.apache.org
Subject: Re: from_json()

Hi jg,

Perhaps I am misunderstanding you, but if you just want to create a new schema 
from a df it's fairly simple, assuming you have a schema already predefined or 
in a string. i.e.

val newSchema = DataType.fromJson(json_schema_string)

then all you need to do is re-create the dataframe using this new schema

sqlContext.createDataFrame(oldDF.rdd,newSchema)
Regards
Sam

On Mon, Aug 28, 2017 at 5:57 PM, JG Perrin 
> wrote:
Is there a way to not have to specify a schema when using from_json() or infer 
the schema? When you read a JSON doc from disk, you can infer the schema. 
Should I write it to disk before (ouch)?

jg





Re: Slower performance while running Spark Kafka Direct Streaming with Kafka 10 cluster

2017-08-28 Thread swetha kasireddy
There is no difference in performance even with Cache being disabled.

On Mon, Aug 28, 2017 at 7:43 AM, Cody Koeninger  wrote:

> So if you can run with cache enabled for some time, does that
> significantly affect the performance issue you were seeing?
>
> Those settings seem reasonable enough.   If preferred locations is
> behaving correctly you shouldn't need cached consumers for all 96
> partitions on any one executor, so that maxCapacity setting is
> probably unnecessary.
>
> On Fri, Aug 25, 2017 at 7:04 PM, swetha kasireddy
>  wrote:
> > Because I saw some posts that say that consumer cache  enabled will have
> > concurrentModification exception with reduceByKeyAndWIndow. I see those
> > errors as well after running for sometime with cache being enabled. So, I
> > had to disable it. Please see the tickets below.  We have 96 partitions.
> So
> > if I enable cache, would the following settings help to improve
> performance?
> >
> > "spark.streaming.kafka.consumer.cache.maxCapacity" ->
> Integer.valueOf(96),
> > "spark.streaming.kafka.consumer.cache.maxCapacity" ->
> Integer.valueOf(96),
> >
> > "spark.streaming.kafka.consumer.poll.ms" -> Integer.valueOf(1024),
> >
> >
> > http://markmail.org/message/n4cdxwurlhf44q5x
> >
> > https://issues.apache.org/jira/browse/SPARK-19185
> >
> > On Fri, Aug 25, 2017 at 12:28 PM, Cody Koeninger 
> wrote:
> >>
> >> Why are you setting consumer.cache.enabled to false?
> >>
> >> On Fri, Aug 25, 2017 at 2:19 PM, SRK  wrote:
> >> > Hi,
> >> >
> >> > What would be the appropriate settings to run Spark with Kafka 10? My
> >> > job
> >> > works fine with Spark with Kafka 8 and with Kafka 8 cluster. But its
> >> > very
> >> > slow with Kafka 10 by using Kafka Direct' experimental APIs for Kafka
> 10
> >> > . I
> >> > see the following error sometimes . Please see the kafka parameters
> and
> >> > the
> >> > consumer strategy for creating the stream below. Any suggestions on
> how
> >> > to
> >> > run this with better performance would be of great help.
> >> >
> >> > java.lang.AssertionError: assertion failed: Failed to get records for
> >> > test
> >> > stream1 72 324027964 after polling for 12
> >> >
> >> > val kafkaParams = Map[String, Object](
> >> >   "bootstrap.servers" -> kafkaBrokers,
> >> >   "key.deserializer" -> classOf[StringDeserializer],
> >> >   "value.deserializer" -> classOf[StringDeserializer],
> >> >   "auto.offset.reset" -> "latest",
> >> >   "heartbeat.interval.ms" -> Integer.valueOf(2),
> >> >   "session.timeout.ms" -> Integer.valueOf(6),
> >> >   "request.timeout.ms" -> Integer.valueOf(9),
> >> >   "enable.auto.commit" -> (false: java.lang.Boolean),
> >> >   "spark.streaming.kafka.consumer.cache.enabled" -> "false",
> >> >   "group.id" -> "test1"
> >> > )
> >> >
> >> >   val hubbleStream = KafkaUtils.createDirectStream[String,
> String](
> >> > ssc,
> >> > LocationStrategies.PreferConsistent,
> >> > ConsumerStrategies.Subscribe[String, String](topicsSet,
> >> > kafkaParams)
> >> >   )
> >> >
> >> >
> >> >
> >> >
> >> >
> >> > --
> >> > View this message in context:
> >> > http://apache-spark-user-list.1001560.n3.nabble.com/Slower-
> performance-while-running-Spark-Kafka-Direct-Streaming-
> with-Kafka-10-cluster-tp29108.html
> >> > Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
> >> >
> >> > -
> >> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >> >
> >
> >
>


Re: from_json()

2017-08-28 Thread Sam Elamin
Hi jg,

Perhaps I am misunderstanding you, but if you just want to create a new
schema from a df it's fairly simple, assuming you have a schema already
predefined or in a string. i.e.

val newSchema = DataType.fromJson(json_schema_string)

then all you need to do is re-create the dataframe using this new schema

sqlContext.createDataFrame(oldDF.rdd,newSchema)

Regards
Sam
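
A related sketch for the schema-inference part (my own assumption, not from this thread): read the JSON column as a Dataset[String] so Spark infers a schema, then reuse it with from_json. This assumes Spark 2.2 (where DataFrameReader.json accepts a Dataset[String]), a SparkSession named spark, and a placeholder column name "json_col":

import org.apache.spark.sql.functions.from_json
import spark.implicits._

// Infer the schema from the JSON strings themselves (costs an extra pass over the data).
val inferredSchema = spark.read.json(df.select($"json_col").as[String]).schema

// Parse the column with the inferred schema.
val parsed = df.withColumn("parsed", from_json($"json_col", inferredSchema))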

On Mon, Aug 28, 2017 at 5:57 PM, JG Perrin  wrote:

> Is there a way to not have to specify a schema when using from_json() or
> infer the schema? When you read a JSON doc from disk, you can infer the
> schema. Should I write it to disk before (ouch)?
>
>
>
> jg
> --
>
>


Re: Collecting Multiple Aggregation query result on one Column as collectAsMap

2017-08-28 Thread Patrick
Ah, does it work with Dataset API or i need to convert it to RDD first ?

On Mon, Aug 28, 2017 at 10:40 PM, Georg Heiler 
wrote:

> What about the rdd stat counter? https://spark.apache.org/docs/
> 0.6.2/api/core/spark/util/StatCounter.html
>
> Patrick  schrieb am Mo. 28. Aug. 2017 um 16:47:
>
>> Hi
>>
>> I have two lists:
>>
>>
>>- List one: contains names of columns on which I want to do aggregate
>>operations.
>>- List two: contains the aggregate operations on which I want to
>>perform on each column eg ( min, max, mean)
>>
>> I am trying to use spark 2.0 dataset to achieve this. Spark provides an
>> agg() where you can pass a Map  (of column name and
>> respective aggregate operation ) as input, however I want to perform
>> different aggregation operations on the same column of the data and want to
>> collect the result in a Map where key is the aggregate
>> operation and Value is the result on the particular column.  If i add
>> different agg() to same column, the key gets updated with latest value.
>>
>> Also I dont find any collectAsMap() operation that returns map of
>> aggregated column name as key and result as value. I get collectAsList()
>> but i dont know the order in which those agg() operations are run so how do
>> i match which list values corresponds to which agg operation.  I am able to
>> see the result using .show() but How can i collect the result in this case ?
>>
>> Is it possible to do different aggregation on the same column in one
>> Job(i.e only one collect operation) using agg() operation?
>>
>>
>> Thanks in advance.
>>
>>


Re: Spark SQL vs HiveQL

2017-08-28 Thread Imran Rajjad
If reading directly from file then Spark SQL should be your choice


On Mon, Aug 28, 2017 at 10:25 PM Michael Artz 
wrote:

> Just to be clear, I'm referring to having Spark reading from a file, not
> from a Hive table.  And it will have tungsten engine off heap serialization
> after 2.1, so if it was a test with like 1.63 it won't be as helpful.
>
>
> On Mon, Aug 28, 2017 at 10:50 AM, Michael Artz 
> wrote:
>
>> Hi,
>>   There isn't any good source to answer the question of whether Hive as an
>> SQL-On-Hadoop engine is just as fast as Spark SQL now. I just want to know if
>> there has been a comparison done lately for HiveQL vs Spark SQL on Spark
>> versions 2.1 or later.  I have a large ETL process, with many table joins
>> and some string manipulation. I don't think anyone has done this kind of
>> testing in a while.  With Hive LLAP being so performant, I am trying to
>> make the case for using Spark and some of the architects are light on
>> experience so they are scared of Scala.
>>
>> Thanks
>>
>>
>>
>
>
> --
Sent from Gmail Mobile


Re: Spark SQL vs HiveQL

2017-08-28 Thread Michael Artz
Just to be clear, I'm referring to having Spark reading from a file, not
from a Hive table.  And it will have tungsten engine off heap serialization
after 2.1, so if it was a test with like 1.63 it won't be as helpful.


On Mon, Aug 28, 2017 at 10:50 AM, Michael Artz 
wrote:

> Hi,
>   There isn't any good source to answer the question of whether Hive as an
> SQL-On-Hadoop engine is just as fast as Spark SQL now. I just want to know if
> there has been a comparison done lately for HiveQL vs Spark SQL on Spark
> versions 2.1 or later.  I have a large ETL process, with many table joins
> and some string manipulation. I don't think anyone has done this kind of
> testing in a while.  With Hive LLAP being so performant, I am trying to
> make the case for using Spark and some of the architects are light on
> experience so they are scared of Scala.
>
> Thanks
>


Re: Collecting Multiple Aggregation query result on one Column as collectAsMap

2017-08-28 Thread Georg Heiler
What about the rdd stat counter?
https://spark.apache.org/docs/0.6.2/api/core/spark/util/StatCounter.html
Patrick  schrieb am Mo. 28. Aug. 2017 um 16:47:

> Hi
>
> I have two lists:
>
>
>- List one: contains names of columns on which I want to do aggregate
>operations.
>- List two: contains the aggregate operations on which I want to
>perform on each column eg ( min, max, mean)
>
> I am trying to use spark 2.0 dataset to achieve this. Spark provides an
> agg() where you can pass a Map  (of column name and
> respective aggregate operation ) as input, however I want to perform
> different aggregation operations on the same column of the data and want to
> collect the result in a Map where key is the aggregate
> operation and Value is the result on the particular column.  If i add
> different agg() to same column, the key gets updated with latest value.
>
> Also I dont find any collectAsMap() operation that returns map of
> aggregated column name as key and result as value. I get collectAsList()
> but i dont know the order in which those agg() operations are run so how do
> i match which list values corresponds to which agg operation.  I am able to
> see the result using .show() but How can i collect the result in this case ?
>
> Is it possible to do different aggregation on the same column in one
> Job(i.e only one collect operation) using agg() operation?
>
>
> Thanks in advance.
>
>


from_json()

2017-08-28 Thread JG Perrin
Is there a way to not have to specify a schema when using from_json() or infer 
the schema? When you read a JSON doc from disk, you can infer the schema. 
Should I write it to disk before (ouch)?

jg



Re: apache thrift server

2017-08-28 Thread MidwestMike
I had some similar problems with the Simba driver before. I was using the
ODBC one, but make sure your config matches the one on this page:

https://docs.datastax.com/en/datastax_enterprise/5.0/datastax_enterprise/spark/simbaOdbcDriverConfigWindows.html

Notice selecting the authorization mechanism of "user" but leaving the user
name blank (?!)





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/apache-thrift-server-tp29049p29110.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Help taking last value in each group (aggregates)

2017-08-28 Thread Everett Anderson
Hi,

I'm struggling a little with some unintuitive behavior with the Scala API.
(Spark 2.0.2)

I wrote something like

df.orderBy("a", "b")
  .groupBy("group_id")
  .agg(sum("col_to_sum").as("total"),
   last("row_id").as("last_row_id")))

and expected a result with a unique group_id column, a column called
"total" that's the sum of all col_to_sum in each group, and a column called
"last_row_id" that's the last row_id seen in each group when the groups are
sorted by columns a and b.

However, the result is actually non-deterministic and changes based on the
initial sorting and partitioning of df.

I also tried

df.orderBy("group_id", "a", "b")
  .groupBy("group_id")
  .agg(sum("col_to_sum").as("total"),
   last("row_id").as("last_row_id")))

thinking the problem might be that the groupBy does another shuffle that
loses the ordering, but that also doesn't seem to work.

Looking through the code, both the Last and First aggregate functions have this comment:

Even if [[Last]] is used on an already sorted column, if
we do partial aggregation and final aggregation
(when mergeExpression
is used) its result will not be deterministic
(unless the input table is sorted and has
a single partition, and we use a single reducer to do the aggregation.).


Some questions:

   1. What's the best way to take some values from the last row in an
   ordered group while performing some other aggregates over the entire group?

   2. Given these comments on last and first, when would these functions be
   useful? It would be rare to bring an entire Spark table to a single
   partition.

Thanks!


RE: add me to email list

2017-08-28 Thread JG Perrin
Hey Mike,

You need to do it yourself, it’s really easy: 
http://spark.apache.org/community.html.

hih

jg

From: Michael Artz [mailto:michaelea...@gmail.com]
Sent: Monday, August 28, 2017 7:43 AM
To: user@spark.apache.org
Subject: add me to email list

Hi,
  Please add me to the email list
Mike



Spark SQL vs HiveQL

2017-08-28 Thread Michael Artz
Hi,
  There isn't any good source to answer the question of whether Hive as an
SQL-On-Hadoop engine is just as fast as Spark SQL now. I just want to know if
there has been a comparison done lately for HiveQL vs Spark SQL on Spark
versions 2.1 or later.  I have a large ETL process, with many table joins
and some string manipulation. I don't think anyone has done this kind of
testing in a while.  With Hive LLAP being so performant, I am trying to
make the case for using Spark and some of the architects are light on
experience so they are scared of Scala.

Thanks


Collecting Multiple Aggregation query result on one Column as collectAsMap

2017-08-28 Thread Patrick
Hi

I have two lists:


   - List one: contains names of columns on which I want to do aggregate
   operations.
   - List two: contains the aggregate operations on which I want to perform
   on each column eg ( min, max, mean)

I am trying to use spark 2.0 dataset to achieve this. Spark provides an
agg() where you can pass a Map  (of column name and
respective aggregate operation ) as input, however I want to perform
different aggregation operations on the same column of the data and want to
collect the result in a Map where key is the aggregate
operation and Value is the result on the particular column.  If i add
different agg() to same column, the key gets updated with latest value.

Also, I don't find any collectAsMap() operation that returns a map of
aggregated column name as key and result as value. I can get collectAsList(),
but I don't know the order in which those agg() operations are run, so how do
I match which list values correspond to which agg operation? I am able to
see the result using .show(), but how can I collect the result in this case?

Is it possible to do different aggregations on the same column in one
job (i.e., only one collect operation) using the agg() operation?


Thanks in advance.


Re: Slower performance while running Spark Kafka Direct Streaming with Kafka 10 cluster

2017-08-28 Thread Cody Koeninger
So if you can run with cache enabled for some time, does that
significantly affect the performance issue you were seeing?

Those settings seem reasonable enough.   If preferred locations is
behaving correctly you shouldn't need cached consumers for all 96
partitions on any one executor, so that maxCapacity setting is
probably unnecessary.

On Fri, Aug 25, 2017 at 7:04 PM, swetha kasireddy
 wrote:
> Because I saw some posts that say that consumer cache  enabled will have
> concurrentModification exception with reduceByKeyAndWIndow. I see those
> errors as well after running for sometime with cache being enabled. So, I
> had to disable it. Please see the tickets below.  We have 96 partitions. So
> if I enable cache, would the following settings help to improve performance?
>
> "spark.streaming.kafka.consumer.cache.maxCapacity" -> Integer.valueOf(96),
> "spark.streaming.kafka.consumer.cache.maxCapacity" -> Integer.valueOf(96),
>
> "spark.streaming.kafka.consumer.poll.ms" -> Integer.valueOf(1024),
>
>
> http://markmail.org/message/n4cdxwurlhf44q5x
>
> https://issues.apache.org/jira/browse/SPARK-19185
>
> On Fri, Aug 25, 2017 at 12:28 PM, Cody Koeninger  wrote:
>>
>> Why are you setting consumer.cache.enabled to false?
>>
>> On Fri, Aug 25, 2017 at 2:19 PM, SRK  wrote:
>> > Hi,
>> >
>> > What would be the appropriate settings to run Spark with Kafka 10? My
>> > job
>> > works fine with Spark with Kafka 8 and with Kafka 8 cluster. But its
>> > very
>> > slow with Kafka 10 by using Kafka Direct' experimental APIs for Kafka 10
>> > . I
>> > see the following error sometimes . Please see the kafka parameters and
>> > the
>> > consumer strategy for creating the stream below. Any suggestions on how
>> > to
>> > run this with better performance would be of great help.
>> >
>> > java.lang.AssertionError: assertion failed: Failed to get records for
>> > test
>> > stream1 72 324027964 after polling for 12
>> >
>> > val kafkaParams = Map[String, Object](
>> >   "bootstrap.servers" -> kafkaBrokers,
>> >   "key.deserializer" -> classOf[StringDeserializer],
>> >   "value.deserializer" -> classOf[StringDeserializer],
>> >   "auto.offset.reset" -> "latest",
>> >   "heartbeat.interval.ms" -> Integer.valueOf(2),
>> >   "session.timeout.ms" -> Integer.valueOf(6),
>> >   "request.timeout.ms" -> Integer.valueOf(9),
>> >   "enable.auto.commit" -> (false: java.lang.Boolean),
>> >   "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>> >   "group.id" -> "test1"
>> > )
>> >
>> >   val hubbleStream = KafkaUtils.createDirectStream[String, String](
>> > ssc,
>> > LocationStrategies.PreferConsistent,
>> > ConsumerStrategies.Subscribe[String, String](topicsSet,
>> > kafkaParams)
>> >   )
>> >
>> >
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> > http://apache-spark-user-list.1001560.n3.nabble.com/Slower-performance-while-running-Spark-Kafka-Direct-Streaming-with-Kafka-10-cluster-tp29108.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Kafka Consumer Pre Fetch Messages + Async commits

2017-08-28 Thread Cody Koeninger
1. No, prefetched message offsets aren't exposed.

2. No, I'm not aware of any plans for sync commit, and I'm not sure
that makes sense.  You have to be able to deal with repeat messages in
the event of failure in any case, so the only difference sync commit
would make would be (possibly) slower run time.
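
For context, a minimal sketch of the async commit pattern being discussed, using the spark-streaming-kafka-0-10 API (stream is the InputDStream returned by createDirectStream):

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  // Offset ranges for this batch, captured before any processing.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process the batch ...

  // Commit asynchronously after processing; duplicates remain possible on failure.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}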

On Sat, Aug 26, 2017 at 1:07 AM, Julia Wistance
 wrote:
> Hi Experts,
>
> A question on what could potentially happen with Spark Streaming 2.2.0 +
> Kafka. LocationStrategies says that "new Kafka consumer API will pre-fetch
> messages into buffers.".
> If we store offsets in Kafka, currently we can only use async commits.
>
> So,
> 1 - Could it happen that we commit offsets that we haven't processed yet but
> the Kafka consumer has prefetched?
> 2 - Are there plans to support a sync commit? Although we can go for an
> alternate store of commits like HBase / Zookeeper, MySQL etc the code would
> wait till the offsets are stored in either of these systems. It would make
> sense that Spark / Kafka also adds a sync commit option?
>
> Appreciate the reply.
> JW
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark 2.2 structured streaming with mapGroupsWithState + window functions

2017-08-28 Thread daniel williams
Hi all,

I've been looking heavily into Spark 2.2 to solve a problem I have by
specifically using mapGroupsWithState.  What I've discovered is that a
*groupBy(window(..))* does not work when being used with a subsequent
*mapGroupsWithState* and produces an AnalysisException of :

*"mapGroupsWithState is not supported with aggregation on a streaming
DataFrame/Dataset;;"*

I have HTTP logs that have been rolled up via a previous job's window
function in the form of:

{"when": {"from": "2017-01-01T00:00:10", "to": "2017-01-01T00:00:11"},
"account": "A", "verb": "GET","statusCode": 500, "eventCount": 10}
{"when": {"from": "2017-01-01T00:00:10", "to": "2017-01-01T00:00:11"},
"account": "A", "verb": "GET","statusCode": 200, "eventCount": 89}

In this data the *when* sub-object is in one-minute blocks.  I'd like to
use a *window* function to aggregate that to 10 minute windows and sum the
eventCount by grouping on account, verb, and statusCode.  From there I'd
like to *mapGroupsWithState* for each *account* and *verb* to produce
buckets for some configurable window, say 10 minutes for example's sake, of
the form:

{"when": {"from": "2017-01-01T00:00:10", "to": "2017-01-01T00:00:20"},
"account": "A", "verb": "GET", "totalRequests": 999, "totalErrors": 198}

*mapGroupsWithState* is perfect for this but, as stated, I've not found a
way to apply a window function *and* use mapGroupsWithState.

Example:

ds.withColumn("bucket", $"when.from")

.withWatermark("bucket", "1 minutes")

.groupBy(window($"bucket", "10 minutes"), -- buckets and sums smaller
windowed events into a rolled up larger window event with summed eventCount

  $"account",

  $"verb",

  $"statusCode")

.agg(

  sum($"eventCount")

)

.map(r => Log())

.groupByKey(l => (l.when, l.account, l.verb)) -- maps

.mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout -- will
calculate totalErrors / totalRequests per bucket

   .EventTimeTimeout()) {

   case ((when: Window, account: String, verb: String),

 events: Iterator[Log],

 state: GroupState[SessionInfo]) => {

..

  }
}


Any suggestions would be greatly appreciated.

I've also noticed that *groupByKey().reduceGroups()* does not work
with *mapGroupsWithState*, which is another strategy that I've tried.

Thanks.

dan


Predicate Pushdown Doesn't Work With Data Source API

2017-08-28 Thread Anton Puzanov
Hi everyone,

I am trying to improve the performance of data loading from disk. For that 
I have implemented my own RDD and now I am trying to increase the 
performance with predicate pushdown.
I have used many sources, including the documentation and
https://www.slideshare.net/databricks/yin-huai-20150325meetupwithdemos.

What I hope to achieve is accepting the filters on `public RDD<Row>
buildScan(String[] requiredColumns, Filter[] filters)` and using them for
filtering the data loaded into the DataFrame. I have not implemented any 
Filters and from my understanding all the basic filters should be built in 
(eq, gt etc...).

The interesting part of my code:

public class MyProvider implements RelationProvider {
    @Override
    public BaseRelation createRelation(SQLContext sqlContext,
            scala.collection.immutable.Map<String, String> parameters) {
        System.out.println("createRelation");
        return new MyRelation(sqlContext, JavaConversions.mapAsJavaMap(parameters));
    }
}
public class MyRelation extends BaseRelation implements TableScan,
        PrunedScan, PrunedFilteredScan {
    @Override
    public StructType schema() {
        return ExpressionEncoder.javaBean(EventBean.class).schema();
    }

    @Override
    public RDD<Row> buildScan(String[] requiredColumns, Filter[] filters) {
        System.out.println(Arrays.toString(requiredColumns));
        System.out.println(Arrays.toString(filters));
        // ... build and return the RDD here (omitted in the original post)
    }
}

Of course, I implemented the other two versions of buildScan as well.
I use it in the following manner:

SQLContext sqc = sparkSession.sqlContext();
Dataset<Row> eventDataFrame = sqc.load("com.MyProvider", loadingOptions);
eventDataFrame.createOrReplaceTempView("events");
Dataset<Row> sqlResult = sparkSession.sql(query);
System.out.println(sqlResult.queryExecution().executedPlan().toString());

The query looks like this: "SELECT field1,field2,field3 from events
WHERE field1>3"

The original bean has 26 fields and the requiredColumns array contains only
the relevant fields, which is good. The Filters array, however, is empty
regardless of what I do.

I also tried:
SQLContext sqc = sparkSession.sqlContext();
Dataset<Row> eventDataFrame = sqc.load("com.myProvider", loadingOptions);
eventDataFrame.createOrReplaceTempView("events");
Dataset<Row> sqlResult = eventDataFrame.select("field1", "field2").filter("field1>4");

Another thing that bothers me: in the plan I don't see the Catalyst
optimization printed, so maybe it's not being optimized?
The plan looks like this:
*Filter (cast(field1#27 as int) > 4)
+- *Scan MyRelation @3a709cc7 [field1#27,field2#26] ReadSchema: 
struct
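
For reference, here is a minimal self-contained Scala sketch I put together
(hypothetical com.example.sketch package and SketchRelation class, not my
real provider) where the predicate does arrive in buildScan. My
understanding is that when the plan has to cast the column first, as in the
cast(field1#27 as int) above, the predicate is generally not translated
into a source Filter, which would explain the empty array:

package com.example.sketch

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._

// resolved via spark.read().format("com.example.sketch").load()
class DefaultSource extends RelationProvider {
  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation =
    new SketchRelation(sqlContext)
}

class SketchRelation(val sqlContext: SQLContext)
    extends BaseRelation with PrunedFilteredScan {

  // field1 is declared as an integer, so "WHERE field1 > 3" needs no cast
  override def schema: StructType = StructType(Seq(
    StructField("field1", IntegerType),
    StructField("field2", StringType)))

  override def buildScan(requiredColumns: Array[String],
                         filters: Array[Filter]): RDD[Row] = {
    // prints e.g. [GreaterThan(field1,3)] for "WHERE field1 > 3"
    println(filters.mkString("[", ", ", "]"))

    val data = Seq(Row(1, "a"), Row(5, "b"), Row(9, "c"))
    val kept = data.filter { row =>
      filters.forall {
        case GreaterThan("field1", v: Int) => row.getInt(0) > v
        case _                             => true // Spark re-applies the rest
      }
    }
    val projected = kept.map { row =>
      Row.fromSeq(requiredColumns.map {
        case "field1" => row.get(0)
        case "field2" => row.get(1)
      })
    }
    sqlContext.sparkContext.parallelize(projected)
  }
}

With field1 typed as an integer in the relation's schema, a query such as
"SELECT field1 FROM events WHERE field1 > 3" should reach buildScan with
GreaterThan(field1,3) in the filters array; if the column is a String in
the bean, the comparison gets wrapped in a cast and nothing is pushed down.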

Thank you for your time and support.
Anton P.

Time window on Processing Time

2017-08-28 Thread madhu phatak
Hi,
As I am playing with structured streaming, I observed that the window
function always requires a time column in the input data, which means it is
event time.

Is it possible to get the old Spark Streaming style window based on
processing time? I don't see any documentation on this.
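
The closest workaround I can think of is below (just a sketch; inputStream
and someKey are placeholders, and current_timestamp() is evaluated once per
micro-batch, so this only approximates the old DStream behaviour): stamp
every row with the processing-time clock and window on that column.

import org.apache.spark.sql.functions._
import spark.implicits._  // assumes a SparkSession named spark

val counts = inputStream                         // any streaming Dataset/DataFrame
  .withColumn("procTime", current_timestamp())   // wall clock when the batch is planned
  .withWatermark("procTime", "1 minute")         // lets old window state be dropped
  .groupBy(window($"procTime", "1 minute"), $"someKey")
  .count()

But I am not sure this is the recommended approach.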

-- 
Regards,
Madhukara Phatak
http://datamantra.io/


add me to email list

2017-08-28 Thread Michael Artz
Hi,
  Please add me to the email list
Mike


Re: Oracle Table not resolved [Spark 2.1.1]

2017-08-28 Thread Naga G
I'm not able to find the database name.
Is ora the database in the URL below?
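
If ora is meant to be the SID, I'd expect a thin-driver URL of roughly this
form (host and port taken from the post below):

jdbc:oracle:thin:@localhost:1521:ora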

Sent from Naga iPad

> On Aug 28, 2017, at 4:06 AM, Imran Rajjad  wrote:
> 
> Hello,
> 
> I am trying to retrieve an oracle table into Dataset using following code
> 
> String url = "jdbc:oracle@localhost:1521:ora";
>   Dataset jdbcDF = spark.read()
>   .format("jdbc")
>   .option("driver", "oracle.jdbc.driver.OracleDriver")
>   .option("url", url)
>   .option("dbtable", "INCIDENTS")
>   .option("user", "user1")
>   .option("password", "pass1")
>   .load();
>   
>   System.out.println(jdbcDF.count());
> 
> Apparently the connection is made but Table is not being detected. Any ideas 
> whats wrong with the code?
> 
> regards,
> Imran
> -- 
> I.R




Oracle Table not resolved [Spark 2.1.1]

2017-08-28 Thread Imran Rajjad
Hello,

I am trying to retrieve an Oracle table into a Dataset<Row> using the
following code

String url = "jdbc:oracle@localhost:1521:ora";
  Dataset<Row> jdbcDF = spark.read()
  .format("jdbc")
  .option("driver", "oracle.jdbc.driver.OracleDriver")
  .option("url", url)
  .option("dbtable", "INCIDENTS")
  .option("user", "user1")
  .option("password", "pass1")
  .load();

  System.out.println(jdbcDF.count());

below is the stack trace

java.lang.NullPointerException
 at
org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:72)
 at
org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:113)
 at
org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:45)
 at
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:330)
 at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
 at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125)
 at com.elogic.hazelcast_test.JDBCTest.test(JDBCTest.java:56)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
 at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
 at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
 at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
 at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
 at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
 at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
 at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
 at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
 at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
 at
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
 at
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
 at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
 at
org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
 at
org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
 at
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
 at
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:678)
 at
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
 at
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)

Apparently the connection is made but the table is not being detected. Any
ideas what's wrong with the code?

regards,
Imran
-- 
I.R