Kinesis streaming misunderstanding..?

2017-01-26 Thread Graham Clark
Hi everyone - I am building a small prototype in Spark 1.6.0 (Cloudera) to
read information from Kinesis and write it to HDFS in Parquet format. The
write seems very slow and, if I understood Spark's diagnostics correctly, it
always seemed to run on the same executor, one partition after the other,
serially. So I stripped the program down to this:


val kinesisStreams = (0 until numShards).map { i =>
  KinesisUtils.createStream(streamingContext, sparkApplicationName,
    kinesisStreamName, kinesisUrl, awsRegion,
    InitialPositionInStream.LATEST,
    new Duration(streamingInterval.millis),
    StorageLevel.MEMORY_AND_DISK_SER,
    awsCredentials.accessKey, awsCredentials.secretKey)
}

val allKinesisStreams = streamingContext.union(kinesisStreams)

allKinesisStreams.foreachRDD { rdd =>
  info("total for this batch is " + rdd.count())
}

The Kinesis stream has 20 shards (overprovisioned for this small test). I
confirmed using a small boto program that data is periodically written to
all 20 of the shards. I can see that Spark has created 20 executors, one
for each Kinesis shard. It also creates one other executor, tied to a
particular worker node, and that node seems to do the RDD counting. The
streaming interval is 1 minute, during which time several shards have
received data. Each minute interval, for this particular example, the
driver prints out between 20 and 30 for the count value. I expected to see
the count operation parallelized across the cluster. I think I must just be
misunderstanding something fundamental! Can anyone point out where I'm
going wrong?
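
For reference, a sketch of a variant I could try — info() is the same logging
helper as above, and the repartition target is arbitrary — to see how many
partitions each batch RDD actually carries, and whether spreading the data out
before the action parallelizes the count:

allKinesisStreams.foreachRDD { rdd =>
  // Receiver-based streams turn the blocks stored during the batch interval
  // into partitions, so a lightly loaded stream can yield very few of them.
  info("partitions in this batch: " + rdd.getNumPartitions)
  // Redistribute before the action so the work is spread across executors.
  info("total for this batch is " + rdd.repartition(numShards).count())
}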

Yours in confusion,
Graham


Re: kafka structured streaming source refuses to read

2017-01-26 Thread Koert Kuipers
my little program prints out query.lastProgress every 10 seconds, and this
is what it shows:

{
  "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
  "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
  "name" : "wiki",
  "timestamp" : "2017-01-26T22:54:45.732Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
"getOffset" : 9,
"triggerExecution" : 10
  },
  "stateOperators" : [ ],
  "sources" : [ {
"description" : "KafkaSource[Subscribe[wikipedia]]",
"startOffset" : {
  "wikipedia" : {
"2" : 0,
"4" : 0,
"1" : 0,
"3" : 0,
"0" : 0
  }
},
"endOffset" : {
  "wikipedia" : {
"2" : 0,
"4" : 0,
"1" : 0,
"3" : 0,
"0" : 0
  }
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@
4818d2d9"
  }
}
{
  "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
  "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
  "name" : "wiki",
  "timestamp" : "2017-01-26T22:54:55.745Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
"getOffset" : 5,
"triggerExecution" : 5
  },
  "stateOperators" : [ ],
  "sources" : [ {
"description" : "KafkaSource[Subscribe[wikipedia]]",
"startOffset" : {
  "wikipedia" : {
"2" : 0,
"4" : 0,
"1" : 0,
"3" : 0,
"0" : 0
  }
},
"endOffset" : {
  "wikipedia" : {
"2" : 0,
"4" : 0,
"1" : 0,
"3" : 0,
"0" : 0
  }
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@
4818d2d9"
  }
}
{
  "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
  "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
  "name" : "wiki",
  "timestamp" : "2017-01-26T22:55:05.748Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
"getOffset" : 5,
"triggerExecution" : 5
  },
  "stateOperators" : [ ],
  "sources" : [ {
"description" : "KafkaSource[Subscribe[wikipedia]]",
"startOffset" : {
  "wikipedia" : {
"2" : 0,
"4" : 0,
"1" : 0,
"3" : 0,
"0" : 0
  }
},
"endOffset" : {
  "wikipedia" : {
"2" : 0,
"4" : 0,
"1" : 0,
"3" : 0,
"0" : 0
  }
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@
4818d2d9"
  }
}
{
  "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
  "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
  "name" : "wiki",
  "timestamp" : "2017-01-26T22:55:15.758Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
"getOffset" : 4,
"triggerExecution" : 4
  },
  "stateOperators" : [ ],
  "sources" : [ {
"description" : "KafkaSource[Subscribe[wikipedia]]",
"startOffset" : {
  "wikipedia" : {
"2" : 0,
"4" : 0,
"1" : 0,
"3" : 0,
"0" : 0
  }
},
"endOffset" : {
  "wikipedia" : {
"2" : 0,
"4" : 0,
"1" : 0,
"3" : 0,
"0" : 0
  }
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@
4818d2d9"
  }
}
{
  "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
  "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
  "name" : "wiki",
  "timestamp" : "2017-01-26T22:55:25.760Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
"getOffset" : 4,
"triggerExecution" : 4
  },
  "stateOperators" : [ ],
  "sources" : [ {
"description" : "KafkaSource[Subscribe[wikipedia]]",
"startOffset" : {
  "wikipedia" : {
"2" : 0,
"4" : 0,
"1" : 0,
"3" : 0,
"0" : 0
  }
},
"endOffset" : {
  "wikipedia" : {
"2" : 0,
"4" : 0,
"1" : 0,
"3" : 0,
"0" : 0
  }
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@
4818d2d9"
  }
}
{
  "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
  "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
  "name" : "wiki",
  "timestamp" : "2017-01-26T22:55:35.766Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
"getOffset" : 4,
"triggerExecution" : 4
  },
  "stateOperators" : [ 

Re: spark intermediate data fills up the disk

2017-01-26 Thread kanth909
Hi!

Yes, these files are for shuffle blocks; however, they need to be cleaned up as 
well, right? I had been running a streaming application for 2 days. On the third 
day my disk filled up with all the .index and .data files, and my assumption is 
that these files had been there since the start of my streaming application. I 
should have checked the timestamps before doing rm -rf. Please let me know if I am wrong.

Sent from my iPhone

> On Jan 26, 2017, at 4:24 PM, Takeshi Yamamuro  wrote:
> 
> Yea, I think so and they are the intermediate files for shuffling. Probably, 
> kant checked the configuration here 
> (http://spark.apache.org/docs/latest/spark-standalone.html) though, this is 
> not related to the issue.
> 
> // maropu
> 
>> On Fri, Jan 27, 2017 at 7:46 AM, Jacek Laskowski  wrote:
>> Hi, 
>> 
>> The files are for shuffle blocks. Where did you find the docs about them? 
>> 
>> Jacek 
>> 
>> On 25 Jan 2017 8:41 p.m., "kant kodali"  wrote:
>> oh sorry its actually in the documentation. I should just set 
>> spark.worker.cleanup.enabled = true
>> 
>> On Wed, Jan 25, 2017 at 11:30 AM, kant kodali  wrote:
>>> I have bunch of .index and .data files like that fills up my disk. I am not 
>>> sure what the fix is? I am running spark 2.0.2 in stand alone mode
>>> 
>>> Thanks!
>>> 
>>> 
>>> 
>>> 
>>> 
>> 
>> 
> 
> 
> 
> -- 
> ---
> Takeshi Yamamuro


Re: eager? in dataframe's checkpoint

2017-01-26 Thread Burak Yavuz
Hi,

One of the goals of checkpointing is to cut the RDD lineage. Otherwise you
run into StackOverflowErrors. If you eagerly checkpoint, you basically cut
the lineage there, and the next operations all depend on the checkpointed
DataFrame. If you don't checkpoint, you continue to build the lineage, and
while that lineage is being resolved, you may hit the StackOverflowError.
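
As a rough sketch (initialDF, lookupDF and the checkpoint directory are just
placeholder names), an iterative job that eagerly checkpoints every so often to
keep the lineage short looks something like this:

spark.sparkContext.setCheckpointDir("/tmp/checkpoints")

var df = initialDF
for (i <- 1 to 1000) {
  df = df.join(lookupDF, "key")       // lineage keeps growing here
  if (i % 100 == 0) {
    // Eager: materialize now and truncate the lineage; subsequent operations
    // depend on the checkpointed data rather than the whole plan so far.
    df = df.checkpoint(eager = true)
  }
}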

HTH,
Burak

On Thu, Jan 26, 2017 at 10:36 AM, Jean Georges Perrin  wrote:

> Hey Sparkers,
>
> Trying to understand the Dataframe's checkpoint (*not* in the context of
> streaming) https://spark.apache.org/docs/latest/api/
> java/org/apache/spark/sql/Dataset.html#checkpoint(boolean)
>
> What is the goal of the *eager* flag?
>
> Thanks!
>
> jg
>


Re: spark narrow vs wide dependency

2017-01-26 Thread Shushant Arora
3. Also, can mapPartitions go out of memory if I return an ArrayList of the
whole partition after processing the partition? What is the alternative to
this if it can fail?
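
For context, a sketch (in Scala; ExpensiveHelper and process() are placeholders)
of the lazy-iterator alternative I am weighing against returning a list:

rdd1.mapPartitions { iter =>
  val helper = new ExpensiveHelper()      // created once per partition
  // iter.map is lazy: records are transformed one at a time as the consumer
  // pulls them, so the whole partition is never buffered in memory.
  iter.map(record => helper.process(record))
}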

On Fri, Jan 27, 2017 at 9:32 AM, Shushant Arora 
wrote:

> Hi
>
> I have two transformations in series.
>
> rdd1 = sourcerdd.map(new Function(...)); //step1
> rdd2 = rdd1.mapPartitions(new Function(...)); //step2
>
> 1.Is map and mapPartitions narrow dependency ? Does spark optimise the dag
> and execute step 1 and step2 in single stage or there will be two stages ?
>
> Bsically I have a requirement to use a complex object in step2 which I
> don't want to instantiate for each record so I have used mapPartitons at
> step 2.
>
> 2.If I have a requirement to instantiate a complex object across all tasks
>  on same executor node also , does making object singleton is fine there ?
> Since java discourages singleton , will it be fine here to use singleton or
> is there any other better alternative ?
>
> Thanks
>


spark narrow vs wide dependency

2017-01-26 Thread Shushant Arora
Hi

I have two transformations in series.

rdd1 = sourcerdd.map(new Function(...)); //step1
rdd2 = rdd1.mapPartitions(new Function(...)); //step2

1. Are map and mapPartitions narrow dependencies? Does Spark optimise the DAG
and execute step 1 and step 2 in a single stage, or will there be two stages?

Basically, I have a requirement to use a complex object in step 2 which I
don't want to instantiate for each record, so I have used mapPartitions at
step 2.

2. If I also have a requirement to instantiate a complex object shared across
all tasks on the same executor node, is making the object a singleton fine
there? Since Java discourages singletons, will it be fine here to use a
singleton, or is there any better alternative?
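
For reference, the kind of per-executor holder I am considering, shown as a
Scala sketch — ExpensiveClient and process() are placeholders:

object SharedClient {
  // Initialized at most once per executor JVM, on first use; lazy val
  // initialization is synchronized, so concurrent tasks share one instance.
  lazy val client: ExpensiveClient = new ExpensiveClient()
}

rdd1.mapPartitions { iter =>
  iter.map(record => SharedClient.client.process(record))
}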

Thanks


kafka structured streaming source refuses to read

2017-01-26 Thread Koert Kuipers
hey,
i am just getting started with kafka + spark structured streaming. so this
is probably a pretty dumb mistake.

i wrote a little program in spark to read messages from a kafka topic and
display them in the console, using the kafka source and console sink. i run
it in spark local mode.
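
for reference, the shape of the program (a sketch, not my exact code; the broker
address and topic are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("kafka-console")
  .master("local[*]")
  .getOrCreate()

val messages = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")   // placeholder broker
  .option("subscribe", "test")                           // placeholder topic
  .load()

val query = messages.selectExpr("CAST(value AS STRING)")
  .writeStream
  .format("console")
  .start()

query.awaitTermination()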

i hooked it up to a test topic that i send messages to using the kafka
console producer, and everything works great. i type a message in the
console producer, and it pops up in my spark program. very neat!

next i point it instead to another topic, on which a kafka-connect program
is writing lots of irc messages. i can see the kafka source connect to the
topic successfully, the partitions are discovered, etc., and then... nothing.
it just stays stuck at offset 0 for all partitions. at the same time, in
another terminal, i can see messages coming in just fine using the kafka
console consumer.

i don't get it. why doesn't the kafka source want to consume from this topic
in spark structured streaming?

thanks! koert


Re: Oracle JDBC - Spark SQL - Key Not Found: Scale

2017-01-26 Thread ayan guha
Hi

I will do a little more testing and will let you know. It did not work with
INT and Number types, for sure.

While writing, everything is fine :)

On Fri, Jan 27, 2017 at 1:04 PM, Takeshi Yamamuro 
wrote:

> How about this?
> https://github.com/apache/spark/blob/master/sql/core/
> src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala#L729
> Or, how about using Double or something instead of Numeric?
>
> // maropu
>
> On Fri, Jan 27, 2017 at 10:25 AM, ayan guha  wrote:
>
>> Okay, it is working with varchar columns only. Is there any way to
>> workaround this?
>>
>> On Fri, Jan 27, 2017 at 12:22 PM, ayan guha  wrote:
>>
>>> hi
>>>
>>> I thought so too, so I created a table with INT and Varchar columns
>>>
>>> desc agtest1
>>>
>>> Name Null Type
>>>   -
>>> PID   NUMBER(38)
>>> DES   VARCHAR2(100)
>>>
>>> url="jdbc:oracle:thin:@mpimpclu1-scan:1521/DEVAIM"
>>> table = "agtest1"
>>> user = "bal"
>>> password= "bal"
>>> driver="oracle.jdbc.OracleDriver"
>>> df = sqlContext.read.jdbc(url=url,table=table,properties={"user":
>>> user,"password":password,"driver":driver})
>>>
>>>
>>> Still the issue persists.
>>>
>>> On Fri, Jan 27, 2017 at 11:19 AM, Takeshi Yamamuro <
>>> linguin@gmail.com> wrote:
>>>
 Hi,

 I think you got this error because you used `NUMERIC` types in your
 schema (https://github.com/apache/spark/blob/master/sql/core/src/ma
 in/scala/org/apache/spark/sql/jdbc/OracleDialect.scala#L32). So, IIUC
 avoiding the type is a workaround.

 // maropu


 On Fri, Jan 27, 2017 at 8:18 AM, ayan guha  wrote:

> Hi
>
> I am facing exact issue with Oracle/Exadataas mentioned here
> .
> Any idea? I could not figure out so sending to this grou hoping someone
> have see it (and solved it)
>
> Spark Version: 1.6
> pyspark command:
>
> pyspark --driver-class-path /opt/oracle/bigdatasql/bdcell-
> 12.1/jlib-bds/kvclient.jar:/opt/oracle/bigdatasql/bdcell-12.
> 1/jlib-bds/ojdbc7.jar:/opt/oracle/bigdatasql/bdcell-12.1/jli
> b-bds/ojdbc7-orig.jar:/opt/oracle/bigdatasql/bdcell-12.1/jli
> b-bds/oracle-hadoop-sql.jar:/opt/oracle/bigdatasql/bdcell-12
> .1/jlib-bds/ora-hadoop-common.jar:/opt/oracle/bigdatasql/bdc
> ell-12.1/jlib-bds/ora-hadoop-common-orig.jar:/opt/oracle/big
> datasql/bdcell-12.1/jlib-bds/orahivedp.jar:/opt/oracle/bigda
> tasql/bdcell-12.1/jlib-bds/orahivedp-orig.jar:/opt/oracle/
> bigdatasql/bdcell-12.1/jlib-bds/orai18n.jar:/opt/oracle/bigd
> atasql/bdcell-12.1/jlib-bds/orai18n-orig.jar:/opt/oracle/
> bigdatasql/bdcell-12.1/jlib-bds/oraloader.jar:/opt/oracle/
> bigdatasql/bdcell-12.1/jlib-bds/oraloader-orig.jar   --conf
> spark.jars=/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/oracl
> e-hadoop-sql.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds
> /ora-hadoop-common.jar,/opt/oracle/bigdatasql/bdcell-12.1/jl
> ib-bds/orahivedp.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib
> -bds/oraloader.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-
> bds/ojdbc7.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bd
> s/orai18n.jar/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/
> kvclient.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/
> ojdbc7.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/
> ojdbc7-orig.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bd
> s/oracle-hadoop-sql.jar,/opt/oracle/bigdatasql/bdcell-12.1/
> jlib-bds/ora-hadoop-common.jar,/opt/oracle/bigdatasql/
> bdcell-12.1/jlib-bds/ora-hadoop-common-orig.jar,/opt/oracle/
> bigdatasql/bdcell-12.1/jlib-bds/orahivedp.jar,/opt/oracle/
> bigdatasql/bdcell-12.1/jlib-bds/orahivedp-orig.jar,/opt/
> oracle/bigdatasql/bdcell-12.1/jlib-bds/orai18n.jar,/opt/
> oracle/bigdatasql/bdcell-12.1/jlib-bds/orai18n-orig.jar,/
> opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/oraloader.jar,/
> opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/oraloader-orig.jar
>
>
> Here is my code:
>
> url="jdbc:oracle:thin:@mpimpclu1-scan:1521/DEVAIM"
> table = "HIST_FORECAST_NEXT_BILL_DGTL"
> user = "bal"
> password= "bal"
> driver="oracle.jdbc.OracleDriver"
> df = sqlContext.read.jdbc(url=url,table=table,properties={"user":
> user,"password":password,"driver":driver})
>
>
> Error:
> Traceback (most recent call last):
>   File "", line 1, in 
>   File "/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p2001.2081/lib/s
> park/python/pyspark/sql/readwriter.py", line 289, in jdbc
> return self._df(self._jreader.jdbc(url, table, jprop))
>   File "/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p2001.2081/lib/s
> park/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in
> __call__
>   File 

Re: Oracle JDBC - Spark SQL - Key Not Found: Scale

2017-01-26 Thread Takeshi Yamamuro
How about this?
https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala#L729
Or, how about using Double or something instead of Numeric?

// maropu

On Fri, Jan 27, 2017 at 10:25 AM, ayan guha  wrote:

> Okay, it is working with varchar columns only. Is there any way to
> workaround this?
>
> On Fri, Jan 27, 2017 at 12:22 PM, ayan guha  wrote:
>
>> hi
>>
>> I thought so too, so I created a table with INT and Varchar columns
>>
>> desc agtest1
>>
>> Name Null Type
>>   -
>> PID   NUMBER(38)
>> DES   VARCHAR2(100)
>>
>> url="jdbc:oracle:thin:@mpimpclu1-scan:1521/DEVAIM"
>> table = "agtest1"
>> user = "bal"
>> password= "bal"
>> driver="oracle.jdbc.OracleDriver"
>> df = sqlContext.read.jdbc(url=url,table=table,properties={"user":
>> user,"password":password,"driver":driver})
>>
>>
>> Still the issue persists.
>>
>> On Fri, Jan 27, 2017 at 11:19 AM, Takeshi Yamamuro > > wrote:
>>
>>> Hi,
>>>
>>> I think you got this error because you used `NUMERIC` types in your
>>> schema (https://github.com/apache/spark/blob/master/sql/core/src/ma
>>> in/scala/org/apache/spark/sql/jdbc/OracleDialect.scala#L32). So, IIUC
>>> avoiding the type is a workaround.
>>>
>>> // maropu
>>>
>>>
>>> On Fri, Jan 27, 2017 at 8:18 AM, ayan guha  wrote:
>>>
 Hi

 I am facing exact issue with Oracle/Exadataas mentioned here
 .
 Any idea? I could not figure out so sending to this grou hoping someone
 have see it (and solved it)

 Spark Version: 1.6
 pyspark command:

 pyspark --driver-class-path /opt/oracle/bigdatasql/bdcell-
 12.1/jlib-bds/kvclient.jar:/opt/oracle/bigdatasql/bdcell-12.
 1/jlib-bds/ojdbc7.jar:/opt/oracle/bigdatasql/bdcell-12.1/jli
 b-bds/ojdbc7-orig.jar:/opt/oracle/bigdatasql/bdcell-12.1/jli
 b-bds/oracle-hadoop-sql.jar:/opt/oracle/bigdatasql/bdcell-12
 .1/jlib-bds/ora-hadoop-common.jar:/opt/oracle/bigdatasql/bdc
 ell-12.1/jlib-bds/ora-hadoop-common-orig.jar:/opt/oracle/bi
 gdatasql/bdcell-12.1/jlib-bds/orahivedp.jar:/opt/oracle/bigd
 atasql/bdcell-12.1/jlib-bds/orahivedp-orig.jar:/opt/oracle
 /bigdatasql/bdcell-12.1/jlib-bds/orai18n.jar:/opt/oracle/
 bigdatasql/bdcell-12.1/jlib-bds/orai18n-orig.jar:/opt/
 oracle/bigdatasql/bdcell-12.1/jlib-bds/oraloader.jar:/opt/
 oracle/bigdatasql/bdcell-12.1/jlib-bds/oraloader-orig.jar   --conf
 spark.jars=/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/oracl
 e-hadoop-sql.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds
 /ora-hadoop-common.jar,/opt/oracle/bigdatasql/bdcell-12.1/
 jlib-bds/orahivedp.jar,/opt/oracle/bigdatasql/bdcell-12.1/
 jlib-bds/oraloader.jar,/opt/oracle/bigdatasql/bdcell-12.1/
 jlib-bds/ojdbc7.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-
 bds/orai18n.jar/opt/oracle/bigdatasql/bdcell-12.1/jlib-
 bds/kvclient.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-
 bds/ojdbc7.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-
 bds/ojdbc7-orig.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-
 bds/oracle-hadoop-sql.jar,/opt/oracle/bigdatasql/bdcell-
 12.1/jlib-bds/ora-hadoop-common.jar,/opt/oracle/bigdata
 sql/bdcell-12.1/jlib-bds/ora-hadoop-common-orig.jar,/opt/
 oracle/bigdatasql/bdcell-12.1/jlib-bds/orahivedp.jar,/opt/
 oracle/bigdatasql/bdcell-12.1/jlib-bds/orahivedp-orig.jar,/
 opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/orai18n.jar,/
 opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/orai18n-orig.
 jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/
 oraloader.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-
 bds/oraloader-orig.jar


 Here is my code:

 url="jdbc:oracle:thin:@mpimpclu1-scan:1521/DEVAIM"
 table = "HIST_FORECAST_NEXT_BILL_DGTL"
 user = "bal"
 password= "bal"
 driver="oracle.jdbc.OracleDriver"
 df = sqlContext.read.jdbc(url=url,table=table,properties={"user":
 user,"password":password,"driver":driver})


 Error:
 Traceback (most recent call last):
   File "", line 1, in 
   File "/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p2001.2081/lib/s
 park/python/pyspark/sql/readwriter.py", line 289, in jdbc
 return self._df(self._jreader.jdbc(url, table, jprop))
   File "/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p2001.2081/lib/s
 park/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in
 __call__
   File "/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p2001.2081/lib/s
 park/python/pyspark/sql/utils.py", line 45, in deco
 return f(*a, **kw)
   File "/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p2001.2081/lib/s
 park/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in
 get_return_value
 py4j.protocol.Py4JJavaError: An error occurred while calling o40.jdbc.
 : 

Re: Oracle JDBC - Spark SQL - Key Not Found: Scale

2017-01-26 Thread ayan guha
Okay, it is working with varchar columns only. Is there any way to
work around this?

On Fri, Jan 27, 2017 at 12:22 PM, ayan guha  wrote:

> hi
>
> I thought so too, so I created a table with INT and Varchar columns
>
> desc agtest1
>
> Name Null Type
>   -
> PID   NUMBER(38)
> DES   VARCHAR2(100)
>
> url="jdbc:oracle:thin:@mpimpclu1-scan:1521/DEVAIM"
> table = "agtest1"
> user = "bal"
> password= "bal"
> driver="oracle.jdbc.OracleDriver"
> df = sqlContext.read.jdbc(url=url,table=table,properties={"user"
> :user,"password":password,"driver":driver})
>
>
> Still the issue persists.
>
> On Fri, Jan 27, 2017 at 11:19 AM, Takeshi Yamamuro 
> wrote:
>
>> Hi,
>>
>> I think you got this error because you used `NUMERIC` types in your
>> schema (https://github.com/apache/spark/blob/master/sql/core/src/
>> main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala#L32). So, IIUC
>> avoiding the type is a workaround.
>>
>> // maropu
>>
>>
>> On Fri, Jan 27, 2017 at 8:18 AM, ayan guha  wrote:
>>
>>> Hi
>>>
>>> I am facing exact issue with Oracle/Exadataas mentioned here
>>> .
>>> Any idea? I could not figure out so sending to this grou hoping someone
>>> have see it (and solved it)
>>>
>>> Spark Version: 1.6
>>> pyspark command:
>>>
>>> pyspark --driver-class-path /opt/oracle/bigdatasql/bdcell-
>>> 12.1/jlib-bds/kvclient.jar:/opt/oracle/bigdatasql/bdcell-12.
>>> 1/jlib-bds/ojdbc7.jar:/opt/oracle/bigdatasql/bdcell-12.1/jli
>>> b-bds/ojdbc7-orig.jar:/opt/oracle/bigdatasql/bdcell-12.1/jli
>>> b-bds/oracle-hadoop-sql.jar:/opt/oracle/bigdatasql/bdcell-12
>>> .1/jlib-bds/ora-hadoop-common.jar:/opt/oracle/bigdatasql/
>>> bdcell-12.1/jlib-bds/ora-hadoop-common-orig.jar:/opt/oracle/
>>> bigdatasql/bdcell-12.1/jlib-bds/orahivedp.jar:/opt/oracle/
>>> bigdatasql/bdcell-12.1/jlib-bds/orahivedp-orig.jar:/opt/
>>> oracle/bigdatasql/bdcell-12.1/jlib-bds/orai18n.jar:/opt/
>>> oracle/bigdatasql/bdcell-12.1/jlib-bds/orai18n-orig.jar:/
>>> opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/oraloader.jar:/
>>> opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/oraloader-orig.jar   --conf
>>> spark.jars=/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/oracl
>>> e-hadoop-sql.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-
>>> bds/ora-hadoop-common.jar,/opt/oracle/bigdatasql/bdcell-
>>> 12.1/jlib-bds/orahivedp.jar,/opt/oracle/bigdatasql/bdcell-
>>> 12.1/jlib-bds/oraloader.jar,/opt/oracle/bigdatasql/bdcell-
>>> 12.1/jlib-bds/ojdbc7.jar,/opt/oracle/bigdatasql/bdcell-12.1/
>>> jlib-bds/orai18n.jar/opt/oracle/bigdatasql/bdcell-12.1/
>>> jlib-bds/kvclient.jar,/opt/oracle/bigdatasql/bdcell-12.1/
>>> jlib-bds/ojdbc7.jar,/opt/oracle/bigdatasql/bdcell-12.1/
>>> jlib-bds/ojdbc7-orig.jar,/opt/oracle/bigdatasql/bdcell-12.1/
>>> jlib-bds/oracle-hadoop-sql.jar,/opt/oracle/bigdatasql/
>>> bdcell-12.1/jlib-bds/ora-hadoop-common.jar,/opt/oracle/
>>> bigdatasql/bdcell-12.1/jlib-bds/ora-hadoop-common-orig.
>>> jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/
>>> orahivedp.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-
>>> bds/orahivedp-orig.jar,/opt/oracle/bigdatasql/bdcell-12.1/
>>> jlib-bds/orai18n.jar,/opt/oracle/bigdatasql/bdcell-12.1/
>>> jlib-bds/orai18n-orig.jar,/opt/oracle/bigdatasql/bdcell-
>>> 12.1/jlib-bds/oraloader.jar,/opt/oracle/bigdatasql/bdcell-
>>> 12.1/jlib-bds/oraloader-orig.jar
>>>
>>>
>>> Here is my code:
>>>
>>> url="jdbc:oracle:thin:@mpimpclu1-scan:1521/DEVAIM"
>>> table = "HIST_FORECAST_NEXT_BILL_DGTL"
>>> user = "bal"
>>> password= "bal"
>>> driver="oracle.jdbc.OracleDriver"
>>> df = sqlContext.read.jdbc(url=url,table=table,properties={"user":
>>> user,"password":password,"driver":driver})
>>>
>>>
>>> Error:
>>> Traceback (most recent call last):
>>>   File "", line 1, in 
>>>   File "/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p2001.2081/lib/s
>>> park/python/pyspark/sql/readwriter.py", line 289, in jdbc
>>> return self._df(self._jreader.jdbc(url, table, jprop))
>>>   File "/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p2001.2081/lib/s
>>> park/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in
>>> __call__
>>>   File "/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p2001.2081/lib/s
>>> park/python/pyspark/sql/utils.py", line 45, in deco
>>> return f(*a, **kw)
>>>   File "/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p2001.2081/lib/s
>>> park/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in
>>> get_return_value
>>> py4j.protocol.Py4JJavaError: An error occurred while calling o40.jdbc.
>>> : java.util.NoSuchElementException: key not found: scale
>>> at scala.collection.MapLike$class.default(MapLike.scala:228)
>>> at scala.collection.AbstractMap.default(Map.scala:58)
>>> at scala.collection.MapLike$class.apply(MapLike.scala:141)
>>> at scala.collection.AbstractMap.apply(Map.scala:58)
>>> at 

Re: Oracle JDBC - Spark SQL - Key Not Found: Scale

2017-01-26 Thread ayan guha
hi

I thought so too, so I created a table with INT and Varchar columns

desc agtest1

Name  Null  Type
----  ----  -------------
PID         NUMBER(38)
DES         VARCHAR2(100)

url="jdbc:oracle:thin:@mpimpclu1-scan:1521/DEVAIM"
table = "agtest1"
user = "bal"
password= "bal"
driver="oracle.jdbc.OracleDriver"
df =
sqlContext.read.jdbc(url=url,table=table,properties={"user":user,"password":password,"driver":driver})


Still the issue persists.

On Fri, Jan 27, 2017 at 11:19 AM, Takeshi Yamamuro 
wrote:

> Hi,
>
> I think you got this error because you used `NUMERIC` types in your schema
> (https://github.com/apache/spark/blob/master/sql/core/
> src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala#L32). So,
> IIUC avoiding the type is a workaround.
>
> // maropu
>
>
> On Fri, Jan 27, 2017 at 8:18 AM, ayan guha  wrote:
>
>> Hi
>>
>> I am facing exact issue with Oracle/Exadataas mentioned here
>> .
>> Any idea? I could not figure out so sending to this grou hoping someone
>> have see it (and solved it)
>>
>> Spark Version: 1.6
>> pyspark command:
>>
>> pyspark --driver-class-path /opt/oracle/bigdatasql/bdcell-
>> 12.1/jlib-bds/kvclient.jar:/opt/oracle/bigdatasql/bdcell-12.
>> 1/jlib-bds/ojdbc7.jar:/opt/oracle/bigdatasql/bdcell-12.1/jli
>> b-bds/ojdbc7-orig.jar:/opt/oracle/bigdatasql/bdcell-12.1/jli
>> b-bds/oracle-hadoop-sql.jar:/opt/oracle/bigdatasql/bdcell-
>> 12.1/jlib-bds/ora-hadoop-common.jar:/opt/oracle/bigdata
>> sql/bdcell-12.1/jlib-bds/ora-hadoop-common-orig.jar:/opt/
>> oracle/bigdatasql/bdcell-12.1/jlib-bds/orahivedp.jar:/opt/
>> oracle/bigdatasql/bdcell-12.1/jlib-bds/orahivedp-orig.jar:/
>> opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/orai18n.jar:/
>> opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/orai18n-orig.
>> jar:/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/
>> oraloader.jar:/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/oraloader-orig.jar
>>   --conf spark.jars=/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/
>> oracle-hadoop-sql.jar,/opt/oracle/bigdatasql/bdcell-12.1/
>> jlib-bds/ora-hadoop-common.jar,/opt/oracle/bigdatasql/
>> bdcell-12.1/jlib-bds/orahivedp.jar,/opt/oracle/bigd
>> atasql/bdcell-12.1/jlib-bds/oraloader.jar,/opt/oracle/bigd
>> atasql/bdcell-12.1/jlib-bds/ojdbc7.jar,/opt/oracle/bigdata
>> sql/bdcell-12.1/jlib-bds/orai18n.jar/opt/oracle/bigdata
>> sql/bdcell-12.1/jlib-bds/kvclient.jar,/opt/oracle/bigda
>> tasql/bdcell-12.1/jlib-bds/ojdbc7.jar,/opt/oracle/bigdata
>> sql/bdcell-12.1/jlib-bds/ojdbc7-orig.jar,/opt/oracle/
>> bigdatasql/bdcell-12.1/jlib-bds/oracle-hadoop-sql.jar,/
>> opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/ora-hadoop-
>> common.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/ora-
>> hadoop-common-orig.jar,/opt/oracle/bigdatasql/bdcell-12.1/
>> jlib-bds/orahivedp.jar,/opt/oracle/bigdatasql/bdcell-12.1/
>> jlib-bds/orahivedp-orig.jar,/opt/oracle/bigdatasql/bdcell-
>> 12.1/jlib-bds/orai18n.jar,/opt/oracle/bigdatasql/bdcell-
>> 12.1/jlib-bds/orai18n-orig.jar,/opt/oracle/bigdatasql/
>> bdcell-12.1/jlib-bds/oraloader.jar,/opt/oracle/
>> bigdatasql/bdcell-12.1/jlib-bds/oraloader-orig.jar
>>
>>
>> Here is my code:
>>
>> url="jdbc:oracle:thin:@mpimpclu1-scan:1521/DEVAIM"
>> table = "HIST_FORECAST_NEXT_BILL_DGTL"
>> user = "bal"
>> password= "bal"
>> driver="oracle.jdbc.OracleDriver"
>> df = sqlContext.read.jdbc(url=url,table=table,properties={"user":
>> user,"password":password,"driver":driver})
>>
>>
>> Error:
>> Traceback (most recent call last):
>>   File "", line 1, in 
>>   File "/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p2001.2081/lib/s
>> park/python/pyspark/sql/readwriter.py", line 289, in jdbc
>> return self._df(self._jreader.jdbc(url, table, jprop))
>>   File "/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p2001.2081/lib/s
>> park/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in
>> __call__
>>   File "/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p2001.2081/lib/s
>> park/python/pyspark/sql/utils.py", line 45, in deco
>> return f(*a, **kw)
>>   File "/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p2001.2081/lib/s
>> park/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in
>> get_return_value
>> py4j.protocol.Py4JJavaError: An error occurred while calling o40.jdbc.
>> : java.util.NoSuchElementException: key not found: scale
>> at scala.collection.MapLike$class.default(MapLike.scala:228)
>> at scala.collection.AbstractMap.default(Map.scala:58)
>> at scala.collection.MapLike$class.apply(MapLike.scala:141)
>> at scala.collection.AbstractMap.apply(Map.scala:58)
>> at org.apache.spark.sql.types.Metadata.get(Metadata.scala:108)
>> at org.apache.spark.sql.types.Metadata.getLong(Metadata.scala:51)
>> at org.apache.spark.sql.jdbc.OracleDialect$.getCatalystType(
>> OracleDialect.scala:33)
>> at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.
>> resolveTable(JDBCRDD.scala:140)

Issue creating row with java.util.Map type

2017-01-26 Thread Ankur Srivastava
Hi,

I am trying to map a Dataset whose rows have a map attribute. When I try to
create a Row with the map attribute I get cast errors. I am able to
reproduce the issue with the sample code below. The surprising thing is that
with the same schema I am able to create a Dataset from the list of rows.

I am on Spark 2.0 and Scala 2.11.

public static void main(String[] args) {
    StructType schema = new StructType().add("src", DataTypes.StringType)
            .add("dst", DataTypes.StringType)
            .add("freq", DataTypes.createMapType(DataTypes.StringType,
                    DataTypes.IntegerType));
    List<Row> inputData = new ArrayList<>();
    inputData.add(RowFactory.create("1", "2", new HashMap<>()));
    SparkSession sparkSession = SparkSession
            .builder()
            .appName("IPCountFilterTest")
            .master("local")
            .getOrCreate();

    Dataset<Row> out = sparkSession.createDataFrame(inputData, schema);
    out.show();

    Encoder<Row> rowEncoder = RowEncoder.apply(schema);
    out.map((MapFunction<Row, Row>) row -> {
        Row newRow = RowFactory.create(row.getString(0),
                row.getString(1), new HashMap<>());

        //Row newRow = RowFactory.create(row.getString(0),
        //        row.getString(1), row.getJavaMap(2));

        return newRow;
    }, rowEncoder).show();
}

Below is the error:

17/01/26 17:05:30 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.RuntimeException: java.util.HashMap is not a valid external type
for schema of map<string,int>
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)
at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/01/26 17:05:30 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
localhost): java.lang.RuntimeException: java.util.HashMap is not a valid
external type for schema of map<string,int>
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)
at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


Thanks
Ankur


Re: spark intermediate data fills up the disk

2017-01-26 Thread Takeshi Yamamuro
Yea, I think so; they are the intermediate files for shuffling.
Kant probably checked the configuration here
(http://spark.apache.org/docs/latest/spark-standalone.html), though this is
not related to the issue.

// maropu

On Fri, Jan 27, 2017 at 7:46 AM, Jacek Laskowski  wrote:

> Hi,
>
> The files are for shuffle blocks. Where did you find the docs about them?
>
> Jacek
>
> On 25 Jan 2017 8:41 p.m., "kant kodali"  wrote:
>
> oh sorry its actually in the documentation. I should just
> set spark.worker.cleanup.enabled = true
>
> On Wed, Jan 25, 2017 at 11:30 AM, kant kodali  wrote:
>
>> I have bunch of .index and .data files like that fills up my disk. I am
>> not sure what the fix is? I am running spark 2.0.2 in stand alone mode
>>
>> Thanks!
>>
>>
>>
>>
>
>


-- 
---
Takeshi Yamamuro


Re: Oracle JDBC - Spark SQL - Key Not Found: Scale

2017-01-26 Thread Takeshi Yamamuro
Hi,

I think you got this error because you used `NUMERIC` types in your schema (
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala#L32).
So, IIUC avoiding the type is a workaround.
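
One way to avoid the type on the Spark side (an untested sketch, shown in Scala;
SOME_NUMBER_COL, OTHER_COL, the CAST target and the subquery alias are all
placeholders for your actual columns) is to push a CAST down to Oracle by
passing a subquery as the table:

import java.util.Properties

val props = new Properties()
props.setProperty("user", "bal")
props.setProperty("password", "bal")
props.setProperty("driver", "oracle.jdbc.OracleDriver")

// Spark only sees VARCHAR2 columns, so the NUMBER/NUMERIC mapping is never hit.
val df = sqlContext.read.jdbc(
  "jdbc:oracle:thin:@mpimpclu1-scan:1521/DEVAIM",
  "(SELECT CAST(SOME_NUMBER_COL AS VARCHAR2(40)) AS SOME_NUMBER_COL, OTHER_COL FROM HIST_FORECAST_NEXT_BILL_DGTL) t",
  props)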

// maropu


On Fri, Jan 27, 2017 at 8:18 AM, ayan guha  wrote:

> Hi
>
> I am facing exact issue with Oracle/Exadataas mentioned here
> .
> Any idea? I could not figure out so sending to this grou hoping someone
> have see it (and solved it)
>
> Spark Version: 1.6
> pyspark command:
>
> pyspark --driver-class-path /opt/oracle/bigdatasql/bdcell-
> 12.1/jlib-bds/kvclient.jar:/opt/oracle/bigdatasql/bdcell-
> 12.1/jlib-bds/ojdbc7.jar:/opt/oracle/bigdatasql/bdcell-12.1/
> jlib-bds/ojdbc7-orig.jar:/opt/oracle/bigdatasql/bdcell-12.1/
> jlib-bds/oracle-hadoop-sql.jar:/opt/oracle/bigdatasql/
> bdcell-12.1/jlib-bds/ora-hadoop-common.jar:/opt/oracle/
> bigdatasql/bdcell-12.1/jlib-bds/ora-hadoop-common-orig.
> jar:/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/orahivedp.jar:/opt/oracle/
> bigdatasql/bdcell-12.1/jlib-bds/orahivedp-orig.jar:/opt/
> oracle/bigdatasql/bdcell-12.1/jlib-bds/orai18n.jar:/opt/
> oracle/bigdatasql/bdcell-12.1/jlib-bds/orai18n-orig.jar:/
> opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/oraloader.jar:/
> opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/oraloader-orig.jar   --conf
> spark.jars=/opt/oracle/bigdatasql/bdcell-12.1/jlib-
> bds/oracle-hadoop-sql.jar,/opt/oracle/bigdatasql/bdcell-
> 12.1/jlib-bds/ora-hadoop-common.jar,/opt/oracle/
> bigdatasql/bdcell-12.1/jlib-bds/orahivedp.jar,/opt/oracle/
> bigdatasql/bdcell-12.1/jlib-bds/oraloader.jar,/opt/oracle/
> bigdatasql/bdcell-12.1/jlib-bds/ojdbc7.jar,/opt/oracle/
> bigdatasql/bdcell-12.1/jlib-bds/orai18n.jar/opt/oracle/
> bigdatasql/bdcell-12.1/jlib-bds/kvclient.jar,/opt/oracle/
> bigdatasql/bdcell-12.1/jlib-bds/ojdbc7.jar,/opt/oracle/
> bigdatasql/bdcell-12.1/jlib-bds/ojdbc7-orig.jar,/opt/
> oracle/bigdatasql/bdcell-12.1/jlib-bds/oracle-hadoop-sql.
> jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/ora-
> hadoop-common.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-
> bds/ora-hadoop-common-orig.jar,/opt/oracle/bigdatasql/
> bdcell-12.1/jlib-bds/orahivedp.jar,/opt/oracle/
> bigdatasql/bdcell-12.1/jlib-bds/orahivedp-orig.jar,/opt/
> oracle/bigdatasql/bdcell-12.1/jlib-bds/orai18n.jar,/opt/
> oracle/bigdatasql/bdcell-12.1/jlib-bds/orai18n-orig.jar,/
> opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/oraloader.jar,/
> opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/oraloader-orig.jar
>
>
> Here is my code:
>
> url="jdbc:oracle:thin:@mpimpclu1-scan:1521/DEVAIM"
> table = "HIST_FORECAST_NEXT_BILL_DGTL"
> user = "bal"
> password= "bal"
> driver="oracle.jdbc.OracleDriver"
> df = sqlContext.read.jdbc(url=url,table=table,properties={"user"
> :user,"password":password,"driver":driver})
>
>
> Error:
> Traceback (most recent call last):
>   File "", line 1, in 
>   File "/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p2001.2081/lib/
> spark/python/pyspark/sql/readwriter.py", line 289, in jdbc
> return self._df(self._jreader.jdbc(url, table, jprop))
>   File "/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p2001.2081/lib/
> spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in
> __call__
>   File "/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p2001.2081/lib/
> spark/python/pyspark/sql/utils.py", line 45, in deco
> return f(*a, **kw)
>   File "/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p2001.2081/lib/
> spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in
> get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o40.jdbc.
> : java.util.NoSuchElementException: key not found: scale
> at scala.collection.MapLike$class.default(MapLike.scala:228)
> at scala.collection.AbstractMap.default(Map.scala:58)
> at scala.collection.MapLike$class.apply(MapLike.scala:141)
> at scala.collection.AbstractMap.apply(Map.scala:58)
> at org.apache.spark.sql.types.Metadata.get(Metadata.scala:108)
> at org.apache.spark.sql.types.Metadata.getLong(Metadata.scala:51)
> at org.apache.spark.sql.jdbc.OracleDialect$.
> getCatalystType(OracleDialect.scala:33)
> at org.apache.spark.sql.execution.datasources.jdbc.
> JDBCRDD$.resolveTable(JDBCRDD.scala:140)
> at org.apache.spark.sql.execution.datasources.jdbc.
> JDBCRelation.(JDBCRelation.scala:91)
> at org.apache.spark.sql.DataFrameReader.jdbc(
> DataFrameReader.scala:222)
> at org.apache.spark.sql.DataFrameReader.jdbc(
> DataFrameReader.scala:146)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 

structured streaming 2.1.0 kafka driver --packages 'org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0' works on YARN but having trouble on standalone cluster mode

2017-01-26 Thread Heji Kim
Hello everyone,

Currently we are testing structured streaming Kafka drivers. We submit on
YARN (2.7.3) with --packages
'org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0' without problems.
However, when we try to launch on Spark standalone with deploy mode=cluster,
we get the "ClassNotFoundException: Failed to find data source: kafka" error,
even though the launch command has added the Kafka jars to -Dspark.jars
(see below) and the subsequent log further states that these jars have been
successfully added.

All 10 jars exist in /home/spark/.ivy2 on all nodes. I manually checked that
the KafkaSourceProvider class does exist in
org.apache.spark_spark-sql-kafka-0-10_2.11-2.1.0.jar.
I further confirmed there are no issues with the jars by launching the
driver on YARN without the --packages option and manually adding all 10
jars with the --jars option.
The nodes run Scala 2.11.8.

Any insight appreciated.

Thanking you in advance!
Heji

1) The automatically added jars by spark-submit:

-Dspark.jars=file:/home/spark/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.1.0.jar,file:/home/spark/.ivy2/jars/org.apache.kafka_kafka-clients-0.10.0.1.jar,file:/home/spark/.ivy2/jars/org.apache.spark_spark-tags_2.11-2.1.0.jar,file:/home/spark/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar,file:/home/spark/.ivy2/jars/net.jpountz.lz4_lz4-1.3.0.jar,file:/home/spark/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.2.6.jar,file:/home/spark/.ivy2/jars/org.slf4j_slf4j-api-1.7.16.jar,file:/home/spark/.ivy2/jars/org.scalatest_scalatest_2.11-2.2.6.jar,file:/home/spark/.ivy2/jars/org.scala-lang_scala-reflect-2.11.8.jar,file:/home/spark/.ivy2/jars/org.scala-lang.modules_scala-xml_2.11-1.0.2.jar


Spark info messages which appears to have loaded these jars:

17/01/26 21:57:24 INFO SparkContext: Added JAR
file:/home/spark/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.1.0.jar
at 
spark://10.102.22.23:50513/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.1.0.jar
with timestamp 1485467844922
17/01/26 21:57:24 INFO SparkContext: Added JAR
file:/home/spark/.ivy2/jars/org.apache.kafka_kafka-clients-0.10.0.1.jar
at spark://10.102.22.23:50513/jars/org.apache.kafka_kafka-clients-0.10.0.1.jar
with timestamp 1485467844923
17/01/26 21:57:24 INFO SparkContext: Added JAR
file:/home/spark/.ivy2/jars/org.apache.spark_spark-tags_2.11-2.1.0.jar
at spark://10.102.22.23:50513/jars/org.apache.spark_spark-tags_2.11-2.1.0.jar
with timestamp 1485467844923
17/01/26 21:57:24 INFO SparkContext: Added JAR
file:/home/spark/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar
at spark://10.102.22.23:50513/jars/org.spark-project.spark_unused-1.0.0.jar
with timestamp 1485467844923
17/01/26 21:57:24 INFO SparkContext: Added JAR
file:/home/spark/.ivy2/jars/net.jpountz.lz4_lz4-1.3.0.jar at
spark://10.102.22.23:50513/jars/net.jpountz.lz4_lz4-1.3.0.jar with
timestamp 1485467844923
17/01/26 21:57:24 INFO SparkContext: Added JAR
file:/home/spark/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.2.6.jar
at spark://10.102.22.23:50513/jars/org.xerial.snappy_snappy-java-1.1.2.6.jar
with timestamp 1485467844923
17/01/26 21:57:24 INFO SparkContext: Added JAR
file:/home/spark/.ivy2/jars/org.slf4j_slf4j-api-1.7.16.jar at
spark://10.102.22.23:50513/jars/org.slf4j_slf4j-api-1.7.16.jar with
timestamp 1485467844923
17/01/26 21:57:24 INFO SparkContext: Added JAR
file:/home/spark/.ivy2/jars/org.scalatest_scalatest_2.11-2.2.6.jar at
spark://10.102.22.23:50513/jars/org.scalatest_scalatest_2.11-2.2.6.jar
with timestamp 1485467844923
17/01/26 21:57:24 INFO SparkContext: Added JAR
file:/home/spark/.ivy2/jars/org.scala-lang_scala-reflect-2.11.8.jar at
spark://10.102.22.23:50513/jars/org.scala-lang_scala-reflect-2.11.8.jar
with timestamp 1485467844924
17/01/26 21:57:24 INFO SparkContext: Added JAR
file:/home/spark/.ivy2/jars/org.scala-lang.modules_scala-xml_2.11-1.0.2.jar
at 
spark://10.102.22.23:50513/jars/org.scala-lang.modules_scala-xml_2.11-1.0.2.jar
with timestamp 1485467844924


The error message:

Caused by: java.lang.ClassNotFoundException: Failed to find data
source: kafka. Please find packages at
http://spark.apache.org/third-party-projects.html
at 
org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:569)
at 
org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:86)
at 
org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:86)
at 
org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:197)
at 
org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
at 
org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
at 
org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
at 
org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:124)
at 

Re: How to reduce number of tasks and partitions in Spark job?

2017-01-26 Thread Jacek Laskowski
Repartition
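
For example (df and the paths are placeholders; assumes the Spark 2.x DataFrame
API):

// One output file: coalesce(1) avoids a full shuffle but funnels everything
// through a single task, so it only makes sense for small-ish results.
df.coalesce(1).write.csv("/path/to/single-file-output")

// Or pick a smaller, still-parallel number of partitions with a shuffle:
df.repartition(8).write.parquet("/path/to/output")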

Jacek

On 26 Jan 2017 6:13 p.m., "Md. Rezaul Karim" <
rezaul.ka...@insight-centre.org> wrote:

> Hi All,
>
> When I run a Spark job on my local machine (having 8 cores and 16GB of
> RAM) on an input data of 6.5GB, it creates 193 parallel tasks and put
> the output into 193 partitions.
>
> How can I change the number of tasks and consequently, the number of
> output files - say to just one or less?
>
>
>
>
>
> Regards,
> _
> *Md. Rezaul Karim*, BSc, MSc
> PhD Researcher, INSIGHT Centre for Data Analytics
> National University of Ireland, Galway
> IDA Business Park, Dangan, Galway, Ireland
> Web: http://www.reza-analytics.eu/index.html
> 
>


Oracle JDBC - Spark SQL - Key Not Found: Scale

2017-01-26 Thread ayan guha
Hi

I am facing the exact issue with Oracle/Exadata as mentioned here .
Any idea? I could not figure it out, so I am sending it to this group hoping
someone has seen it (and solved it).
have see it (and solved it)

Spark Version: 1.6
pyspark command:

pyspark --driver-class-path
/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/kvclient.jar:/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/ojdbc7.jar:/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/ojdbc7-orig.jar:/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/oracle-hadoop-sql.jar:/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/ora-hadoop-common.jar:/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/ora-hadoop-common-orig.jar:/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/orahivedp.jar:/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/orahivedp-orig.jar:/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/orai18n.jar:/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/orai18n-orig.jar:/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/oraloader.jar:/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/oraloader-orig.jar
  --conf
spark.jars=/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/oracle-hadoop-sql.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/ora-hadoop-common.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/orahivedp.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/oraloader.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/ojdbc7.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/orai18n.jar/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/kvclient.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/ojdbc7.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/ojdbc7-orig.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/oracle-hadoop-sql.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/ora-hadoop-common.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/ora-hadoop-common-orig.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/orahivedp.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/orahivedp-orig.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/orai18n.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/orai18n-orig.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/oraloader.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/oraloader-orig.jar


Here is my code:

url="jdbc:oracle:thin:@mpimpclu1-scan:1521/DEVAIM"
table = "HIST_FORECAST_NEXT_BILL_DGTL"
user = "bal"
password= "bal"
driver="oracle.jdbc.OracleDriver"
df =
sqlContext.read.jdbc(url=url,table=table,properties={"user":user,"password":password,"driver":driver})


Error:
Traceback (most recent call last):
  File "", line 1, in 
  File
"/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p2001.2081/lib/spark/python/pyspark/sql/readwriter.py",
line 289, in jdbc
return self._df(self._jreader.jdbc(url, table, jprop))
  File
"/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p2001.2081/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py",
line 813, in __call__
  File
"/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p2001.2081/lib/spark/python/pyspark/sql/utils.py",
line 45, in deco
return f(*a, **kw)
  File
"/opt/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p2001.2081/lib/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py",
line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o40.jdbc.
: java.util.NoSuchElementException: key not found: scale
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:58)
at org.apache.spark.sql.types.Metadata.get(Metadata.scala:108)
at org.apache.spark.sql.types.Metadata.getLong(Metadata.scala:51)
at
org.apache.spark.sql.jdbc.OracleDialect$.getCatalystType(OracleDialect.scala:33)
at
org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:140)
at
org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.(JDBCRelation.scala:91)
at
org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:222)
at
org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:146)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)



-- 
Best Regards,
Ayan Guha


Re: Cached table details

2017-01-26 Thread Jacek Laskowski
Hi,

I think that the only way to get the information about a cached RDD is to
use SparkListener and intercept respective events about cached blocks on
BlockManagers.
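
A rough, untested sketch of that approach (assumes the Spark 2.x listener API;
spark is a placeholder for your session, and the RDDBlockId match is only
illustrative):

import org.apache.spark.scheduler.{SparkListener, SparkListenerBlockUpdated}
import org.apache.spark.storage.RDDBlockId

spark.sparkContext.addSparkListener(new SparkListener {
  override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
    val info = event.blockUpdatedInfo
    info.blockId match {
      case RDDBlockId(rddId, splitIndex) =>
        // Fires whenever a block is cached, updated or evicted on a BlockManager.
        println(s"RDD $rddId partition $splitIndex -> ${info.storageLevel}, " +
          s"${info.memSize} bytes in memory, ${info.diskSize} bytes on disk")
      case _ => // ignore shuffle, broadcast and stream blocks
    }
  }
})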

Jacek

On 25 Jan 2017 5:54 a.m., "kumar r"  wrote:

Hi,

I have cached some table in Spark Thrift Server. I want to get all cached
table information. I can see it in 4040 web ui port.

Is there any command or other way to get the cached table details
programmatically?

Thanks,
Kumar


Re: spark intermediate data fills up the disk

2017-01-26 Thread Jacek Laskowski
Hi,

The files are for shuffle blocks. Where did you find the docs about them?

Jacek

On 25 Jan 2017 8:41 p.m., "kant kodali"  wrote:

oh sorry its actually in the documentation. I should just
set spark.worker.cleanup.enabled = true

On Wed, Jan 25, 2017 at 11:30 AM, kant kodali  wrote:

> I have bunch of .index and .data files like that fills up my disk. I am
> not sure what the fix is? I am running spark 2.0.2 in stand alone mode
>
> Thanks!
>
>
>
>


Re: json_tuple fails to parse string with emoji

2017-01-26 Thread Andrew Ehrlich
It looks like I'm hitting this bug in jackson-core 2.2.3 which is included
in the version of CDH I'm on:
https://github.com/FasterXML/jackson-core/issues/115

Jackson-core 2.3.0 has the fix.

On Tue, Jan 24, 2017 at 5:14 PM, Andrew Ehrlich  wrote:

> On Spark 1.6.0, calling json_tuple() with an emoji character in one of the
> values returns nulls:
>
> Input:
> """
> "myJsonBody": {
>   "field1": ""
> }
> """
>
> Query:
> """
> ...
> LATERAL VIEW JSON_TUPLE(e.myJsonBody,'field1') k AS field1,
> ...
>
> """
>
> This looks like a platform-dependent issue; the parsing works fine on my
> local computer (OSX, 1.6.3) and fails on the remote cluster(Centos7, 1.6.0)
>
> I noticed that in 1.6.0, json_tuple was implemented this way:
> https://github.com/apache/spark/pull/7946/files
>
> So far I have:
>
>- Checked all java system properties related to charsets on drivers
>and executors
>- Turned up logging to debug level and checked for relevant messages
>
> Any more input? Should I try the dev mailing list?
>


RE: spark 2.02 error when writing to s3

2017-01-26 Thread VND Tremblay, Paul
This seems to have done the trick, although I am not positive. If I have time, 
I'll test spinning up a cluster with and without consistent view to pinpoint 
the error.

_

Paul Tremblay
Analytics Specialist
THE BOSTON CONSULTING GROUP
Tel. + ▪ Mobile +

_

From: Neil Jonkers [mailto:neilod...@gmail.com]
Sent: Friday, January 20, 2017 11:39 AM
To: Steve Loughran; VND Tremblay, Paul
Cc: Takeshi Yamamuro; user@spark.apache.org
Subject: Re: spark 2.02 error when writing to s3

Can you test by enabling emrfs consistent view and use s3:// uri.

http://docs.aws.amazon.com/emr/latest/ManagementGuide/enable-consistent-view.html

 Original message 
From: Steve Loughran
Date:20/01/2017 21:17 (GMT+02:00)
To: "VND Tremblay, Paul"
Cc: Takeshi Yamamuro ,user@spark.apache.org
Subject: Re: spark 2.02 error when writing to s3

AWS S3 is eventually consistent: even after something is deleted, a LIST/GET 
call may still show it. You may be seeing that effect; even after the DELETE has 
got rid of the files, a listing sees something there. And I suspect the time it 
takes for the listing to "go away" will depend on the total number of entries 
underneath, as there are more deletion markers ("tombstones") to propagate around 
S3.

Try deleting the path and then waiting a short period


On 20 Jan 2017, at 18:54, VND Tremblay, Paul 
> wrote:

I am using an EMR cluster, and the latest version offered is 2.02. The link 
below indicates that that user had the same problem, which seems unresolved.

Thanks

Paul

_

Paul Tremblay
Analytics Specialist
THE BOSTON CONSULTING GROUP
Tel. + ▪ Mobile +

_


From: Takeshi Yamamuro [mailto:linguin@gmail.com]
Sent: Thursday, January 19, 2017 9:27 PM
To: VND Tremblay, Paul
Cc: user@spark.apache.org
Subject: Re: spark 2.02 error when writing to s3

Hi,

Do you get the same exception also in v2.1.0?
Anyway, I saw another guy reporting the same error, I think.
https://www.mail-archive.com/user@spark.apache.org/msg60882.html

// maropu


On Fri, Jan 20, 2017 at 5:15 AM, VND Tremblay, Paul 
> wrote:
I have come across a problem when writing CSV files to S3 in Spark 2.02. The 
problem does not exist in Spark 1.6.


19:09:20 Caused by: java.io.IOException: File already 
exists:s3://stx-apollo-pr-datascience-internal/revenue_model/part-r-00025-c48a0d52-9600-4495-913c-64ae6bf888bd.csv


My code is this:

new_rdd\
    .map(add_date_diff)\
    .map(sid_offer_days)\
    .groupByKey()\
    .map(custom_sort)\
    .map(before_rev_date)\
    .map(lambda x, num_weeks = args.num_weeks: create_columns(x, num_weeks))\
    .toDF()\
    .write.csv(
        sep = "|",
        header = True,
        nullValue = '',
        quote = None,
        path = path
    )

In order to get the path (the last argument), I call this function:

def _get_s3_write(test):
    if s3_utility.s3_data_already_exists(_get_write_bucket_name(), _get_s3_write_dir(test)):
        s3_utility.remove_s3_dir(_get_write_bucket_name(), _get_s3_write_dir(test))
    return make_s3_path(_get_write_bucket_name(), _get_s3_write_dir(test))

In other words, I am removing the directory if it exists before I write.

Notes:

* If I use a small set of data, then I don't get the error

* If I use Spark 1.6, I don't get the error

* If I read in a simple dataframe and then write to S3, I still get the error 
(without doing any transformations)

* If I do the previous step with a smaller set of data, I don't get the error.

* I am using pyspark, with python 2.7

* The thread at this link: 
https://forums.aws.amazon.com/thread.jspa?threadID=152470 indicates the 
problem is caused by a sync problem. With large datasets, Spark tries to write 
multiple times, which causes the error. The suggestion is to turn off 
speculation, but I believe speculation is turned off by default in pyspark.

Thanks!

Paul


_

Paul Tremblay
Analytics Specialist

THE BOSTON CONSULTING GROUP
STL ▪

Tel. + ▪ Mobile +
tremblay.p...@bcg.com
_

Read BCG's latest insights, analysis, and viewpoints at 
bcgperspectives.com


RE: Ingesting Large csv File to relational database

2017-01-26 Thread VND Tremblay, Paul
What relational DB are you using? We do this at work, and the way we handle it 
is to unload the DB into Spark (actually, we unload it to S3 and then into 
Spark). Redshift is very efficient at dumping tables this way.
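
A minimal sketch of that unload-then-compare idea, assuming both the nightly CSV and an 
unload of the current table can be read as DataFrames with the same schema (the paths 
and the item_id key column are hypothetical):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("csv-diff-sketch").getOrCreate()

val incoming = spark.read.option("header", "true").csv("s3://bucket/nightly/items/")   // new files
val current  = spark.read.option("header", "true").csv("s3://bucket/unload/items/")    // table unload

// Rows in the new files that are not identical to an existing row in the table.
val changedOrNew = incoming.except(current)

// Keys that no longer appear in any source file: candidates for deletion.
val deleted = current.join(incoming, Seq("item_id"), "left_anti").select("item_id")

changedOrNew.write.option("header", "true").csv("s3://bucket/to_upsert/")
deleted.write.option("header", "true").csv("s3://bucket/to_delete/")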



_

Paul Tremblay
Analytics Specialist
THE BOSTON CONSULTING GROUP
Tel. + ▪ Mobile +

_

From: Eric Dain [mailto:ericdai...@gmail.com]
Sent: Wednesday, January 25, 2017 11:14 PM
To: user@spark.apache.org
Subject: Ingesting Large csv File to relational database

Hi,

I need to write nightly job that ingest large csv files (~15GB each) and 
add/update/delete the changed rows to relational database.

If a row is identical to what in the database, I don't want to re-write the row 
to the database. Also, if same item comes from multiple sources (files) I need 
to implement a logic to choose if the new source is preferred or the current 
one in the database should be kept unchanged.

Obviously, I don't want to query the database for each item to check if the 
item has changed or no. I prefer to maintain the state inside Spark.

Is there a preferred and performant way to do that using Apache Spark ?

Best,
Eric

__
The Boston Consulting Group, Inc.
 
This e-mail message may contain confidential and/or privileged information.
If you are not an addressee or otherwise authorized to receive this message,
you should not use, copy, disclose or take any action based on this e-mail or
any information contained in the message. If you have received this material
in error, please advise the sender immediately by reply e-mail and delete this
message. Thank you.


Re: Dataframe fails to save to MySQL table in spark app, but succeeds in spark shell

2017-01-26 Thread Suresh Thalamati
I notice the columns are quoted with double quotes in the error message 
("user","age","state"). By any chance did you override the MySQL JDBC dialect? By 
default, MySQL identifiers are quoted with a backtick:
override def quoteIdentifier(colName: String): String = {
  s"`$colName`"
}
Just wondering if the error you are running into is related to quotes. 
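
If the dialect does need to be (re)registered, a minimal sketch looks roughly like this 
(Spark's built-in MySQL dialect already backtick-quotes identifiers, so this is only 
worth trying if that dialect is somehow not being picked up):

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}

object BacktickMySQLDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:mysql")
  override def quoteIdentifier(colName: String): String = s"`$colName`"
}

// Register before calling df.write...save() so the INSERT uses backticks, not double quotes.
JdbcDialects.registerDialect(BacktickMySQLDialect)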

Thanks
-suresh


> On Jan 26, 2017, at 1:28 AM, Didac Gil  wrote:
> 
> Are you sure that “age” is a numeric field?
> 
> Even if numeric, you could pass the "44" between quotes: 
> 
> INSERT into your_table ("user","age","state") VALUES ('user3','44','CT')
> 
> Are you sure there are no more fields that are specified as NOT NULL, and 
> that you did not provide a value (besides user, age and state)?
> 
> 
>> On 26 Jan 2017, at 04:42, Xuan Dzung Doan  
>> wrote:
>> 
>> Hi,
>> 
>> Spark version 2.1.0
>> MySQL community server version 5.7.17
>> MySQL Connector Java 5.1.40
>> 
>> I need to save a dataframe to a MySQL table. In spark shell, the following 
>> statement succeeds:
>> 
>> scala> df.write.mode(SaveMode.Append).format("jdbc").option("url", 
>> "jdbc:mysql://127.0.0.1:3306/mydb").option("dbtable", 
>> "person").option("user", "username").option("password", "password").save()
>> 
>> I write an app that basically does the same thing, issuing the same 
>> statement saving the same dataframe to the same MySQL table. I run it using 
>> spark-submit, but it fails, reporting some error in the SQL syntax. Here's 
>> the detailed stack trace:
>> 
>> 17/01/25 16:06:02 INFO DAGScheduler: Job 2 failed: save at 
>> DataIngestionJob.scala:119, took 0.159574 s
>> Exception in thread "main" org.apache.spark.SparkException: Job aborted due 
>> to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: 
>> Lost task 0.0 in stage 2.0 (TID 3, localhost, executor driver): 
>> java.sql.BatchUpdateException: You have an error in your SQL syntax; check 
>> the manual that corresponds to your MySQL server version for the right 
>> syntax to use near '"user","age","state") VALUES ('user3',44,'CT')' at line 1
>>  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>  at 
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>  at 
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>  at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
>>  at com.mysql.jdbc.Util.getInstance(Util.java:408)
>>  at 
>> com.mysql.jdbc.SQLError.createBatchUpdateException(SQLError.java:1162)
>>  at 
>> com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1773)
>>  at 
>> com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1257)
>>  at com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:958)
>>  at 
>> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:597)
>>  at 
>> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:670)
>>  at 
>> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:670)
>>  at 
>> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
>>  at 
>> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
>>  at 
>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
>>  at 
>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
>>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>>  at org.apache.spark.scheduler.Task.run(Task.scala:99)
>>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>  at java.lang.Thread.run(Thread.java:745)
>> Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You 
>> have an error in your SQL syntax; check the manual that corresponds to your 
>> MySQL server version for the right syntax to use near '"user","age","state") 
>> VALUES ('user3',44,'CT')' at line 1
>>  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>  at 
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>  at 
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>  at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
>>  at com.mysql.jdbc.Util.getInstance(Util.java:408)
>>  at 

eager? in dataframe's checkpoint

2017-01-26 Thread Jean Georges Perrin
Hey Sparkers,

Trying to understand the Dataframe's checkpoint (not in the context of 
streaming) 
https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Dataset.html#checkpoint(boolean)
 


What is the goal of the eager flag?

Thanks!

jg
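
For reference, the two call forms being asked about, as a minimal sketch (the checkpoint 
directory is a placeholder; the eager semantics below follow the linked Javadoc):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("checkpoint-eager-sketch").getOrCreate()
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")   // hypothetical location

val df = spark.range(0, 1000000).toDF("id")

// eager = true (the default for checkpoint()): a job runs immediately to materialize
// the checkpointed data, truncating the lineage right away.
val eagerCp = df.checkpoint()

// eager = false: the plan is only marked for checkpointing; the data is written
// the first time an action actually evaluates it.
val lazyCp = df.checkpoint(eager = false)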

Re: How to tune number of tasks

2017-01-26 Thread Md. Rezaul Karim
Hi,

If you need all the partitions to be saved as a single output with saveAsTextFile, you can
use coalesce(1, true).saveAsTextFile(). This basically means: do the
computation, then coalesce down to only 1 partition. You can also use
repartition(1), which is just a wrapper for coalesce that sets the
shuffle argument to true.

val yourRDD = ... // the RDD holding your results
yourRDD.coalesce(1, shuffle = true).saveAsTextFile("data/output")


Hope that helps.



Regards,
_
*Md. Rezaul Karim*, BSc, MSc
PhD Researcher, INSIGHT Centre for Data Analytics
National University of Ireland, Galway
IDA Business Park, Dangan, Galway, Ireland
Web: http://www.reza-analytics.eu/index.html


On 26 January 2017 at 16:21, Soheila S.  wrote:

> Hi all,
>
> Please tell me how I can tune the number of output partitions.
> I run my Spark job on my local machine with 8 cores, and the input data is
> 6.5GB. It creates 193 tasks and puts the output into 193 partitions.
> How can I change the number of tasks and, consequently, the number of
> output files?
>
> Best,
> Soheila
>


Re: Java heap error during matrix multiplication

2017-01-26 Thread Burak Yavuz
Hi,

Have you tried creating more column blocks?

BlockMatrix matrix = cmatrix.toBlockMatrix(100, 100);

for example.


Is your data randomly spread out, or do you generally have clusters of
data points together?


On Wed, Jan 25, 2017 at 4:23 AM, Petr Shestov  wrote:

> Hi all!
>
> I'm using Spark 2.0.1 with two workers (one executor each) with 20Gb each.
> And run following code:
>
> JavaRDD<MatrixEntry> entries = ...; // filling the data
> CoordinateMatrix cmatrix = new CoordinateMatrix(entries.rdd());
> BlockMatrix matrix = cmatrix.toBlockMatrix(100, 1000);
> BlockMatrix cooc = matrix.transpose().multiply(matrix);
>
> My matrix is approx 8 000 000 x 3000, but only 10 000 000 cells have
> meaningful value. During multiplication I always get:
>
> 17/01/24 08:03:10 WARN TaskMemoryManager: leak 1322.6 MB memory from 
> org.apache.spark.util.collection.ExternalAppendOnlyMap@649e701917/01/24 
> 08:03:10 ERROR Executor: Exception in task 1.0 in stage 57.0 (TID 83664)
> java.lang.OutOfMemoryError: Java heap space
> at 
> org.apache.spark.mllib.linalg.DenseMatrix$.zeros(Matrices.scala:453)
> at 
> org.apache.spark.mllib.linalg.Matrix$class.multiply(Matrices.scala:101)
> at 
> org.apache.spark.mllib.linalg.SparseMatrix.multiply(Matrices.scala:565)
> at 
> org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$23$$anonfun$apply$9$$anonfun$apply$11.apply(BlockMatrix.scala:483)
> at 
> org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$23$$anonfun$apply$9$$anonfun$apply$11.apply(BlockMatrix.scala:480)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.immutable.List.map(List.scala:285)
> at 
> org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$23$$anonfun$apply$9.apply(BlockMatrix.scala:480)
> at 
> org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$23$$anonfun$apply$9.apply(BlockMatrix.scala:479)
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at 
> org.apache.spark.util.collection.CompactBuffer$$anon$1.foreach(CompactBuffer.scala:115)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at 
> org.apache.spark.util.collection.CompactBuffer.foreach(CompactBuffer.scala:30)
> at 
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> at 
> org.apache.spark.util.collection.CompactBuffer.flatMap(CompactBuffer.scala:30)
> at 
> org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$23.apply(BlockMatrix.scala:479)
> at 
> org.apache.spark.mllib.linalg.distributed.BlockMatrix$$anonfun$23.apply(BlockMatrix.scala:478)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> at 
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
> at 
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
> Now I'm even trying to use only one core per executor. What can the
> problem be? And how can I debug it and find the root cause? What could I be
> missing in the Spark configuration?
>
> I've already tried increasing spark.default.parallelism and decreasing
> blocks size for BlockMatrix.
>
> Thanks.
>


How to reduce number of tasks and partitions in Spark job?

2017-01-26 Thread Md. Rezaul Karim
Hi All,

When I run a Spark job on my local machine (which has 8 cores and 16GB of RAM)
on input data of 6.5GB, it creates 193 parallel tasks and puts
the output into 193 partitions.

How can I change the number of tasks and consequently, the number of output
files - say to just one or less?





Regards,
_
*Md. Rezaul Karim*, BSc, MSc
PhD Researcher, INSIGHT Centre for Data Analytics
National University of Ireland, Galway
IDA Business Park, Dangan, Galway, Ireland
Web: http://www.reza-analytics.eu/index.html
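
A minimal DataFrame-flavoured sketch of the same coalesce/repartition idea discussed in 
the thread above (the input and output paths are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("output-files-sketch").getOrCreate()

val ds = spark.read.textFile("data/input")        // hypothetical 6.5GB input

// A single output file: coalesce to one partition just before writing.
ds.coalesce(1).write.text("data/output-single")

// Or pick an explicit number of final tasks / output files.
ds.repartition(16).write.text("data/output-16")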



Re: Issue returning Map from UDAF

2017-01-26 Thread Ankur Srivastava
Thank you Takeshi, that was the issue.

Thanks
Ankur

Sent from my iPhone

> On Jan 25, 2017, at 9:08 PM, Takeshi Yamamuro  wrote:
> 
> Hi,
> 
> Quickly looking around the attached, I found you wrongly passed the dataType
> of your aggregator output in line70.
> So, you need to at lease return `MapType` instead of `StructType`.
> The stacktrace you showed explicitly say this type unmatch.
> 
> // maropu
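
In other words, the override in question should look roughly like this (a sketch only; 
the HourlyFrequency name and the String-to-Int shape are assumptions, not code from the 
attachment):

import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types.{DataType, IntegerType, MapType, StringType}

// Only the output-type declaration is shown; the rest of the UDAF stays as it was.
// The aggregator returns a Map[String, Int] (hour bucket -> count), so dataType
// must be a MapType, not a StructType.
abstract class HourlyFrequency extends UserDefinedAggregateFunction {
  override def dataType: DataType = MapType(StringType, IntegerType)
}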
> 
> 
>> On Thu, Jan 26, 2017 at 12:07 PM, Ankur Srivastava 
>>  wrote:
>> Hi,
>> 
>> I have a dataset with tuple of ID and Timestamp. I want to do a group by on 
>> ID and then create a map with frequency per hour for the ID.
>> 
>> Input:
>> 1| 20160106061005
>> 1| 20160106061515
>> 1| 20160106064010
>> 1| 20160106050402
>> 1| 20160106040101
>> 2| 20160106040101
>> 3| 20160106051451
>> 
>> Expected Output:
>> 1|{2016010604:1, 2016010605:1, 2016010606:3}
>> 2|{2016010604:1}
>> 3|{2016010605:1}
>> 
>> As I could not find a function in the org.apache.spark.sql.functions library 
>> that can do this aggregation, I wrote a UDAF, but when I execute it, it throws 
>> the exception below.
>> 
>> I am using Dataset API from Spark 2.0 and am using Java library. Also 
>> attached is the code with the test data.
>> 
>> scala.MatchError: {2016010606=1} (of class 
>> scala.collection.convert.Wrappers$MapWrapper)
>>  at 
>> org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:256)
>>  at 
>> org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:251)
>>  at 
>> org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
>>  at 
>> org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:403)
>>  at 
>> org.apache.spark.sql.execution.aggregate.ScalaUDAF.eval(udaf.scala:440)
>>  at 
>> org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:228)
>>  at 
>> org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:220)
>>  at 
>> org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:152)
>>  at 
>> org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29)
>>  at 
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:247)
>>  at 
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
>>  at 
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
>>  at 
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
>>  at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>>  at org.apache.spark.scheduler.Task.run(Task.scala:85)
>>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>  at java.lang.Thread.run(Thread.java:745)
>> 17/01/25 18:20:58 INFO Executor: Executor is trying to kill task 29.0 in 
>> stage 7.0 (TID 398)
>> 17/01/25 18:20:58 INFO DAGScheduler: ResultStage 7 (show at 
>> EdgeAggregator.java:29) failed in 0.699 s
>> 17/01/25 18:20:58 INFO DAGScheduler: Job 3 failed: show at 
>> EdgeAggregator.java:29, took 0.712912 s
>> In merge hr: 2016010606
>> 17/01/25 18:20:58 WARN TaskSetManager: Lost task 29.0 in stage 7.0 (TID 398, 
>> localhost): scala.MatchError: {2016010606=1} (of class 
>> scala.collection.convert.Wrappers$MapWrapper)
>>  at 
>> org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:256)
>>  at 
>> org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:251)
>>  at 
>> org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
>>  at 
>> org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:403)
>>  at 
>> org.apache.spark.sql.execution.aggregate.ScalaUDAF.eval(udaf.scala:440)
>>  at 
>> org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:228)
>>  at 
>> 

Re: can we plz open up encoder on dataset

2017-01-26 Thread Koert Kuipers
oh the map in DataFrame is actually using a RowEncoder. i left it out
because it wasn't important:

so this doesn't compile:

def f[T]: Dataset[T] => Dataset[T] = dataset => {
  val df = dataset.toDF
  df.map(row => row)(RowEncoder(df.schema)).as[T]
}


now this does compile. but i don't like it, since the assumption of an
implicit encoder isn't always true, and i don't want to start passing
encoders around when they are already embedded in the datasets:

def f[T: Encoder]: Dataset[T] => Dataset[T] = dataset => {
  val df = dataset.toDF
  df.map(row => row)(RowEncoder(df.schema)).as[T]
}

On Thu, Jan 26, 2017 at 10:50 AM, Jacek Laskowski  wrote:

> Hi Koert,
>
> map will take the value that has an implicit Encoder to any value that
> may or may not have an encoder in scope. That's why I'm asking about
> the map function to see what it does.
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Thu, Jan 26, 2017 at 4:18 PM, Koert Kuipers  wrote:
> > the map operation works on DataFrame so it doesn't need an encoder. It
> could
> > have been any operation on DataFrame. the issue is at the end going back
> to
> > Dataset[T] using as[T]. this requires an encoder for T which i know i
> > already have since i started with a Dataset[T].
> >
> > i could add an implicit encoder but that assumes T has an implicit
> encoder
> > which isn't always true. for example i could be using a kryo encoder. but
> > anyhow i shouldn't have to be guessing this Encoder[T] since it's part
> of my
> > dataset already
> >
> > On Jan 26, 2017 05:18, "Jacek Laskowski"  wrote:
> >
> > Hi,
> >
> > Can you show the code from map to reproduce the issue? You can create
> > encoders using Encoders object (I'm using it all over the place for
> schema
> > generation).
> >
> > Jacek
> >
> > On 25 Jan 2017 10:19 p.m., "Koert Kuipers"  wrote:
> >>
> >> i often run into problems like this:
> >>
> >> i need to write a Dataset[T] => Dataset[T], and inside i need to switch
> to
> >> DataFrame for a particular operation.
> >>
> >> but if i do:
> >> dataset.toDF.map(...).as[T] i get error:
> >> Unable to find encoder for type stored in a Dataset.
> >>
> >> i know it has an encoder, because i started with Dataset[T]
> >>
> >> so i would like to do:
> >> dataset.toDF.map(...).as[T](dataset.encoder)
> >>
> >
>


How to tune number of tasks

2017-01-26 Thread Soheila S.
Hi all,

Please tell me how I can tune the number of output partitions.
I run my Spark job on my local machine with 8 cores, and the input data is
6.5GB. It creates 193 tasks and puts the output into 193 partitions.
How can I change the number of tasks and, consequently, the number of output
files?

Best,
Soheila


Re: can we plz open up encoder on dataset

2017-01-26 Thread Jacek Laskowski
Hi Koert,

map will take the value that has an implicit Encoder to any value that
may or may not have an encoder in scope. That's why I'm asking about
the map function to see what it does.

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Thu, Jan 26, 2017 at 4:18 PM, Koert Kuipers  wrote:
> the map operation works on DataFrame so it doesn't need an encoder. It could
> have been any operation on DataFrame. the issue is at the end going back to
> Dataset[T] using as[T]. this requires an encoder for T which i know i
> already have since i started with a Dataset[T].
>
> i could add an implicit encoder but that assumes T has an implicit encoder
> which isn't always true. for example i could be using a kryo encoder. but
> anyhow i shouldn't have to be guessing this Encoder[T] since it's part of my
> dataset already
>
> On Jan 26, 2017 05:18, "Jacek Laskowski"  wrote:
>
> Hi,
>
> Can you show the code from map to reproduce the issue? You can create
> encoders using Encoders object (I'm using it all over the place for schema
> generation).
>
> Jacek
>
> On 25 Jan 2017 10:19 p.m., "Koert Kuipers"  wrote:
>>
>> i often run into problems like this:
>>
>> i need to write a Dataset[T] => Dataset[T], and inside i need to switch to
>> DataFrame for a particular operation.
>>
>> but if i do:
>> dataset.toDF.map(...).as[T] i get error:
>> Unable to find encoder for type stored in a Dataset.
>>
>> i know it has an encoder, because i started with Dataset[T]
>>
>> so i would like to do:
>> dataset.toDF.map(...).as[T](dataset.encoder)
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



RE: [PySpark 2.1.0] - SparkContext not properly initialized by SparkConf

2017-01-26 Thread Sidney Feiner
I think I'm getting close to finding the reason:

When I initialize the SparkContext, the following code is executed:
def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
             serializer, conf, jsc, profiler_cls):
    self.environment = environment or {}
    # java gateway must have been launched at this point.
    if conf is not None and conf._jconf is not None:
        # conf has been initialized in JVM properly, so use conf directly. This represent the
        # scenario that JVM has been launched before SparkConf is created (e.g. SparkContext is
        # created and then stopped, and we create a new SparkConf and new SparkContext again)
        self._conf = conf
    else:
        self._conf = SparkConf(_jvm=SparkContext._jvm)

So I can see that the only way that my SparkConf will be used is if it also has 
a _jvm object.
I've used spark-submit to submit my job and printed the _jvm object but it is 
null, which explains why my SparkConf object is ignored.
I've tried running exactly the same thing on Spark 2.0.1 and it worked! My SparkConf 
object had a valid _jvm object.
Does anybody know what changed? Or did I get something wrong?

Thanks :)

Sidney Feiner   /  SW Developer
M: +972.528197720  /  Skype: sidney.feiner.startapp


From: Sidney Feiner
Sent: Thursday, January 26, 2017 9:26 AM
To: user@spark.apache.org
Subject: [PySpark 2.1.0] - SparkContext not properly initialized by SparkConf

Hey, I'm pasting a question I asked on Stack Overflow without getting any 
answers :(
I hope somebody here knows the answer, thanks in advance!
Link to 
post
I'm migrating from Spark 1.6 to 2.1.0 and I've run into a problem migrating my 
PySpark application.
I'm dynamically setting up my SparkConf object based on configurations in a 
file and when I was on Spark 1.6, the app would run with the correct configs. 
But now, when I open the Spark UI, I can see that NONE of those configs are 
loaded into the SparkContext. Here's my code:
spark_conf = SparkConf().setAll(
filter(lambda x: x[0].startswith('spark.'), conf_dict.items())
)
sc = SparkContext(conf=spark_conf)
I've also added a print before initializing the SparkContext to make sure the 
SparkConf has all the relevant configs:
[print("{0}: {1}".format(key, value)) for (key, value) in spark_conf.getAll()]
And this outputs all the configs I need:
* spark.app.name: MyApp
* spark.akka.threads: 4
* spark.driver.memory: 2G
* spark.streaming.receiver.maxRate: 25
* spark.streaming.backpressure.enabled: true
* spark.executor.logs.rolling.maxRetainedFiles: 7
* spark.executor.memory: 3G
* spark.cores.max: 24
* spark.executor.cores: 4
* spark.streaming.blockInterval: 350ms
* spark.memory.storageFraction: 0.2
* spark.memory.useLegacyMode: false
* spark.memory.fraction: 0.8
* spark.executor.logs.rolling.time.interval: daily
I submit my job with the following:
/usr/local/spark/bin/spark-submit --conf spark.driver.host=i-${HOSTNAME} 
--master spark://i-${HOSTNAME}:7077 /path/to/main/file.py /path/to/config/file
Does anybody know why my SparkContext doesn't get initialized with my SparkConf?
Thanks :)


Sidney Feiner   /  SW Developer
M: +972.528197720  /  Skype: sidney.feiner.startapp




Re: can we plz open up encoder on dataset

2017-01-26 Thread Koert Kuipers
the map operation works on DataFrame so it doesn't need an encoder. It
could have been any operation on DataFrame. the issue is at the end going
back to Dataset[T] using as[T]. this requires an encoder for T which i know
i already have since i started with a Dataset[T].

i could add an implicit encoder but that assumes T has an implicit encoder
which isn't always true. for example i could be using a kryo encoder. but
anyhow i shouldn't have to be guessing this Encoder[T] since it's part of
my dataset already

On Jan 26, 2017 05:18, "Jacek Laskowski"  wrote:

Hi,

Can you show the code from map to reproduce the issue? You can create
encoders using Encoders object (I'm using it all over the place for schema
generation).

Jacek

On 25 Jan 2017 10:19 p.m., "Koert Kuipers"  wrote:

> i often run into problems like this:
>
> i need to write a Dataset[T] => Dataset[T], and inside i need to switch to
> DataFrame for a particular operation.
>
> but if i do:
> dataset.toDF.map(...).as[T] i get error:
> Unable to find encoder for type stored in a Dataset.
>
> i know it has an encoder, because i started with Dataset[T]
>
> so i would like to do:
> dataset.toDF.map(...).as[T](dataset.encoder)
>
>


Re: is it possible to read .mdb file in spark

2017-01-26 Thread Richard Siebeling
Hi,

haven't used it, but Jackcess should do the trick:
http://jackcess.sourceforge.net/
kind regards,
Richard
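
A rough sketch of that route, assuming Jackcess's DatabaseBuilder/Table API and a file 
small enough to read on the driver (the path, table name and the stringly-typed 
conversion are illustration only):

import java.io.File
import scala.collection.JavaConverters._
import com.healthmarketscience.jackcess.DatabaseBuilder
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("mdb-read-sketch").getOrCreate()

// Jackcess has no splittable input format, so the .mdb is read on the driver...
val db = DatabaseBuilder.open(new File("/data/legacy.mdb"))   // hypothetical path
val rows = db.getTable("Customers").iterator().asScala        // hypothetical table
  .map(r => r.asScala.map { case (k, v) => k -> Option(v).map(_.toString).orNull }.toMap)
  .toList
db.close()

// ...and the materialized rows are handed to Spark afterwards.
val rdd = spark.sparkContext.parallelize(rows)
println(rdd.count())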

2017-01-25 11:47 GMT+01:00 Selvam Raman :

>
>
> --
> Selvam Raman
> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>


DML in Spark ETL

2017-01-26 Thread A Shaikh
In the past we used an ETL tool whose tasks would update, insert and delete
rows in target database tables (Oracle/Netezza). Spark's Dataset (and RDD)
has .save* methods, which can insert rows.

How can I delete or update records in a database table from Spark?
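
Spark's own JDBC writer only appends or overwrites, so updates and deletes are usually 
done by hand inside foreachPartition with a plain JDBC connection. A minimal sketch 
(the URL, credentials, table and columns are placeholders):

import java.sql.DriverManager
import org.apache.spark.sql.{Row, SparkSession}

val spark = SparkSession.builder().appName("jdbc-dml-sketch").getOrCreate()
import spark.implicits._

// Hypothetical set of rows whose price changed.
val updates = Seq((1L, 9.99), (2L, 19.99)).toDF("id", "price")

updates.foreachPartition { (rows: Iterator[Row]) =>
  // One connection per partition; this closure runs on the executors.
  val conn = DriverManager.getConnection("jdbc:mysql://dbhost:3306/shop", "user", "password")
  val stmt = conn.prepareStatement("UPDATE items SET price = ? WHERE id = ?")
  try {
    rows.foreach { row =>
      stmt.setDouble(1, row.getAs[Double]("price"))
      stmt.setLong(2, row.getAs[Long]("id"))
      stmt.addBatch()
    }
    stmt.executeBatch()
  } finally {
    stmt.close()
    conn.close()
  }
}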


Re: can we plz open up encoder on dataset

2017-01-26 Thread Jacek Laskowski
Hi,

Can you show the code from map to reproduce the issue? You can create
encoders using Encoders object (I'm using it all over the place for schema
generation).

Jacek

On 25 Jan 2017 10:19 p.m., "Koert Kuipers"  wrote:

> i often run into problems like this:
>
> i need to write a Dataset[T] => Dataset[T], and inside i need to switch to
> DataFrame for a particular operation.
>
> but if i do:
> dataset.toDF.map(...).as[T] i get error:
> Unable to find encoder for type stored in a Dataset.
>
> i know it has an encoder, because i started with Dataset[T]
>
> so i would like to do:
> dataset.toDF.map(...).as[T](dataset.encoder)
>
>


Re: where is mapWithState executed?

2017-01-26 Thread Jacek Laskowski
Hi,

Shooting in the dark... it's executed on the executors (it's older, RDD-based
tech, so there aren't many of the extra optimizations you now get in Spark SQL).

Can you show the code? I'm scared to hear that you're trying to broadcast
inside a transformation, which I'd believe is impossible.

Jacek
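
For what it's worth, a minimal sketch of the pattern being described: broadcast the 
lookup map once from the driver and read its .value inside the StateSpec function, 
which runs on the executors (the source, port and lookup contents are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

val conf = new SparkConf().setAppName("mapWithState-broadcast-sketch")
val ssc = new StreamingContext(conf, Seconds(60))
ssc.checkpoint("/tmp/streaming-checkpoint")               // required by mapWithState

// Lookup table built on the driver, shipped once to every executor.
val lookup = ssc.sparkContext.broadcast(Map("user1" -> "gold", "user2" -> "silver"))

val counts = ssc.socketTextStream("localhost", 9999)      // hypothetical source
  .map(userId => (userId, 1))

val spec = StateSpec.function { (userId: String, value: Option[Int], state: State[Int]) =>
  val total = state.getOption.getOrElse(0) + value.getOrElse(0)
  state.update(total)
  // The mapping function runs on the executors, so the broadcast value is read here.
  (userId, lookup.value.getOrElse(userId, "unknown"), total)
}

counts.mapWithState(spec).print()
ssc.start()
ssc.awaitTermination()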

On 26 Jan 2017 12:18 a.m., "shyla deshpande" 
wrote:

After more reading, I know the state is distributed across the cluster. But if
I need to look up a map in the update function, I need to broadcast it.

Just want to make sure I am on the right path.

Appreciate your help. Thanks

On Wed, Jan 25, 2017 at 2:33 PM, shyla deshpande 
wrote:

> Is it executed on the driver or the executor? If I need to look up a map in
> the update function, I need to broadcast it, if mapWithState runs
> on the executor.
>
> Thanks
>


Re: Dataframe fails to save to MySQL table in spark app, but succeeds in spark shell

2017-01-26 Thread Didac Gil
Are you sure that “age” is a numeric field?

Even if numeric, you could pass the "44" between quotes: 

INSERT into your_table ("user","age","state") VALUES ('user3','44','CT')

Are you sure there are no more fields that are specified as NOT NULL, and that 
you did not provide a value (besides user, age and state)?


> On 26 Jan 2017, at 04:42, Xuan Dzung Doan  
> wrote:
> 
> Hi,
> 
> Spark version 2.1.0
> MySQL community server version 5.7.17
> MySQL Connector Java 5.1.40
> 
> I need to save a dataframe to a MySQL table. In spark shell, the following 
> statement succeeds:
> 
> scala> df.write.mode(SaveMode.Append).format("jdbc").option("url", 
> "jdbc:mysql://127.0.0.1:3306/mydb").option("dbtable", 
> "person").option("user", "username").option("password", "password").save()
> 
> I write an app that basically does the same thing, issuing the same statement 
> saving the same dataframe to the same MySQL table. I run it using 
> spark-submit, but it fails, reporting some error in the SQL syntax. Here's 
> the detailed stack trace:
> 
> 17/01/25 16:06:02 INFO DAGScheduler: Job 2 failed: save at 
> DataIngestionJob.scala:119, took 0.159574 s
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due 
> to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: 
> Lost task 0.0 in stage 2.0 (TID 3, localhost, executor driver): 
> java.sql.BatchUpdateException: You have an error in your SQL syntax; check 
> the manual that corresponds to your MySQL server version for the right syntax 
> to use near '"user","age","state") VALUES ('user3',44,'CT')' at line 1
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
>   at com.mysql.jdbc.Util.getInstance(Util.java:408)
>   at 
> com.mysql.jdbc.SQLError.createBatchUpdateException(SQLError.java:1162)
>   at 
> com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1773)
>   at 
> com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1257)
>   at com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:958)
>   at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:597)
>   at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:670)
>   at 
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:670)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
>   at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
>   at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:99)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You 
> have an error in your SQL syntax; check the manual that corresponds to your 
> MySQL server version for the right syntax to use near '"user","age","state") 
> VALUES ('user3',44,'CT')' at line 1
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
>   at com.mysql.jdbc.Util.getInstance(Util.java:408)
>   at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:943)
>   at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3970)
>   at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3906)
>   at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2524)
>   at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2677)
>   at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2549)
>   at 
> com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1861)
>