Re: ordered ingestion not guaranteed

2018-05-11 Thread Jörn Franke
What DB do you have? 

You have some options, such as:
1) use a key-value store (they can be accessed very efficiently) to check
whether a newer value for that key has already been processed - if yes, ignore
the incoming value; if no, insert it into the database (a minimal sketch of
such a conditional write is below)
2) redesign the key to include the timestamp and resolve the latest version
when querying the database
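
To make option 1/2 concrete - a minimal sketch, assuming the downstream store
is something like PostgreSQL (the thread never names the database); table,
column and connection details are hypothetical. The WHERE clause turns
out-of-order arrivals into no-ops:

import java.sql.DriverManager

// Hedged sketch: writes one partition's records with an upsert that only wins
// when the incoming timestamp is newer than the stored one (PostgreSQL syntax;
// requires a unique constraint on "id"). All names here are made up.
def upsertLatest(records: Iterator[(String, Long, String)]): Unit = {
  val conn = DriverManager.getConnection("jdbc:postgresql://dbhost/mydb", "user", "pass")
  val sql =
    """INSERT INTO events (id, ts, payload) VALUES (?, ?, ?)
      |ON CONFLICT (id) DO UPDATE
      |  SET ts = EXCLUDED.ts, payload = EXCLUDED.payload
      |  WHERE events.ts < EXCLUDED.ts""".stripMargin
  val stmt = conn.prepareStatement(sql)
  try {
    records.foreach { case (id, ts, payload) =>
      stmt.setString(1, id)
      stmt.setLong(2, ts)
      stmt.setString(3, payload)
      stmt.executeUpdate()
    }
  } finally {
    stmt.close()
    conn.close()
  }
}

Called from foreachPartition on each micro-batch, this keeps one connection per
partition and replaces the separate read-then-compare round trip.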

> On 11. May 2018, at 23:25, ravidspark  wrote:
> 
> Hi All,
> 
> I am using Spark 2.2.0 and I have the below use case:
> 
> *Reading from Kafka using Spark Streaming and updating (not just inserting)
> the records into a downstream database*
> 
> I understand that Spark reads messages from Kafka in the order of the
> partition offsets, not in the order of the timestamps stored in the Kafka
> partitions. So suppose there are two messages in Kafka with the same key: the
> one with the later timestamp sits at the smaller offset and the one with the
> older timestamp sits at the larger offset. Because Spark reads offsets in
> increasing order, the message with the later timestamp is processed first and
> the older one second, resulting in out-of-order ingestion into the DB.
> 
> If both messages fall into the same RDD, a reduce function can drop the
> message with the older timestamp and keep only the latest one. But I am not
> sure how to handle the case where these messages fall into different RDDs in
> the stream. One approach I tried is to hit the DB, retrieve the timestamp
> stored for that key, compare, and ignore the incoming record if it is older.
> But this is not efficient when handling millions of messages, since the DB
> round trips are expensive.
> 
> Is there a better way of solving this problem?
> 
> 
> 
> 
> 




ordered ingestion not guaranteed

2018-05-11 Thread ravidspark
Hi All,

I am using Spark 2.2.0 and I have the below use case:

*Reading from Kafka using Spark Streaming and updating (not just inserting)
the records into a downstream database*

I understand that Spark reads messages from Kafka in the order of the
partition offsets, not in the order of the timestamps stored in the Kafka
partitions. So suppose there are two messages in Kafka with the same key: the
one with the later timestamp sits at the smaller offset and the one with the
older timestamp sits at the larger offset. Because Spark reads offsets in
increasing order, the message with the later timestamp is processed first and
the older one second, resulting in out-of-order ingestion into the DB.

If both messages fall into the same RDD, a reduce function can drop the
message with the older timestamp and keep only the latest one. But I am not
sure how to handle the case where these messages fall into different RDDs in
the stream. One approach I tried is to hit the DB, retrieve the timestamp
stored for that key, compare, and ignore the incoming record if it is older.
But this is not efficient when handling millions of messages, since the DB
round trips are expensive.

Is there a better way of solving this problem?
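
A minimal sketch of the in-batch half of this, assuming the stream has already
been parsed into (key, (eventTimestamp, value)) tuples; names are illustrative.
Within each micro-batch, reduceByKey keeps only the newest record per key, so
any remaining cross-batch check against the DB runs at most once per key per
batch:

import org.apache.spark.streaming.dstream.DStream

// Hedged sketch; the parsing into (key, (timestamp, value)) tuples and the
// real write logic are assumed to exist elsewhere.
def writeLatestPerBatch(parsed: DStream[(String, (Long, String))]): Unit = {
  parsed
    .reduceByKey((a, b) => if (a._1 >= b._1) a else b)  // newest timestamp wins within the batch
    .foreachRDD { rdd =>
      rdd.foreachPartition { latestPerKey =>
        // Open one DB connection per partition here and apply a conditional
        // write (e.g. "... WHERE stored_ts < incoming_ts") so that late
        // records from earlier batches are ignored.
        latestPerKey.foreach { case (key, (ts, value)) =>
          println(s"would upsert key=$key ts=$ts value=$value")  // stand-in for the real write
        }
      }
    }
}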







Re: SPARK SQL: returns null for a column, while HIVE query returns data for the same column

2018-05-11 Thread ARAVIND ARUMUGHAM Sethurathnam
   - this column was added in later partitions and is not present in earlier ones
   - I assume partition pruning should load only the particular partition I am
     specifying when using Spark SQL?
   - (Spark version 2.2)
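
A diagnostic sketch, assuming the table and path from this thread: Spark takes
a Hive table's schema from the metastore and, for Parquet tables, matches those
column names against the Parquet footers with its built-in reader, so comparing
the two schemas side by side often shows where the nulls come from:

// Schema Spark obtained from the Hive metastore for the table
val tableSchema = session.table("dbName.test_bug").schema
println(tableSchema.treeString)

// Schema taken directly from the Parquet footers of one partition
val fileSchema = session.read
  .parquet("s3n://bucketName/module/search_request/eventDate=20180510")
  .schema
println(fileSchema.treeString)

// If the two differ (for example only in column-name case, or because the
// column is missing from the metastore schema), that would explain the nulls.
// One experiment is to let the Hive SerDe do the reading instead of Spark's
// native Parquet reader, e.g. by starting the session with
//   .config("spark.sql.hive.convertMetastoreParquet", "false")
// and re-running the query that returned nulls.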


On Fri, May 11, 2018 at 2:24 PM, ARAVIND ARUMUGHAM Sethurathnam <
arvind...@gmail.com> wrote:

> I have a Hive table created on top of S3 data in Parquet format,
> partitioned by one column named eventdate.
>
> 1) When using a Hive query, it returns data for a column named "headertime",
> which is in the schema of both the table and the file:
>
> select headertime from dbName.test_bug where eventdate=20180510 limit 10
>
> 2) From a Scala notebook, directly loading a file from a particular
> partition also works:
>
> val session = org.apache.spark.sql.SparkSession.builder
>   .appName("searchRequests")
>   .enableHiveSupport()
>   .getOrCreate
>
> val searchRequest = session.sqlContext.read.parquet(
>   "s3n://bucketName/module/search_request/eventDate=20180510")
>
> searchRequest.createOrReplaceTempView("SearchRequest")
>
> val exploreDF = session.sql(
>   "select headertime from SearchRequest where SearchRequestHeaderDate='2018-05-10' limit 100")
>
> exploreDF.show(20)
>
> This also displays the values for the column "headertime".
>
> 3) But when using Spark SQL to query the Hive table directly, as below:
>
> val exploreDF = session.sql(
>   "select headertime from dbName.test_bug where eventdate=20180510 limit 100")
>
> exploreDF.show(20)
>
> it always returns null.
>
> I opened the Parquet file and can see that the column headertime is present
> with values, but I am not sure why Spark SQL is not able to read the values
> for that column.
>
> It would be helpful if someone could point out where Spark SQL gets the
> schema from. I was expecting it to behave similarly to the Hive query.
>
>


-- 
Wealth is not money. Wealth is relationships with people.


Re: Spark 2.3.0 Structured Streaming Kafka Timestamp

2018-05-11 Thread Michael Armbrust
Hmm, yeah, that does look wrong. It would be great if someone opened a PR to
correct the docs :)
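
For reference, since the timestamp column turns out to be a real TimestampType
(see the schema in the quoted message below), a minimal sketch of recovering
millisecond-precision epoch values from it; broker and topic names are
placeholders:

// Casting TimestampType to DOUBLE keeps sub-second precision, so multiplying
// by 1000 and casting to LONG yields epoch milliseconds.
val withMillis = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()
  .selectExpr(
    "CAST(CAST(timestamp AS DOUBLE) * 1000 AS LONG) AS timestampMillis",
    "CAST(value AS STRING) AS value")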

On Thu, May 10, 2018 at 5:13 PM Yuta Morisawa 
wrote:

> The problem is solved.
> The actual schema of the Kafka message is different from the documentation.
>
>
> https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
>
> The documentation says the "timestamp" column is of Long type,
> but the actual type is timestamp.
>
>
> The following are my code and the result of checking the schema.
>
> -code
> val df = spark
>   .read
>   .format("kafka")
>   .option("kafka.bootstrap.servers", bootstrapServers)
>   .option(subscribeType, topics)
>   .load()
>
> df.printSchema()
>
> -result
> root
>   |-- key: binary (nullable = true)
>   |-- value: binary (nullable = true)
>   |-- topic: string (nullable = true)
>   |-- partition: integer (nullable = true)
>   |-- offset: long (nullable = true)
>   |-- timestamp: timestamp (nullable = true)
>   |-- timestampType: integer (nullable = true)
>
>
> Regards,
> Yuta
>
> On 2018/05/09 16:14, Yuta Morisawa wrote:
> > Hi All
> >
> > I'm trying to extract the Kafka timestamp from Kafka topics.
> >
> > The timestamp does not contain millisecond information,
> > but it should, because the ConsumerRecord class of Kafka 0.10
> > supports millisecond timestamps.
> >
> > How can I get millisecond timestamps from Kafka topics?
> >
> >
> > These are the websites I referred to.
> >
> >
> https://spark.apache.org/docs/2.3.0/structured-streaming-kafka-integration.html
> >
> >
> >
> https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/streams/processor/TimestampExtractor.html
> >
> >
> >
> > And this is my code.
> > 
> > val df = spark
> >   .readStream
> >   .format("kafka")
> >   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
> >   .option("subscribe", "topic1,topic2")
> >   .load()
> >   .selectExpr("CAST(timestamp AS LONG)", "CAST(value AS STRING)")
> >   .as[(Long, String)]
> > 
> >
> > Regards,
> > Yuta


SPARK SQL: returns null for a column, while HIVE query returns data for the same column

2018-05-11 Thread ARAVIND ARUMUGHAM Sethurathnam
I have a Hive table created on top of S3 data in Parquet format,
partitioned by one column named eventdate.

1) When using a Hive query, it returns data for a column named "headertime",
which is in the schema of both the table and the file:

select headertime from dbName.test_bug where eventdate=20180510 limit 10

2) From a Scala notebook, directly loading a file from a particular
partition also works:

val session = org.apache.spark.sql.SparkSession.builder
  .appName("searchRequests")
  .enableHiveSupport()
  .getOrCreate

val searchRequest = session.sqlContext.read.parquet(
  "s3n://bucketName/module/search_request/eventDate=20180510")

searchRequest.createOrReplaceTempView("SearchRequest")

val exploreDF = session.sql(
  "select headertime from SearchRequest where SearchRequestHeaderDate='2018-05-10' limit 100")

exploreDF.show(20)

This also displays the values for the column "headertime".

3) But when using Spark SQL to query the Hive table directly, as below:

val exploreDF = session.sql(
  "select headertime from dbName.test_bug where eventdate=20180510 limit 100")

exploreDF.show(20)

it always returns null.

I opened the Parquet file and can see that the column headertime is present
with values, but I am not sure why Spark SQL is not able to read the values
for that column.

It would be helpful if someone could point out where Spark SQL gets the
schema from. I was expecting it to behave similarly to the Hive query.


Oozie with spark 2.3 in Kubernetes

2018-05-11 Thread purna pradeep
Hello,

I would like to know if anyone has tried Oozie with Spark 2.3 actions on
Kubernetes for scheduling Spark jobs.


Thanks,
Purna


UDTF registration fails for hiveEnabled SQLContext

2018-05-11 Thread Mick Davies
Hi,

If I try to register a UDTF using SQLContext (with enableHiveSupport set)
using the code:

I get the following error:


It works OK if I use the deprecated HiveContext.


Is there a way to register a UDTF without using deprecated code?

This is happening in some tests I am writing using 

but I don't think it is related to that.

Thanks for any help.

Mick
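
A possible route, assuming a Hive-enabled SparkSession: Hive UDTFs can usually
be registered through HiveQL's CREATE TEMPORARY FUNCTION, which avoids the
deprecated HiveContext; the UDTF class and table names below are hypothetical:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("udtf-registration")
  .enableHiveSupport()
  .getOrCreate()

// Register the UDTF class (it must be on the classpath) under a SQL-callable name.
spark.sql("CREATE TEMPORARY FUNCTION my_explode AS 'com.example.MyExplodeUDTF'")

// Use it like any other table-generating function.
spark.sql("SELECT my_explode(col) FROM some_table").show()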





