Re: How this unit test passed on master trunk?

2016-04-23 Thread Zhan Zhang
There are multiple records for the DF

scala> structDF.groupBy($"a").agg(min(struct($"record.*"))).show
+---+-----------------------------+
|  a|min(struct(unresolvedstar()))|
+---+-----------------------------+
|  1|                        [1,1]|
|  3|                        [3,1]|
|  2|                        [2,1]|
+---+-----------------------------+

The meaning of .groupBy($"a").agg(min(struct($"record.*"))) is to get the min
for all the records with the same $"a".

For example, given TestData2(1,1) :: TestData2(1,2), the result would be
(1, [1,1]), since struct(1, 1) is less than struct(1, 2). Please check how the
Ordering is implemented in InterpretedOrdering.

The output itself does not have any ordering. I am not sure why the unit test
and the real environment produce different results.
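As an aside, a minimal sketch (illustration only, not the actual test code,
and assuming the structDF defined in the spark-shell session quoted below) of
making such a check deterministic would be to sort explicitly before taking
the first row:

val grouped = structDF.groupBy($"a").agg(min(struct($"record.*")).as("m"))

// With an explicit sort, first() always returns the row for a = 1,
// i.e. [1,[1,1]] for the TestData2 rows shown in this thread.
grouped.orderBy($"a").first()

// Without the sort, which row is "first" depends on partitioning and task
// scheduling, which is why different environments can disagree.
grouped.first()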

Xiao,

I do see the difference between unit test and local cluster run. Do you know 
the reason?

Thanks.

Zhan Zhang




On Apr 22, 2016, at 11:23 AM, Yong Zhang wrote:

Hi,

I was trying to find out why this unit test can pass in Spark code.

in
https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

for this unit test:

  test("Star Expansion - CreateStruct and CreateArray") {
val structDf = testData2.select("a", "b").as("record")
// CreateStruct and CreateArray in aggregateExpressions
assert(structDf.groupBy($"a").agg(min(struct($"record.*"))).first() == 
Row(3, Row(3, 1)))
assert(structDf.groupBy($"a").agg(min(array($"record.*"))).first() == 
Row(3, Seq(3, 1)))

// CreateStruct and CreateArray in project list (unresolved alias)
assert(structDf.select(struct($"record.*")).first() == Row(Row(1, 1)))
assert(structDf.select(array($"record.*")).first().getAs[Seq[Int]](0) === 
Seq(1, 1))

// CreateStruct and CreateArray in project list (alias)
assert(structDf.select(struct($"record.*").as("a")).first() == Row(Row(1, 
1)))

assert(structDf.select(array($"record.*").as("a")).first().getAs[Seq[Int]](0) 
=== Seq(1, 1))
  }

From my understanding, the data returned in this case should be
Row(1, Row(1, 1)), as that will be the min of the struct.

In fact, if I run the spark-shell on my laptop, and I got the result I expected:


./bin/spark-shell
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.0-SNAPSHOT
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_91)
Type in expressions to have them evaluated.
Type :help for more information.

scala> case class TestData2(a: Int, b: Int)
defined class TestData2

scala> val testData2DF = sqlContext.sparkContext.parallelize(TestData2(1,1) :: 
TestData2(1,2) :: TestData2(2,1) :: TestData2(2,2) :: TestData2(3,1) :: 
TestData2(3,2) :: Nil, 2).toDF()

scala> val structDF = testData2DF.select("a","b").as("record")

scala> structDF.groupBy($"a").agg(min(struct($"record.*"))).first()
res0: org.apache.spark.sql.Row = [1,[1,1]]

scala> structDF.show
+---+---+
|  a|  b|
+---+---+
|  1|  1|
|  1|  2|
|  2|  1|
|  2|  2|
|  3|  1|
|  3|  2|
+---+---+

So from my Spark, which I built from master, I cannot get Row(3, Row(3, 1))
back in this case. Why does the unit test assert that Row(3, Row(3, 1)) should
be the first row, and why does it pass, when I cannot reproduce that in my
spark-shell? I am trying to understand how to interpret the meaning of
agg(min(struct($"record.*"))).


Thanks

Yong



Re: Spark writing to secure zone throws : UnknownCryptoProtocolVersionException

2016-04-23 Thread Ted Yu
Can you check that the DFSClient Spark uses is the same version as on the
server side ?

The client and server (NameNode) negotiate a "crypto protocol version" -
this is a forward-looking feature.
Please note:

bq. Client provided: []

Meaning client didn't provide any supported crypto protocol version.
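
A quick way to check this from spark-shell (just a sketch; the printed values
in the comments are illustrative, not taken from your environment) is to print
the Hadoop version on the Spark classpath and the jar that provides DFSClient:

import org.apache.hadoop.util.VersionInfo

// Hadoop version compiled into the client libraries Spark loaded
println(VersionInfo.getVersion)          // e.g. 2.6.1

// Jar the DFSClient class actually came from (helps spot a stale hdfs jar)
println(classOf[org.apache.hadoop.hdfs.DFSClient]
  .getProtectionDomain.getCodeSource.getLocation)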

Cheers

On Wed, Apr 20, 2016 at 3:27 AM, pierre lacave  wrote:

> Hi
>
>
> I am trying to use spark to write to a protected zone in hdfs, I am able to 
> create and list file using the hdfs client but when writing via Spark I get 
> this exception.
>
> I could not find any mention of CryptoProtocolVersion in the spark doc.
>
>
> Any idea what could have gone wrong?
>
>
> spark (1.5.0), hadoop (2.6.1)
>
>
> Thanks
>
>
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.UnknownCryptoProtocolVersionException):
>  No crypto protocol versions provided by the client are supported. Client 
> provided: [] NameNode supports: [CryptoProtocolVersion{description='Unknown', 
> version=1, unknownValue=null}, CryptoProtocolVersion{description='Encryption 
> zones', version=2, unknownValue=null}]
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.chooseProtocolVersion(FSNamesystem.java:2468)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2600)
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2520)
>   at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:579)
>   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:394)
>   at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
>   at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962)
>   at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2040)
>   at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2036)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:415)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656)
>   at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2034)
>
>   at org.apache.hadoop.ipc.Client.call(Client.java:1411)
>   at org.apache.hadoop.ipc.Client.call(Client.java:1364)
>   at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
>   at com.sun.proxy.$Proxy13.create(Unknown Source)
>   at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:264)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
>   at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>   at com.sun.proxy.$Proxy14.create(Unknown Source)
>   at 
> org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1612)
>   at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1488)
>   at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1413)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:387)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:383)
>   at 
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:383)
>   at 
> org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:327)
>   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
>   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:799)
>   at 
> org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
>   at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1104)
>   at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>   at org.apache.spark.scheduler.Task.run(Task.scala:88)
>   at 

Re: Spark SQL Transaction

2016-04-23 Thread Andrés Ivaldi
Thanks, I'll take a look to JdbcUtils

regards.

On Sat, Apr 23, 2016 at 2:57 PM, Todd Nist  wrote:

> I believe the class you are looking for is
> org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala.
>
> By default in savePartition(...) , it will do the following:
>
> if (supportsTransactions) {
>   conn.setAutoCommit(false) // Everything in the same db transaction.
> }
>
> Then at line 224, it will issue the commit:
>
> if (supportsTransactions) {
>   conn.commit()
> }
>
> HTH
> -Todd
>
> On Sat, Apr 23, 2016 at 8:57 AM, Andrés Ivaldi  wrote:
>
>> Hello, so I executed Profiler and found that implicit isolation was turned
>> on by the JDBC driver; this is the default behavior of the MSSQL JDBC
>> driver, but it's possible to change it with the setAutoCommit method. There
>> is no property for that, so I have to do it in the code. Do you know where
>> I can access the instance of the JDBC class used by Spark on DataFrames?
>>
>> Regards.
>>
>> On Thu, Apr 21, 2016 at 10:59 AM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> This statement
>>>
>>> ."..each database statement is atomic and is itself a transaction.. your
>>> statements should be atomic and there will be no ‘redo’ or ‘commit’ or
>>> ‘rollback’."
>>>
>>> MSSQL complies with ACID, which requires that each transaction be "all
>>> or nothing": if one part of the transaction fails, then the entire
>>> transaction fails, and the database state is left unchanged.
>>>
>>> Assuming that it is one transaction (though I much doubt JDBC does
>>> that, as it would take forever), then either that transaction commits (in
>>> MSSQL redo + undo are combined in syslogs table of the database) meaning
>>> there will be undo + redo log generated  for that row only in syslogs. So
>>> under normal operation every RDBMS including MSSQL, Oracle, Sybase and
>>> others will comply with generating (redo and undo) and one cannot avoid it.
>>> If there is a batch transaction as I suspect in this case, it is either all
>>> or nothing. The thread owner indicated that rollback is happening so it is
>>> consistent with all rows rolled back.
>>>
>>> I don't think Spark, Sqoop, Hive can influence the transaction behaviour
>>> of an RDBMS for DML. DQ (data queries) do not generate transactions.
>>>
>>> HTH
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 21 April 2016 at 13:58, Michael Segel 
>>> wrote:
>>>
 Hi,

 Sometimes terms get muddled over time.

 If you’re not using transactions, then each database statement is
 atomic and is itself a transaction.
 So unless you have some explicit ‘Begin Work’ at the start…. your
 statements should be atomic and there will be no ‘redo’ or ‘commit’ or
 ‘rollback’.

 I don’t see anything in Spark’s documentation about transactions, so
 the statements should be atomic.  (I’m not a guru here so I could be
 missing something in Spark)

 If you’re seeing the connection drop unexpectedly and then a rollback,
 could this be a setting or configuration of the database?


 > On Apr 19, 2016, at 1:18 PM, Andrés Ivaldi 
 wrote:
 >
 > Hello, is possible to execute a SQL write without Transaction? we
 dont need transactions to save our data and this adds an overhead to the
 SQLServer.
 >
 > Regards.
 >
 > --
 > Ing. Ivaldi Andres


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


>>>
>>
>>
>> --
>> Ing. Ivaldi Andres
>>
>
>


-- 
Ing. Ivaldi Andres


Re: filtering the correct value in Spark streaming data from Kafka

2016-04-23 Thread Mich Talebzadeh
This seems to be a solution.

Just to recall, this is streaming prices coming in, consisting of three fields
as below:

ID   timestamp Signal
5145,20160424-000321,99.54898291795853400767


// I am only interested in the third field of the comma-separated fields,
// which I call Signal, and only when it is > 98.0

So I modified the code as follows

val lines = dstream.map(_._2)
// Interested in Signal field, third in the list
val words = lines.map(_.split(',').view(2))
val windowLength = 10
val slidingInterval = 10
val countByValueAndWindow = words.filter(_ >
"98.0").countByValueAndWindow(Seconds(windowLength),
Seconds(slidingInterval))
countByValueAndWindow.print()


Time: 146145348 ms
---
(99.96499387285531164956,2)
(98.09179374443103739485,3)
(98.43271813782244629781,1)
(99.02524930541705028542,3)
(99.40824915468534696789,2)
(98.71676655968832937025,1)
(98.34124261976762310917,1)
(98.10708435174291734263,1)
(99.90995946350894201227,1)
(99.76973488989534244967,1)

Anyway, I was not aware that one can pick up a particular field in the RDD by
using view(n), where n = 0, 1, 2, etc. is the position of the field.

Let me know if this approach is sound.
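
For comparison, here is a sketch of the same filter done numerically (a
variation for illustration, not something verified in this thread): index the
split array directly and convert the Signal field to Double, so the comparison
is numeric rather than a lexicographic string comparison.

val lines = dstream.map(_._2)

// Field 2 (zero-based) of "ID,timestamp,Signal"; toDouble will throw on a
// malformed row, so a production version would guard this parse.
val signals = lines.map(_.split(',')(2).toDouble)

val highSignals = signals.filter(_ > 98.0)
  .countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
highSignals.print()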

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com



On 23 April 2016 at 19:37, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Hi,
>
> I have Spark and Kafka streaming test for CEP signal.
>
> Pretty basic set up
>
> val ssc = new StreamingContext(conf, Seconds(10))
> ssc.checkpoint("checkpoint")
> val kafkaParams = Map[String, String]("bootstrap.servers" ->
> "rhes564:9092", "schema.registry.url" -> "http://rhes564:8081;,
> "zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest" )
> val topics = Set("newtopic")
> val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder,
> StringDecoder](ssc, kafkaParams, topics)
> dstream.cache()
> val lines = dstream.map(_._2)
> val words = lines.filter(_ > "90.0").flatMap(line =>
> line.split(",")).map(word => (word, 3)).reduceByKey((x:Int, y:Int) => x+y)
> val windowLength = 10
> val slidingInterval = 10
> val countByValueAndWindow =
> words.countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
> countByValueAndWindow.print()
>
>
> This is format of data streaming in only three columns ID, Date and Signal
> separated by comma
>
> 97,20160423-182633,93.19871243745806169848
>
> So I want to pick up lines including Signal > "90.0" and discard the rest
>
> This is what I am getting from countByValueAndWindow.print()
>
> Time: 146143749 ms
> ---
> ((98,3),1)
> ((40.80441152620633003508,3),1)
> ((60.71243694664215996759,3),1)
> ((95,3),1)
> ((57.23635208501673894915,3),1)
> ((20160423-193322,27),1)
> ((46.97871858618538352181,3),1)
> ((68.92024376045110883977,3),1)
> ((96,3),1)
> ((91,3),1)
>
> I am only interested in lines where the long number is > 90, but obviously my
> selection is incorrect. How can I filter the correct value? This code line
> seems to be incorrect:
>
> val words = lines.filter(_ > "90.0").flatMap(line =>
> line.split(",")).map(word => (word, 3)).reduceByKey((x:Int, y:Int) => x+y)
>
> Any ideas appreciated
>
>
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>


Spark 2.0 Aggregator problems

2016-04-23 Thread Don Drake
I've downloaded a nightly build of Spark 2.0 (from today, 4/23) and was
attempting to create an aggregator that will produce a Seq of Rows, or more
specifically a Seq of my custom class (C1 below).

When I attempt to run the following code in a spark-shell, it errors out:

Gist: https://gist.github.com/dondrake/be6b92aff71433e9fb627b478b78b839

Code:
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder,Row}
import org.apache.spark.sql.functions._
import java.util.Calendar

case class C1(f1: String, f2: String, f3: String, f4: java.sql.Date, f5: Double)

val teams = sc.parallelize(Seq(
  C1("hash1", "NLC", "Cubs", java.sql.Date.valueOf("2016-01-23"), 3253.21),
  C1("hash1", "NLC", "Cubs", java.sql.Date.valueOf("2014-01-23"), 353.88),
  C1("hash3", "NLW", "Dodgers", java.sql.Date.valueOf("2013-08-15"), 4322.12),
  C1("hash4", "NLE", "Red Sox", java.sql.Date.valueOf("2010-03-14"), 10283.72)
)).toDS

//
https://docs.cloud.databricks.com/docs/spark/1.6/examples/Dataset%20Aggregator.html
object C1Agg extends Aggregator[C1, Seq[C1], Seq[C1]]  {
  def zero: Seq[C1] = null
  def reduce(b: Seq[C1], a: C1): Seq[C1] = b :+ a
  def merge(b1: Seq[C1], b2: Seq[C1]): Seq[C1] = b1 ++ b2
  def finish(r: Seq[C1]): Seq[C1] = r

  override def bufferEncoder: Encoder[Seq[C1]] = ExpressionEncoder()
  override def outputEncoder: Encoder[Seq[C1]] = ExpressionEncoder()
}

val g_c1 = teams.select(C1Agg.toColumn)


scala> val g_c1 = teams.select(C1Agg.toColumn)
scala.ScalaReflectionException: object $line37.$read not found.
  at
scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:162)
  at scala.reflect.internal.Mirrors$RootsBase.staticModule(Mirrors.scala:22)
  at $typecreator1$1.apply(:45)
  at
scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232)
  at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
  at
org.apache.spark.sql.SQLImplicits$$typecreator9$1.apply(SQLImplicits.scala:125)
  at
scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232)
  at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
  at
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:50)
  at
org.apache.spark.sql.SQLImplicits.newProductSeqEncoder(SQLImplicits.scala:125)
  ... 52 elided

If I tweak my teams to be a DataFrame instead of a DataSet, and leave
everything else the same, I get a different error:

scala> val g_c1 = teams.select(C1Agg.toColumn)
org.apache.spark.sql.AnalysisException: unresolved operator 'Aggregate
[(C1Agg(unknown),mode=Complete,isDistinct=false) AS
c1agg(staticinvoke(class scala.collection.mutable.WrappedArray$,
ObjectType(interface scala.collection.Seq), make,
mapobjects(lambdavariable(MapObjects_loopValue26, MapObjects_loopIsNull27,
StructField(f1,StringType,true), StructField(f2,StringType,true),
StructField(f3,StringType,true), StructField(f4,DateType,true),
StructField(f5,DoubleType,false)), if
(isnull(lambdavariable(MapObjects_loopValue26, MapObjects_loopIsNull27,
StructField(f1,StringType,true), StructField(f2,StringType,true),
StructField(f3,StringType,true), StructField(f4,DateType,true),
StructField(f5,DoubleType,false null else newInstance(class C1),
upcast(value, ArrayType(StructType(StructField(f1,StringType,true),
StructField(f2,StringType,true), StructField(f3,StringType,true),
StructField(f4,DateType,true), StructField(f5,DoubleType,false)),true), -
root class: "scala.collection.Seq")).array, true))#63];
  at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:39)
  at
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:54)
  at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:289)
  at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:51)
  at
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:125)
  at
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:51)
  at
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:54)
  at
org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:61)
  at org.apache.spark.sql.Dataset.org
$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2443)
  at org.apache.spark.sql.Dataset.select(Dataset.scala:935)
  ... 52 elided

I'm not sure how to diagnose those errors.  Thoughts?
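
(One hedged workaround sketch, an assumption on my part and untested against
this nightly build, is to supply the encoders explicitly instead of relying on
ExpressionEncoder(), e.g. a Kryo-backed encoder for the Seq buffer:)

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

object C1AggKryo extends Aggregator[C1, Seq[C1], Seq[C1]] {
  // empty Seq rather than null, so reduce/merge never see a null buffer
  def zero: Seq[C1] = Seq.empty[C1]
  def reduce(b: Seq[C1], a: C1): Seq[C1] = b :+ a
  def merge(b1: Seq[C1], b2: Seq[C1]): Seq[C1] = b1 ++ b2
  def finish(r: Seq[C1]): Seq[C1] = r

  // Kryo sidesteps the reflection-based Seq encoder, at the cost of the
  // buffer/output being stored as an opaque binary column.
  override def bufferEncoder: Encoder[Seq[C1]] = Encoders.kryo[Seq[C1]]
  override def outputEncoder: Encoder[Seq[C1]] = Encoders.kryo[Seq[C1]]
}

// val g_c1 = teams.select(C1AggKryo.toColumn)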

-Don

-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake 
800-733-2143


filtering the correct value in Spark streaming data from Kafka

2016-04-23 Thread Mich Talebzadeh
Hi,

I have Spark and Kafka streaming test for CEP signal.

Pretty basic set up

val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("checkpoint")
val kafkaParams = Map[String, String]("bootstrap.servers" ->
"rhes564:9092", "schema.registry.url" -> "http://rhes564:8081;,
"zookeeper.connect" -> "rhes564:2181", "group.id" -> "StreamTest" )
val topics = Set("newtopic")
val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder,
StringDecoder](ssc, kafkaParams, topics)
dstream.cache()
val lines = dstream.map(_._2)
val words = lines.filter(_ > "90.0").flatMap(line =>
line.split(",")).map(word => (word, 3)).reduceByKey((x:Int, y:Int) => x+y)
val windowLength = 10
val slidingInterval = 10
val countByValueAndWindow =
words.countByValueAndWindow(Seconds(windowLength), Seconds(slidingInterval))
countByValueAndWindow.print()


This is format of data streaming in only three columns ID, Date and Signal
separated by comma

97,20160423-182633,93.19871243745806169848

So I want to pick up lines including Signal > "90.0" and discard the rest

This is what I am getting from countByValueAndWindow.print()

Time: 146143749 ms
---
((98,3),1)
((40.80441152620633003508,3),1)
((60.71243694664215996759,3),1)
((95,3),1)
((57.23635208501673894915,3),1)
((20160423-193322,27),1)
((46.97871858618538352181,3),1)
((68.92024376045110883977,3),1)
((96,3),1)
((91,3),1)

I am only interested in lines where the long number is > 90, but obviously my
selection is incorrect. How can I filter the correct value? This code line
seems to be incorrect:

val words = lines.filter(_ > "90.0").flatMap(line =>
line.split(",")).map(word => (word, 3)).reduceByKey((x:Int, y:Int) => x+y)

Any ideas appreciated







Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


Re: Spark SQL Transaction

2016-04-23 Thread Todd Nist
I believe the class you are looking for is
org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala.

By default in savePartition(...) , it will do the following:

if (supportsTransactions) {
  conn.setAutoCommit(false) // Everything in the same db transaction.
}

Then at line 224, it will issue the commit:

if (supportsTransactions) {
  conn.commit()
}

HTH
-Todd
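
For reference, in spirit the transaction handling in savePartition looks
roughly like the following paraphrased sketch (simplified and not the exact
source; method and variable names here are illustrative):

def savePartitionSketch(conn: java.sql.Connection,
                        rows: Iterator[org.apache.spark.sql.Row],
                        insertSql: String,
                        supportsTransactions: Boolean): Unit = {
  if (supportsTransactions) {
    conn.setAutoCommit(false) // everything in the same db transaction
  }
  val stmt = conn.prepareStatement(insertSql)
  try {
    rows.foreach { row =>
      // ... bind the row's fields to the statement parameters here ...
      stmt.executeUpdate()
    }
  } finally {
    stmt.close()
  }
  if (supportsTransactions) {
    conn.commit() // one commit per partition
  }
}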

On Sat, Apr 23, 2016 at 8:57 AM, Andrés Ivaldi  wrote:

> Hello, so I executed Profiler and found that implicit isolation was turned
> on by the JDBC driver; this is the default behavior of the MSSQL JDBC driver,
> but it's possible to change it with the setAutoCommit method. There is no
> property for that, so I have to do it in the code. Do you know where I can
> access the instance of the JDBC class used by Spark on DataFrames?
>
> Regards.
>
> On Thu, Apr 21, 2016 at 10:59 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> This statement
>>
>> ."..each database statement is atomic and is itself a transaction.. your
>> statements should be atomic and there will be no ‘redo’ or ‘commit’ or
>> ‘rollback’."
>>
>> MSSQL complies with ACID, which requires that each transaction be "all
>> or nothing": if one part of the transaction fails, then the entire
>> transaction fails, and the database state is left unchanged.
>>
>> Assuming that it is one transaction (though I much doubt JDBC does that,
>> as it would take forever), then either that transaction commits (in MSSQL
>> redo + undo are combined in syslogs table of the database) meaning
>> there will be undo + redo log generated  for that row only in syslogs. So
>> under normal operation every RDBMS including MSSQL, Oracle, Sybase and
>> others will comply with generating (redo and undo) and one cannot avoid it.
>> If there is a batch transaction as I suspect in this case, it is either all
>> or nothing. The thread owner indicated that rollback is happening so it is
>> consistent with all rows rolled back.
>>
>> I don't think Spark, Sqoop, Hive can influence the transaction behaviour
>> of an RDBMS for DML. DQ (data queries) do not generate transactions.
>>
>> HTH
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 21 April 2016 at 13:58, Michael Segel 
>> wrote:
>>
>>> Hi,
>>>
>>> Sometimes terms get muddled over time.
>>>
>>> If you’re not using transactions, then each database statement is atomic
>>> and is itself a transaction.
>>> So unless you have some explicit ‘Begin Work’ at the start…. your
>>> statements should be atomic and there will be no ‘redo’ or ‘commit’ or
>>> ‘rollback’.
>>>
>>> I don’t see anything in Spark’s documentation about transactions, so the
>>> statements should be atomic.  (I’m not a guru here so I could be missing
>>> something in Spark)
>>>
>>> If you’re seeing the connection drop unexpectedly and then a rollback,
>>> could this be a setting or configuration of the database?
>>>
>>>
>>> > On Apr 19, 2016, at 1:18 PM, Andrés Ivaldi  wrote:
>>> >
>>> > Hello, is possible to execute a SQL write without Transaction? we dont
>>> need transactions to save our data and this adds an overhead to the
>>> SQLServer.
>>> >
>>> > Regards.
>>> >
>>> > --
>>> > Ing. Ivaldi Andres
>>>
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>
>
> --
> Ing. Ivaldi Andres
>


Re: Dataset aggregateByKey equivalent

2016-04-23 Thread Michael Armbrust
Have you looked at aggregators?

https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Dataset%20Aggregator.html
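
For instance, a rough sketch of a typed Aggregator that mirrors the
aggregateByKey logic in the quoted example below (illustrative only; I have
not run this against 1.6, and toColumn still needs encoders for List[KeyVal]
in scope):

import org.apache.spark.sql.expressions.Aggregator

case class KeyVal(k: Int, v: Int)

// Collects every KeyVal seen for a key into a List, like
// aggregateByKey(List[KeyVal]())(addToList, addLists) below.
object CollectKeyVals extends Aggregator[KeyVal, List[KeyVal], List[KeyVal]] {
  def zero: List[KeyVal] = List.empty[KeyVal]
  def reduce(b: List[KeyVal], a: KeyVal): List[KeyVal] = b :+ a
  def merge(b1: List[KeyVal], b2: List[KeyVal]): List[KeyVal] = b1 ++ b2
  def finish(r: List[KeyVal]): List[KeyVal] = r
}

// Hypothetical usage against the Dataset from the question:
// val nested = keyVals.groupBy(_.k).agg(CollectKeyVals.toColumn)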

On Fri, Apr 22, 2016 at 6:45 PM, Lee Becker  wrote:

> Is there a way to do aggregateByKey on Datasets the way one can on an RDD?
>
> Consider the following RDD code to build a set of KeyVals into a DataFrame
> containing a column with the KeyVals' keys and a column containing lists of
> KeyVals.  The end goal is to join it with collections which will be
> similarly transformed.
>
> case class KeyVal(k: Int, v: Int)
>
>
> val keyVals = sc.parallelize(for (i <- 1 to 3; j <- 4 to 6) yield KeyVal(i,j))
>
> // function for appending to list
> val addToList = (s: List[KeyVal], v: KeyVal) => s :+ v
>
> // function for merging two lists
> val addLists = (s: List[KeyVal], t: List[KeyVal]) => s++t
>
> val keyAndKeyVals = keyVals.map(kv=> (kv.k, kv))
> val keyAndNestedKeyVals = keyAndKeyVals.
>   aggregateByKey(List[KeyVal]())(addToList, addLists).
>   toDF("key", "keyvals")
> keyAndNestedKeyVals.show
>
>
> which produces:
>
> +---+--------------------+
> |key|             keyvals|
> +---+--------------------+
> |  1|[[1,4], [1,5], [1...|
> |  2|[[2,4], [2,5], [2...|
> |  3|[[3,4], [3,5], [3...|
> +---+--------------------+
>
> For a Dataset approach I tried the following to no avail:
>
> // Initialize as Dataset
> val keyVals = sc.parallelize(for (i <- 1 to 3; j <- 4 to 6) yield 
> KeyVal(i,j)).
>   toDS
>
> // Build key, keyVal mappings
> val keyValsByKey = keyVals.groupBy(kv=>(kv.k))
>
> case class NestedKeyVal(key: Int, keyvals: List[KeyVal])
>
> val convertToNested = (key: Int, keyValsIter: Iterator[KeyVal]) => 
> NestedKeyVal(key=key, keyvals=keyValsIter.toList)
>
> val keyValsNestedByKey = keyValsByKey.mapGroups((key,keyvals) => 
> convertToNested(key,keyvals))
> keyValsNestedByKey.show
>
>
> This and several other incantations using groupBy + mapGroups consistently
> give me serialization problems.  Is this because the iterator cannot be
> guaranteed across boundaries?
> Or is there some issue with what a Dataset can encode in the interim?
> What other ways might I approach this problem?
>
> Thanks,
> Lee
>
>


Re: Spark SQL Transaction

2016-04-23 Thread Mich Talebzadeh
In your JDBC connection you can do

conn.commit();

or conn.rollback()

Why don't you insert your data into a #temp table in MSSQL and from there do
one insert/select into the main table? That is how it is usually done from
ETL. In that case your main table will be protected: either it will have the
full data or no data.

Also, have you specified the max packet size in JDBC for loading into the
MSSQL table? That will improve the speed.

Try experimenting by creating a CSV-type file and using bulk load with
autocommit, say every 10,000 rows, into the MSSQL table. That will tell you if
there is any issue. Ask the DBA to provide you with the max packet size etc.
There is another limitation, which is the transaction log in the MSSQL
database getting full.
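
A sketch of that experiment from plain JDBC (just an illustration: the URL,
credentials, table and file names below are placeholders) would be to commit
manually every 10,000 rows and see where it breaks:

import java.sql.DriverManager
import scala.io.Source

// Placeholder connection details, for illustration only
val conn = DriverManager.getConnection(
  "jdbc:sqlserver://host:1433;databaseName=testdb", "user", "password")
conn.setAutoCommit(false)

val stmt = conn.prepareStatement("INSERT INTO target_table (c1, c2) VALUES (?, ?)")
try {
  var n = 0
  for (line <- Source.fromFile("data.csv").getLines()) {
    val fields = line.split(',')
    stmt.setString(1, fields(0))
    stmt.setString(2, fields(1))
    stmt.addBatch()
    n += 1
    if (n % 10000 == 0) {   // commit every 10,000 rows, as suggested above
      stmt.executeBatch()
      conn.commit()
    }
  }
  stmt.executeBatch()       // flush and commit the remainder
  conn.commit()
} finally {
  stmt.close()
  conn.close()
}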

HTH



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 23 April 2016 at 13:57, Andrés Ivaldi  wrote:

> Hello, so I executed Profiler and found that implicit isolation was turned
> on by the JDBC driver; this is the default behavior of the MSSQL JDBC driver,
> but it's possible to change it with the setAutoCommit method. There is no
> property for that, so I have to do it in the code. Do you know where I can
> access the instance of the JDBC class used by Spark on DataFrames?
>
> Regards.
>
> On Thu, Apr 21, 2016 at 10:59 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> This statement
>>
>> ."..each database statement is atomic and is itself a transaction.. your
>> statements should be atomic and there will be no ‘redo’ or ‘commit’ or
>> ‘rollback’."
>>
>> MSSQL complies with ACID, which requires that each transaction be "all
>> or nothing": if one part of the transaction fails, then the entire
>> transaction fails, and the database state is left unchanged.
>>
>> Assuming that it is one transaction (though I much doubt JDBC does that,
>> as it would take forever), then either that transaction commits (in MSSQL
>> redo + undo are combined in syslogs table of the database) meaning
>> there will be undo + redo log generated  for that row only in syslogs. So
>> under normal operation every RDBMS including MSSQL, Oracle, Sybase and
>> others will comply with generating (redo and undo) and one cannot avoid it.
>> If there is a batch transaction as I suspect in this case, it is either all
>> or nothing. The thread owner indicated that rollback is happening so it is
>> consistent with all rows rolled back.
>>
>> I don't think Spark, Sqoop, Hive can influence the transaction behaviour
>> of an RDBMS for DML. DQ (data queries) do not generate transactions.
>>
>> HTH
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 21 April 2016 at 13:58, Michael Segel 
>> wrote:
>>
>>> Hi,
>>>
>>> Sometimes terms get muddled over time.
>>>
>>> If you’re not using transactions, then each database statement is atomic
>>> and is itself a transaction.
>>> So unless you have some explicit ‘Begin Work’ at the start…. your
>>> statements should be atomic and there will be no ‘redo’ or ‘commit’ or
>>> ‘rollback’.
>>>
>>> I don’t see anything in Spark’s documentation about transactions, so the
>>> statements should be atomic.  (I’m not a guru here so I could be missing
>>> something in Spark)
>>>
>>> If you’re seeing the connection drop unexpectedly and then a rollback,
>>> could this be a setting or configuration of the database?
>>>
>>>
>>> > On Apr 19, 2016, at 1:18 PM, Andrés Ivaldi  wrote:
>>> >
>>> > Hello, is possible to execute a SQL write without Transaction? we dont
>>> need transactions to save our data and this adds an overhead to the
>>> SQLServer.
>>> >
>>> > Regards.
>>> >
>>> > --
>>> > Ing. Ivaldi Andres
>>>
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>
>
> --
> Ing. Ivaldi Andres
>


Re: Spark SQL Transaction

2016-04-23 Thread Andrés Ivaldi
Hello, so I executed Profiler and found that implicit isolation was turned on
by the JDBC driver; this is the default behavior of the MSSQL JDBC driver, but
it's possible to change it with the setAutoCommit method. There is no property
for that, so I have to do it in the code. Do you know where I can access the
instance of the JDBC class used by Spark on DataFrames?

Regards.

On Thu, Apr 21, 2016 at 10:59 AM, Mich Talebzadeh  wrote:

> This statement
>
> ."..each database statement is atomic and is itself a transaction.. your
> statements should be atomic and there will be no ‘redo’ or ‘commit’ or
> ‘rollback’."
>
> MSSQL complies with ACID, which requires that each transaction be "all
> or nothing": if one part of the transaction fails, then the entire
> transaction fails, and the database state is left unchanged.
>
> Assuming that it is one transaction (though I much doubt JDBC does that,
> as it would take forever), then either that transaction commits (in MSSQL
> redo + undo are combined in syslogs table of the database) meaning
> there will be undo + redo log generated  for that row only in syslogs. So
> under normal operation every RDBMS including MSSQL, Oracle, Sybase and
> others will comply with generating (redo and undo) and one cannot avoid it.
> If there is a batch transaction as I suspect in this case, it is either all
> or nothing. The thread owner indicated that rollback is happening so it is
> consistent with all rows rolled back.
>
> I don't think Spark, Sqoop, Hive can influence the transaction behaviour
> of an RDBMS for DML. DQ (data queries) do not generate transactions.
>
> HTH
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 21 April 2016 at 13:58, Michael Segel 
> wrote:
>
>> Hi,
>>
>> Sometimes terms get muddled over time.
>>
>> If you’re not using transactions, then each database statement is atomic
>> and is itself a transaction.
>> So unless you have some explicit ‘Begin Work’ at the start…. your
>> statements should be atomic and there will be no ‘redo’ or ‘commit’ or
>> ‘rollback’.
>>
>> I don’t see anything in Spark’s documentation about transactions, so the
>> statements should be atomic.  (I’m not a guru here so I could be missing
>> something in Spark)
>>
>> If you’re seeing the connection drop unexpectedly and then a rollback,
>> could this be a setting or configuration of the database?
>>
>>
>> > On Apr 19, 2016, at 1:18 PM, Andrés Ivaldi  wrote:
>> >
>> > Hello, is possible to execute a SQL write without Transaction? we dont
>> need transactions to save our data and this adds an overhead to the
>> SQLServer.
>> >
>> > Regards.
>> >
>> > --
>> > Ing. Ivaldi Andres
>>
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


-- 
Ing. Ivaldi Andres