How to work around NoOffsetForPartitionException when using Spark Streaming

2018-06-01 Thread Martin Peng
Hi,

We see the exception below when using Spark Kafka streaming 0.10 on a normal
Kafka topic. I'm not sure why the offset is missing in ZooKeeper, but since Spark
Streaming overrides the offset reset policy to none in its code, I cannot set the
reset policy to latest (I don't really care about data loss right now).

Is there any quick way to fix the missing offset or work around this?

Thanks,
Martin

1/06/2018 17:11:02: ERROR:the type of error is
org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined
offset with no reset policy for partition:
elasticsearchtopicrealtimereports-97
01/06/2018 17:11:02: ERROR:Undefined offset with no reset policy for
partition: elasticsearchtopicrealtimereports-97
org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:370)
org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:248)
org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1601)
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1034)
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:165)
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:184)
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:211)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
scala.Option.orElse(Option.scala:289)
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
scala.collection.immutable.List.foreach(List.scala:381)
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
scala.collection.immutable.List.map(List.scala:285)
org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
scala.Option.orElse(Option.scala:289)
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
scala.Option.orElse(Option.scala:289)
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
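
The only workaround I can think of so far (a rough sketch, not tested; the starting
offset of 0L, kafkaParams, and the helper name are placeholders) is to pass explicit
starting offsets for the affected partition when creating the direct stream, so the
"none" reset policy never has to fire:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.KafkaUtils

// Sketch: supply an explicit starting offset for the partition that has no
// committed offset. The offset value (0L) and kafkaParams are placeholders.
def buildStream(ssc: StreamingContext,
                kafkaParams: Map[String, Object]): InputDStream[ConsumerRecord[String, String]] = {
  val topic = "elasticsearchtopicrealtimereports"
  val fromOffsets = Map(new TopicPartition(topic, 97) -> 0L)
  KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](Seq(topic), kafkaParams, fromOffsets)
  )
}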

Help explaining explain() after DataFrame join reordering

2018-06-01 Thread Mohamed Nadjib MAMI
Dear Sparkers,

I'm loading data into DataFrames from 5 sources (using the official
connectors): Parquet, MongoDB, Cassandra, MySQL and CSV. I'm then joining
those DataFrames in two different orders:
- mongo * cassandra * jdbc * parquet * csv (random order).
- parquet * csv * cassandra * jdbc * mongodb (optimized order).

The first follows a random order, whereas the second order I decide based on
some optimization techniques (I can provide details for interested readers,
or here if needed).

After evaluating on increasing data sizes, the optimization techniques I
developed didn't improve performance very noticeably. I inspected the
logical/physical plan of the final joined DataFrame (using `explain(true)`).
The 1st order was respected, whereas the 2nd order, it turned out, wasn't:
MongoDB was queried first.

At least, that is how it seemed to me; I'm not very confident reading the
plans returned by `explain(true)`. Could someone help explain the
`explain(true)` output (pasted in this gist)? Is there a way to enforce the
given join order?

I'm using Spark 2.1, which I believe doesn't include the new cost-based
optimizations (introduced in Spark 2.2).
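
For concreteness, this is roughly how I build and compare the two orders (a sketch
only; the DataFrame names and the shared join key are placeholders here, not the
exact code from the gist):

import org.apache.spark.sql.DataFrame

// Chain the joins explicitly in each order and print the plans; the order that
// actually runs is the one shown in the physical plan at the end of explain(true).
def comparePlans(parquetDF: DataFrame, csvDF: DataFrame, cassandraDF: DataFrame,
                 jdbcDF: DataFrame, mongoDF: DataFrame, key: String): Unit = {
  val randomOrder = mongoDF.join(cassandraDF, key).join(jdbcDF, key)
    .join(parquetDF, key).join(csvDF, key)
  val optimizedOrder = parquetDF.join(csvDF, key).join(cassandraDF, key)
    .join(jdbcDF, key).join(mongoDF, key)
  randomOrder.explain(true)
  optimizedOrder.explain(true)
}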

Regards, Grüße, Cordialement, Recuerdos, Saluti, προσρήσεις, 问候, تحياتي.
Mohamed Nadjib Mami
Research Associate @ Fraunhofer IAIS - PhD Student @ Bonn University
About me!
LinkedIn


Append In-Place to S3

2018-06-01 Thread Benjamin Kim
I have a situation where I'm trying to add only new rows to an existing data set
that lives in S3 as gzipped Parquet files, looping and appending for each hour of
the day. First, I create a DF from the existing data, then I use a query to
create another DF with the data that is new. Here is the code snippet.

df = spark.read.parquet(existing_data_path)
df.createOrReplaceTempView('existing_data')
new_df = spark.read.parquet(new_data_path)
new_df.createOrReplaceTempView('new_data')
append_df = spark.sql(
    """
    WITH ids AS (
        SELECT DISTINCT
            i.source,
            i.source_id,
            i.target,
            i.target_id
        FROM new_data i
        LEFT ANTI JOIN existing_data im
            ON i.source = im.source
            AND i.source_id = im.source_id
            AND i.target = im.target
            AND i.target_id = im.target_id
    )
    SELECT * FROM ids
    """
)
append_df.coalesce(1).write.parquet(existing_data_path, mode='append',
                                    compression='gzip')

I thought this would append new rows and keep the data unique, but I am seeing
many duplicates. Can someone help me with this and tell me what I am doing
wrong?

Thanks,
Ben

Re: Spark structured streaming generate output path runtime

2018-06-01 Thread Lalwani, Jayesh
This will not work the way you have implemented it. The code that you have here
will be called only once, before the streaming query is started. Once the
streaming query starts, this code is not called again.

What I would do is:

  1.  Implement a UDF that calculates the flour timestamp.
  2.  Add a column to df2 called "flourtimestamp", and populate it with the
flour timestamp using the UDF.
  3.  Write the stream using
df2.writeStream.format("text").partitionBy("flourtimestamp").option("path",
"/home/data").option("checkpointLocation","./checkpoint").start()

The UDF will be called for every row, and partitionBy will create a folder for
each distinct flourtimestamp value within /home/data.
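
Roughly, in code (a sketch only, assuming df2 is the streaming DataFrame from your
snippet and that its payload is a single string column, which the text sink needs;
the 5-second flooring just mirrors your getFlourTimestamp()):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.streaming.StreamingQuery

// Sketch of steps 1-3 above: compute the flour timestamp per row with a UDF,
// add it as a column, and let partitionBy create one folder per bucket.
def writeByFlourTimestamp(df2: DataFrame): StreamingQuery = {
  val flourTimestamp = udf { () =>
    val curTime = System.currentTimeMillis()
    curTime - (curTime % 5000)
  }
  df2.withColumn("flourtimestamp", flourTimestamp())
    .writeStream
    .format("text")
    .partitionBy("flourtimestamp")
    .option("path", "/home/data")
    .option("checkpointLocation", "./checkpoint")
    .start()
}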


From: Swapnil Chougule 
Date: Friday, June 1, 2018 at 6:21 AM
To: user 
Subject: Spark structured streaming generate output path runtime

Hi

I want to generate the output directory at runtime for my data. The directory name
is derived from the current timestamp.
Let's say data for the same minute should go into the same directory.

I tried the following snippet but it didn't work. All data is being written into the
same directory (created with respect to the initial timestamp).

val query = df2.writeStream
  .format("text")
  .option("path", "/home/data/"+getFlourTimestamp())
  .option("checkpointLocation","./checkpoint")
  .start()

  query.awaitTermination()



  def getFlourTimestamp(): Long = {
    val curTime = System.currentTimeMillis()
    curTime - (curTime % 5000)
  }


In other words, I want getFlourTimestamp() to be executed after every batch.

Any help with this will be really appreciated.

Thanks,
Swapnil



Spark structured streaming generate output path runtime

2018-06-01 Thread Swapnil Chougule
Hi

I want to generate the output directory at runtime for my data. The directory name
is derived from the current timestamp.
Let's say data for the same minute should go into the same directory.

I tried the following snippet but it didn't work. All data is being written into the
same directory (created with respect to the initial timestamp).

val query = df2.writeStream
  .format("text")
  .option("path", "/home/data/"+getFlourTimestamp())
  .option("checkpointLocation","./checkpoint")
  .start()

  query.awaitTermination()



  def getFlourTimestamp(): Long = {
    val curTime = System.currentTimeMillis()
    curTime - (curTime % 5000)
  }


In other words, I want getFlourTimestamp() to be executed after every batch.

Any help with this will be really appreciated.

Thanks,
Swapnil


[Spark SQL] Is it possible to do stream to stream inner join without event time?

2018-06-01 Thread Becket Qin
Hi,

I am new to Spark and I'm trying to run a few queries from TPC-H using
Spark SQL.

According to the documentation here, it is OPTIONAL to have a watermark defined
in the case of an inner join between two streams. However, I keep getting the
following exception:

org.apache.spark.sql.AnalysisException: Append output mode not supported
when there are streaming aggregations on streaming DataFrames/DataSets
without watermark

So it looks like the watermark is mandatory. Because there is no timestamp in
the TPC-H records, I am not able to specify a watermark with event time.
Is there a recommended workaround, e.g. using processing time instead of
event time?
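
Concretely, what I have in mind is something like the following (a rough,
unverified sketch; the DataFrame names, the join key, and the 10-minute delay
are placeholders):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.current_timestamp

// Sketch: tag each stream with a processing-time column and declare the
// watermark on it, since the records carry no event time of their own.
def joinOnProcessingTime(left: DataFrame, right: DataFrame, joinKey: String): DataFrame = {
  val l = left.withColumn("proc_time", current_timestamp())
    .withWatermark("proc_time", "10 minutes")
  val r = right.withColumn("proc_time", current_timestamp())
    .withWatermark("proc_time", "10 minutes")
  l.join(r, Seq(joinKey), "inner")
}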

Thanks,

Jiangjie (Becket) Qin


[Spark SQL] Efficiently calculating Weight of Evidence in PySpark

2018-06-01 Thread Aakash Basu
Hi guys,

Can anyone please let me know if you have any clue about this problem I posted
on Stack Overflow:

https://stackoverflow.com/questions/50638911/how-to-efficiently-calculate-woe-in-pyspark

Thanks,
Aakash.