Spark 2.x duplicates output when task fails at "repartition" stage. Checkpointing is enabled before repartition.

2019-02-05 Thread Serega Sheypak
Hi, I have a Spark job that produces duplicates when one or more tasks from
the repartition stage fail.
Here is simplified code.

sparkContext.setCheckpointDir("hdfs://path-to-checkpoint-dir")

val inputRDDs: List[RDD[String]] = List.empty // an RDD per input dir

val updatedRDDs = inputRDDs.map { inputRDD => // some stuff happens here
  inputRDD
    .filter(???)
    .map(???)
}

val unionOfUpdatedRDDs = sparkContext.union(updatedRDDs)

unionOfUpdatedRDDs.checkpoint() // it didn't help

unionOfUpdatedRDDs
  .repartition(42)                 // task failed here,
  .saveAsNewAPIHadoopFile("/path") // task failed here too.

// what really causes the duplicates in the output?
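
For completeness, here is a minimal sketch of a variation I could try next. My
assumption (not verified) is that checkpoint() only writes data on the first
action after it is set, and the RDD API docs recommend persisting first so the
lineage isn't recomputed when the checkpoint is written:

import org.apache.spark.storage.StorageLevel

// sketch only: persist, then force the checkpoint with an action before the shuffle
unionOfUpdatedRDDs.persist(StorageLevel.MEMORY_AND_DISK) // avoid recomputing lineage for the checkpoint
unionOfUpdatedRDDs.checkpoint()
unionOfUpdatedRDDs.count() // first action after checkpoint() actually writes the checkpoint data

unionOfUpdatedRDDs
  .repartition(42)
  .saveAsNewAPIHadoopFile("/path") // simplified, as above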


Re: DataSourceV2 producing wrong date value in Custom Data Writer

2019-02-05 Thread Ryan Blue
Shubham,

DataSourceV2 passes Spark's internal representation to your source and
expects Spark's internal representation back from the source. That's why
you consume and produce InternalRow: "internal" indicates that Spark
doesn't need to convert the values.

Spark's internal representation for a date is the ordinal (number of days) from
the unix epoch date, so 1970-01-01 = 0.
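
If you want to see the calendar date on the writer side, converting that
ordinal back is a one-liner. A quick Scala sketch (daysToDate is just a
hypothetical helper; inside write() you'd feed it record.getInt(0) for your
first column):

import java.time.LocalDate

// DateType arrives as the number of days since 1970-01-01
def daysToDate(days: Int): LocalDate = LocalDate.ofEpochDay(days.toLong)

// e.g. daysToDate(0) == LocalDate.parse("1970-01-01")

If I remember correctly, Spark has similar helpers in
org.apache.spark.sql.catalyst.util.DateTimeUtils, but that's an internal API.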

rb

On Tue, Feb 5, 2019 at 4:46 AM Shubham Chaurasia 
wrote:

> Hi All,
>
> I am using a custom DataSourceV2 implementation (Spark version 2.3.2).
>
> Here is how I am trying to pass in a date type from the spark shell.
>
> scala> val df =
>> sc.parallelize(Seq("2019-02-05")).toDF("datetype").withColumn("datetype",
>> col("datetype").cast("date"))
>> scala> df.write.format("com.shubham.MyDataSource").save
>
>
> Below is the minimal write() method of my DataWriter implementation.
>
> @Override
> public void write(InternalRow record) throws IOException {
>   ByteArrayOutputStream format = streamingRecordFormatter.format(record);
>   System.out.println("MyDataWriter.write: " + record.get(0, 
> DataTypes.DateType));
>
> }
>
> It prints an integer as output:
>
> MyDataWriter.write: 17039
>
>
> Is this a bug, or am I doing something wrong?
>
> Thanks,
> Shubham
>


-- 
Ryan Blue
Software Engineer
Netflix


Re: Structured streaming from Kafka by timestamp

2019-02-05 Thread Cody Koeninger
To be more explicit, the easiest thing to do in the short term is use
your own instance of KafkaConsumer to get the offsets for the
timestamps you're interested in, using offsetsForTimes, and use those
for the start / end offsets.  See
https://kafka.apache.org/10/javadoc/?org/apache/kafka/clients/consumer/KafkaConsumer.html

Even if you are interested in implementing timestamp filter pushdown,
you need to get that basic usage working first, so I'd start there.
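
Roughly something like this from the spark-shell, as an untested sketch (broker
and topic names are placeholders, and it assumes the spark-sql-kafka package is
on the classpath; note offsetsForTimes can return a null entry for a partition
with no message at or after the requested timestamp):

import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

val props = new Properties()
props.put("bootstrap.servers", "server1:9092,server2:9092")  // placeholder brokers
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

val topic = "my_topic"
val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
val partitions = consumer.partitionsFor(topic).asScala.map(p => new TopicPartition(topic, p.partition))
val fiveMinutesAgo = java.lang.Long.valueOf(System.currentTimeMillis() - 5 * 60 * 1000L)

// earliest offset whose timestamp is >= the requested time, per partition
val byTime = consumer.offsetsForTimes(partitions.map(tp => tp -> fiveMinutesAgo).toMap.asJava)
val startingOffsets = byTime.asScala
  .collect { case (tp, oat) if oat != null => s""""${tp.partition}": ${oat.offset}""" }
  .mkString(s"""{"$topic": {""", ", ", "}}")
consumer.close()

// feed the computed offsets to the batch Kafka source
val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "server1:9092,server2:9092")
  .option("subscribe", topic)
  .option("startingOffsets", startingOffsets)
  .option("endingOffsets", "latest")
  .load()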

On Fri, Feb 1, 2019 at 11:08 AM Tomas Bartalos  wrote:
>
> Hello,
>
> sorry for my late answer.
> You're right, what I'm doing is a one-time query, not structured streaming.
> It's probably best to describe my use case:
> I'd like to expose live data (via jdbc/odbc) residing in Kafka with the power
> of Spark's distributed SQL engine. As the jdbc server I use the Spark Thrift Server.
> Since timestamp pushdown is not possible :-(, this is a very cumbersome task.
> Let's say I want to inspect the last 5 minutes of Kafka. First I have to find out
> the offsetFrom for each partition that corresponds to now() - 5 minutes.
> Then I can register a kafka table:
>
> CREATE TABLE ticket_kafka_x USING kafka OPTIONS (kafka.bootstrap.servers 
> 'server1,server2,...',
>
> subscribe 'my_topic',
>
> startingOffsets '{"my_topic" : {"0" : 48532124, "1" : 49029703, "2" : 
> 49456213, "3" : 48400521}}');
>
>
> Then I can issue queries against this table (data in Kafka is stored in Avro
> format, but I've created a custom generic UDF to deserialize it).
>
> select event.id as id, explode(event.picks) as picks from (
>
> select from_avro(value) as event from ticket_kafka_x where timestamp >
> from_unixtime(unix_timestamp() - 5 * 60, "yyyy-MM-dd HH:mm:ss")
>
> ) limit 100;
>
>
> What's even more irritating, after a few minutes I have to re-create this table
> to reflect the last-5-minutes interval, otherwise query performance would
> suffer from the increasing amount of data to filter.
>
> A colleague of mine was able to make direct queries with timestamp pushdown in
> the latest Hive.
> How difficult would it be to implement this feature in Spark? Could you point me
> to the code where I should have a look?
>
> Thank you,
>
>
> pi 25. 1. 2019 o 0:32 Shixiong(Ryan) Zhu  napĂ­sal(a):
>>
>> Hey Tomas,
>>
>> From your description, you just ran a batch query rather than a Structured 
>> Streaming query. The Kafka data source doesn't support filter push down 
>> right now. But that's definitely doable. One workaround here is setting 
>> proper  "startingOffsets" and "endingOffsets" options when loading from 
>> Kafka.
>>
>> Best Regards,
>>
>> Ryan
>>
>>
>> On Thu, Jan 24, 2019 at 10:15 AM Gabor Somogyi  
>> wrote:
>>>
>>> Hi Tomas,
>>>
>>> As a general note, I don't fully understand your use case. You've mentioned
>>> structured streaming, but your query is more like a one-time SQL statement.
>>> Kafka doesn't support predicates in the way it's integrated with Spark. What can
>>> be done from the Spark perspective is to look up the offset for a specific
>>> lowest timestamp and start reading from there.
>>>
>>> BR,
>>> G
>>>
>>>
>>> On Thu, Jan 24, 2019 at 6:38 PM Tomas Bartalos  
>>> wrote:

 Hello,

 I'm trying to read Kafka via Spark structured streaming, and to read data
 within a specific time range:

 select count(*) from kafka_table where timestamp > cast('2019-01-23 1:00' 
 as TIMESTAMP) and timestamp < cast('2019-01-23 1:01' as TIMESTAMP);


 The problem is that the timestamp predicate is not pushed down to Kafka, so Spark
 tries to read the whole topic from the beginning.


 explain query:

 

  +- *(1) Filter ((isnotnull(timestamp#57) && (timestamp#57 > 
 15351480)) && (timestamp#57 < 15352344))


 Scan KafkaRelation(strategy=Subscribe[keeper.Ticket.avro.v1---production], 
 start=EarliestOffsetRangeLimit, end=LatestOffsetRangeLimit) 
 [key#52,value#53,topic#54,partition#55,offset#56L,timestamp#57,timestampType#58]
  PushedFilters: [], ReadSchema: 
 struct>>>

 Obviously the query takes forever to complete. Is there a solution to this?

 I'm using kafka and kafka-client version 1.1.1


 BR,

 Tomas




Re: Back pressure not working on streaming

2019-02-05 Thread Cody Koeninger
That article is pretty old. If you click through to the jira mentioned in it,
https://issues.apache.org/jira/browse/SPARK-18580 , you'll see it has been resolved.
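
For reference, a rough sketch of the knobs involved for the direct Kafka
DStream (the values are just examples, not recommendations):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // let the rate estimator adapt ingestion to observed batch processing times
  .set("spark.streaming.backpressure.enabled", "true")
  // rate used for the first batch, before any feedback exists
  .set("spark.streaming.backpressure.initialRate", "10")
  // hard cap, in records per second per Kafka partition, for the direct stream
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")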

On Wed, Jan 2, 2019 at 12:42 AM JF Chen  wrote:
>
> Yes, 10 is a very low value for testing the initial rate.
> And from this article
> https://www.linkedin.com/pulse/enable-back-pressure-make-your-spark-streaming-production-lan-jiang/,
> it seems Spark back pressure is not available for DStreams?
> So, max rate per partition is the only available back pressure solution for
> Kafka DStream input?
>
> Regard,
> Junfeng Chen
>
>
> On Wed, Jan 2, 2019 at 11:49 AM HARSH TAKKAR  wrote:
>>
>> There is a separate property for the max rate; by default it is not set, so if
>> you want to limit the max rate you should give that property a value.
>>
>> Initial rate =10 means it will pick only 10 records per receiver in the 
>> batch interval when you start the process.
>>
>> Depending upon the consumption rate, it will increase the number of records
>> consumed for processing in each batch.
>>
>> However, I feel 10 is way too low a number for a 32-partition Kafka topic.
>>
>>
>>
>> Regards
>> Harsh
>> Happy New Year
>>
>> On Wed 2 Jan, 2019, 08:33 JF Chen wrote:
>>> I have set  spark.streaming.backpressure.enabled to true,  
>>> spark.streaming.backpressure.initialRate to 10.
>>> Once my application started, it received 32 million messages in the first batch.
>>> My application runs every 300 seconds, with 32 Kafka partitions. So what
>>> is the max rate if I set the initial rate to 10?
>>>
>>> Thanks!
>>>
>>>
>>> Regard,
>>> Junfeng Chen




DataSourceV2 producing wrong date value in Custom Data Writer

2019-02-05 Thread Shubham Chaurasia
Hi All,

I am using a custom DataSourceV2 implementation (Spark version 2.3.2).

Here is how I am trying to pass in a date type from the spark shell.

scala> val df =
> sc.parallelize(Seq("2019-02-05")).toDF("datetype").withColumn("datetype",
> col("datetype").cast("date"))
> scala> df.write.format("com.shubham.MyDataSource").save


Below is the minimal write() method of my DataWriter implementation.

@Override
public void write(InternalRow record) throws IOException {
  ByteArrayOutputStream format = streamingRecordFormatter.format(record);
  System.out.println("MyDataWriter.write: " + record.get(0, DataTypes.DateType));
}

It prints an integer as output:

MyDataWriter.write: 17039


Is this a bug, or am I doing something wrong?

Thanks,
Shubham