Re: Kafka Schema registry

2020-01-14 Thread aj
 ConfluentRegistryAvroDeserializationSchema.forGeneric() requires a reader
schema. How can we use it to deserialize using the writer schema?
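
For context, this is roughly how I am wiring forGeneric() into a Kafka source today — a minimal sketch, where the topic name, registry URL, bootstrap servers and the reader schema are just placeholders. As far as I understand, the registry is only used to look up the writer's schema per record, and the decoded record is then always converted to the reader schema passed to forGeneric():

```
import java.util.Properties

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object SchemaRegistrySourceSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder reader schema: each record is decoded with the writer's schema
    // fetched from the registry and then projected onto this schema.
    val readerSchema: Schema = new Schema.Parser().parse(
      """{"type":"record","name":"SensorReading","fields":[{"name":"id","type":"string"}]}""")

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("group.id", "schema-registry-demo")

    val deserializer = ConfluentRegistryAvroDeserializationSchema.forGeneric(
      readerSchema, "http://localhost:8081")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream: DataStream[GenericRecord] =
      env.addSource(new FlinkKafkaConsumer[GenericRecord]("sensor-topic", deserializer, props))

    stream.map(_.toString).print()
    env.execute("schema-registry-source-sketch")
  }
}
```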

On Fri, Sep 13, 2019 at 12:04 AM Lasse Nedergaard 
wrote:

> Hi Elias
>
> Thanks for letting me know. I have found it, but we also need the option to
> register Avro schemas and use the registry when we write to Kafka. So we
> will create a serialisation version and, when it works, implement it into
> Flink and create a pull request for the community.
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>
> On 12 Sep 2019, at 17:45, Elias Levy wrote:
>
> Just for a Kafka source:
>
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema
>
>
>    - There is also a version of this schema available that can look up the
>    writer's schema (the schema which was used to write the record) in the
>    Confluent Schema Registry. Using this deserialization schema, the record
>    will be read with the schema retrieved from the Schema Registry and
>    transformed to a statically provided one (either through
>    ConfluentRegistryAvroDeserializationSchema.forGeneric(...) or
>    ConfluentRegistryAvroDeserializationSchema.forSpecific(...)).
>
>
> On Wed, Sep 11, 2019 at 1:48 PM Lasse Nedergaard <
> lassenederga...@gmail.com> wrote:
>
>> Hi.
>> Does Flink have out-of-the-box support for the Kafka Schema Registry for
>> both sources and sinks?
>> If not, does anyone know of an implementation we can build on, so we can
>> help make it generally available in a future release?
>>
>> Med venlig hilsen / Best regards
>> Lasse Nedergaard
>>
>>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07






Re: Slots Leak Observed when

2020-01-14 Thread Xintong Song
Hi,
It would be helpful for understanding the problem if you could share the
logs.

Thank you~

Xintong Song



On Wed, Jan 15, 2020 at 12:23 AM burgesschen  wrote:

> Hi guys,
>
> Our team is observing a stability issue on our Standalone Flink clusters.
>
> Background: The Kafka cluster our Flink jobs read from/write to has some
> issues, and every 10 to 15 mins one of the partition leaders switches. This
> causes jobs that write to/read from that topic to fail and restart. Usually
> this is not a problem since the jobs can restart and work with the new
> partition leader. However, one of those restarts can make the jobs enter a
> failing state and never be able to recover.
>
> In the failing state, the jobmanager has the exception:
>
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate all requires slots within timeout of 30 ms. Slots
> required: 24, slots allocated: 12
>
> During that time, 2 of the taskmanagers are reporting that all the slots on
> them are occupied; however, from the dashboard of the jobmanager, no job is
> deployed to those 2 taskmanagers.
>
> My guess is that since the jobs restart fairly frequently, at one of those
> times the slots were not released properly when the jobs failed, resulting
> in the jobmanager falsely believing that those 2 taskmanagers' slots are
> still occupied.
>
> It does sound like an issue mentioned in
> https://issues.apache.org/jira/browse/FLINK-9932
> but we are using 1.6.2 and according to the jira ticket, this bug is fixed
> in 1.6.2
>
> Please let me know if you have any ideas or how we can prevent it. Thank
> you
> so much!
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Question about Scala Case Class and List in Flink

2020-01-14 Thread Utopia
Hi folks,

I have two questions about types in Flink when using Scala:

1. scala case class:

This is my case class definition:
case class SensorReading(var id: String, var timestamp: Long, var temperature: Double)

In the documentation, Scala case classes are supported:
`Scala case classes (including Scala tuples): null fields not supported`

But the log info shows:
10:26:08,489 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class 
io.github.streamingwithflink.util.SensorReading is missing a default 
constructor so it cannot be used as a POJO type and must be processed as 
GenericType. Please read the Flink documentation on "Data Types & 
Serialization" for details of the effect on performance.


2. scala list

This is my case class definition:
case class SensorReading(var id: String, var timestamp: Long, var temperature: Double, var list: List[String] = List[String]())
The log shows:
No fields were detected for class scala.collection.immutable.List so it cannot 
be used as a POJO type and must be processed as GenericType. Please read the 
Flink documentation on "Data Types & Serialization" for details of the effect 
on performance.

Does this mean that the Scala List can still be serialized, so that I can ignore
this info if I don't care about the performance?
Should I use a Java ArrayList instead of a Scala List, or create a custom
serializer for the SensorReading case class?
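
For reference, here is a minimal, self-contained sketch of what I am checking, assuming the Scala API implicits are the relevant part (the class and field names are just my example). My understanding is that with these implicits in scope the case class, including the List[String] field, is handled by Flink's Scala type analysis rather than the Java TypeExtractor that logged the messages above, and that the GenericType fallback simply means Kryo serialization, which works but is slower:

```
import org.apache.flink.streaming.api.scala._

object TypeInfoCheck {
  // Same shape as my case class, including the Scala List field from question 2.
  case class SensorReading(id: String, timestamp: Long, temperature: Double,
                           list: List[String] = List.empty[String])

  def main(args: Array[String]): Unit = {
    // createTypeInformation is the implicit macro that ships with the Scala API import.
    // For a case class I would expect a CaseClassTypeInfo here rather than the
    // GenericType reported by the Java TypeExtractor in the log above.
    val info = createTypeInformation[SensorReading]
    println(info)
  }
}
```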

Thanks!


Best regards
Utopia


Re: PubSub source throwing grpc errors

2020-01-14 Thread Itamar Syn-Hershko
Thanks!

I was able to track this down. Essentially it was a deserialization error
which propagated and might have prevented the channel from closing down
properly. This could be considered a problem, but I'm now further down
the rabbit hole chasing down a solution for the original deserialization
issue.

Thanks for the help!

On Tue, Jan 14, 2020 at 8:26 PM Till Rohrmann  wrote:

> Hi Itamar,
>
> for further debugging it would be helpful to get the full logs of Flink
> and more information about your environment. Since I'm not too
> familiar with Flink's PubSub connector, I have pulled in Richard (original
> author), Becket and Robert (both helped with reviewing and merging this
> connector). They might know what's going on.
>
> The problem looks a bit similar to [1]. Maybe it would help to upgrade to
> a newer google-cloud-pubsub version than 1.62.0. I assume that the others
> might know more about it.
>
> [1] https://github.com/googleapis/google-cloud-java/issues/3648
>
> Cheers,
> Till
>
> On Mon, Jan 13, 2020 at 12:19 PM Itamar Syn-Hershko <
> ita...@bigdataboutique.com> wrote:
>
>> Hi all,
>>
>> We are trying to use the PubSub source with a very minimal and basic
>> Flink application as a POC, and getting the following error consistently
>> every couple of seconds. What am I missing?
>>
>> ```
>> io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference
>> cleanQueue
>> SEVERE: *~*~*~ Channel ManagedChannelImpl{logId=5, target=
>> pubsub.googleapis.com:443} was not shutdown properly!!! ~*~*~*
>> Make sure to call shutdown()/shutdownNow() and wait until
>> awaitTermination() returns true.
>> java.lang.RuntimeException: ManagedChannel allocation site
>> at
>> io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:103)
>> at
>> io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:53)
>> at
>> io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:44)
>> at
>> io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:419)
>> at
>> org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory.getSubscriber(DefaultPubSubSubscriberFactory.java:55)
>> at
>> org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.createAndSetPubSubSubscriber(PubSubSource.java:178)
>> at
>> org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.open(PubSubSource.java:100)
>> at
>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>> at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>> at java.base/java.lang.Thread.run(Thread.java:834)
>> ```
>>
>> Thanks!
>>
>> --
>>
>> Itamar Syn-Hershko
>>
>>
>> ita...@bigdataboutique.com
>> https://bigdataboutique.com
>>
>

-- 

Itamar Syn-Hershko
CTO, Founder
+972-54-2467860
ita...@bigdataboutique.com
https://bigdataboutique.com





Re: Understanding watermark

2020-01-14 Thread Till Rohrmann
Hi Cam,

could you share a bit more details about your job (e.g. which sources are
you using, what are your settings, etc.). Ideally you can provide a minimal
example in order to better understand the program.

From a high-level perspective, there might be different problems: First of
all, Flink does not support checkpointing/taking a savepoint if some of the
job's operators have already terminated, iirc. But your description points
rather in the direction that your bounded source does not terminate. So
maybe you are reading a file via StreamExecutionEnvironment.createFileInput
with FileProcessingMode.PROCESS_CONTINUOUSLY. But these things are hard to
tell without a better understanding of your job.
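
To illustrate the last point, here is a minimal sketch (the input path is just a placeholder): with PROCESS_ONCE the file source reads the current contents and then finishes, which should also result in a final Long.MAX_VALUE watermark, whereas PROCESS_CONTINUOUSLY keeps monitoring the path and therefore never terminates.

```
import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._

object BoundedFileSourceSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Placeholder input path for the bounded side of the pipeline.
    val inputPath = "/tmp/bounded-input"
    val format = new TextInputFormat(new Path(inputPath))

    // PROCESS_ONCE: read what is there now, then let the source terminate
    // (and emit a final watermark). With PROCESS_CONTINUOUSLY the source would
    // keep watching the path and never finish. The interval only matters for
    // the continuous mode.
    val bounded: DataStream[String] =
      env.readFile(format, inputPath, FileProcessingMode.PROCESS_ONCE, 1000L)

    bounded.print()
    env.execute("bounded-file-source-sketch")
  }
}
```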

Cheers,
Till

On Mon, Jan 13, 2020 at 8:35 PM Cam Mach  wrote:

> Hello Flink expert,
>
> We have a pipeline that reads both bounded and unbounded sources, and our
> understanding is that when the bounded sources complete they should get a
> watermark of +inf, and then we should be able to take a savepoint and safely
> restart the pipeline. However, we have sources that never get watermarks, and
> we are confused as to what we are seeing and what we should expect.
>
>
> Cam Mach
> Software Engineer
> E-mail: cammac...@gmail.com
> Tel: 206 972 2768
>
>


Re: [BULK]Re: Incorrect Physical Plan when unioning two different windows, giving incorrect SQL query results

2020-01-14 Thread Till Rohrmann
Great, thanks a lot for looking into the problem and fixing it. I assume
that your PR will be merged very soon.

Cheers,
Till

On Tue, Jan 14, 2020 at 7:18 PM Benoit Hanotte  wrote:

> Hello Till,
> thanks for your reply!
> I have been able to debug the issue and reported it in
> https://issues.apache.org/jira/browse/FLINK-15577.
> It seems the old planner does not add the window specs to the Logical
> nodes' digests, leading the HepPlanner to consider the aggregations to be
> equivalent, when they are not because they use different time windows. I
> explained the issue in more detail in the ticket above, and have submitted
> a PR earlier today: https://github.com/apache/flink/pull/10854.
> Best,
> Benoit
> --
> *From:* Till Rohrmann 
> *Sent:* Tuesday, January 14, 2020 7:13 PM
> *To:* Benoit Hanotte 
> *Cc:* user@flink.apache.org ; Jingsong Li <
> jingsongl...@gmail.com>; twal...@apache.org 
> *Subject:* [BULK]Re: Incorrect Physical Plan when unioning two different
> windows, giving incorrect SQL query results
>
> Hi Benoit,
>
> thanks for reporting this issue. Since I'm not too familiar with the SQL
> component I've pulled in Timo and Jingsong who know much better what could
> be wrong than I do.
>
> Cheers,
> Till
>
> On Mon, Jan 13, 2020 at 11:48 AM Benoit Hanotte 
> wrote:
>
> Hello,
>
> We seem to be facing an issue with Flink where the physical plan after
> planner optimization is not correct.
> I have been able to reproduce the issue in the following "simplified" use
> case (it doesn't seem to happen in trivial cases):
>
>1. We open 2 event streams ("clicks" and "displays")
>2. We compute the click rate (ctr) over 2 hours and 6 hours sliding
>windows.
>3. We then union to output one row per hour with the max value between
>the values computed over 2 and 6hrs.
>
> You can find SQL query below [1].
> After activating the debug logging for calcite, I can see that the
> original logical plan is valid: the top-level UNION is between two
> LogicalProjects, for the 2hr and 6hrs HOP windows [2].
> However, in the final Physical plan, we can see that both sides of the
> UNION now have 6hrs HOP windows instead of one window over 2hr and one over
> 6hr [3].
>
> I pushed a commit to my fork to reproduce the issue:
> https://github.com/BenoitHanotte/flink/commit/3d388f153b44bb35b57b8400407ff24a2e91661f
> ,
> unfortunately simplifying the query seems to make the issue disappear.
>
> Is there anything obvious I am missing, or do you have any pointer of what
> could trigger this issue? I looked at the different rules applied by the
> planner [4], but, as I am not familiar with them, I haven't yet been able
> to find the root cause.
>
> Thanks a lot for your help!
>
> Benoit Hanotte
>
> * [1] SQL query
> *
>
> WITH displays AS (
> SELECT `timestamp`, 1 as nb_displays, 0 as nb_clicks FROM
> my_catalog.my_db.display
> ),
>
> clicks AS (
> SELECT `timestamp`, 0 as nb_displays, 1 as nb_clicks FROM
> my_catalog.my_db.click
> ),
>
> counts_2h AS (
> SELECT
> SUM(nb_clicks) / SUM(nb_displays) as ctr,
> HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
> as `timestamp`
> FROM (
> (SELECT * FROM displays)
> UNION ALL
> (SELECT * FROM clicks)
> ) t
> GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
> ),
>
> counts_6h AS (
> SELECT
> SUM(nb_clicks) / SUM(nb_displays) as ctr,
> HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '6' HOUR)
> as `timestamp`
> FROM (
> (SELECT * FROM displays)
> UNION ALL
> (SELECT * FROM clicks)
> ) t
> GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '6' HOUR)
> )
>
> SELECT
> TUMBLE_END(`timestamp`, INTERVAL '1' HOUR) as `timestamp`,
> MAX(ctr)
> FROM (
> (SELECT * FROM counts_6h)
> UNION ALL
> (SELECT * FROM counts_2h)
> ) t
> GROUP BY TUMBLE(`timestamp`, INTERVAL '1' HOUR)

Re: Custom File Sink using EventTime and defined custom file name for parquet file

2020-01-14 Thread Till Rohrmann
Hi David,

I'm pulling in Kostas who worked on the StreamingFileSink and might be able
to answer some of your questions.

Cheers,
Till

On Mon, Jan 13, 2020 at 2:45 PM Leonard Xu  wrote:

> Hi, David
>
> Regarding your first description, I'm a little confused about the duplicated
> records when backfilling; could you describe your usage scenario/code in more detail?
>
> I remembered a backfill solution from Pinterest which is very similar
> to yours and also uses Flink [1]; hope that can help you.
>
> Best,
> Leonard
>
> [1]
> https://www.youtube.com/watch?v=3-X6FJ5JS4E&list=PLDX4T_cnKjD207Aa8b5CsZjc7Z_KRezGz&index=64
>
>
> On Jan 10, 2020, at 12:14, David Magalhães wrote:
>
> Hi, I'm working for the first time with Flink and I'm trying to create a
> solution that will store events from Kafka into Parquet files in S3. This
> should also support re-injection of events from Parquet files into a Kafka
> topic.
>
> Here is the code with a simple usage of StreamingFileSink with BulkEncode
> that will get the events and store them in Parquet files. The files will be
> partitioned by account_id and year and month (MM). The issue with this
> approach is that when running a backfill from a certain point in time, it
> will be hard not to generate duplicated events, since we will not override
> the same files, as the filename is generated by "part--".
>
> To add predictability, I've used a tumbling window to aggregate multiple
> GenericRecords, in order to write the Parquet file with a list of them. For
> that I've created a custom file sink, but I'm not sure which properties I
> am going to lose compared to the StreamingFileSink. Here is the code.
> Still, something is missing in this solution to close a window with a given
> timeout, so it can write the last events into the sink if no more events
> are sent.
>
> Another workaround would be to create a StreamingFileSink with a
> RowEncoder, receive a List of GenericRecord, and create a custom Encoder
> with AvroParquetWriter to write to a file. This way I have access to a
> custom rolling policy, but this looks truly inefficient. Here is the code.
>
> Am I overthinking this solution? I know there are some issues (recently
> closed) for the StreamingFileSink to support more custom rolling policies
> in BulkEncode, like https://issues.apache.org/jira/browse/FLINK-13027,
> but I just noticed that now.
> 
>
>
>
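
As a side note on the first approach described above: here is a minimal sketch of how a StreamingFileSink with a Parquet bulk format and a custom BucketAssigner for an account_id/yyyyMM layout could look. The field names account_id and timestamp, the output path, and the date format are assumptions for illustration, not taken from the linked code.

```
import java.time.format.DateTimeFormatter
import java.time.{Instant, ZoneOffset}

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.core.fs.Path
import org.apache.flink.core.io.SimpleVersionedSerializer
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer
import org.apache.flink.streaming.api.functions.sink.filesystem.{BucketAssigner, StreamingFileSink}

// Buckets records as <account_id>/<yyyyMM>, derived from assumed record fields.
class AccountMonthBucketAssigner extends BucketAssigner[GenericRecord, String] {
  @transient private lazy val fmt =
    DateTimeFormatter.ofPattern("yyyyMM").withZone(ZoneOffset.UTC)

  override def getBucketId(record: GenericRecord, context: BucketAssigner.Context): String = {
    val account = record.get("account_id").toString    // assumed field name
    val ts = record.get("timestamp").asInstanceOf[Long] // assumed epoch millis
    s"$account/${fmt.format(Instant.ofEpochMilli(ts))}"
  }

  override def getSerializer: SimpleVersionedSerializer[String] =
    SimpleVersionedStringSerializer.INSTANCE
}

object ParquetSinkSketch {
  def buildSink(schema: Schema, outputPath: String): StreamingFileSink[GenericRecord] =
    StreamingFileSink
      .forBulkFormat(new Path(outputPath), ParquetAvroWriters.forGenericRecord(schema))
      .withBucketAssigner(new AccountMonthBucketAssigner)
      .build()
}
```

Note that bulk formats roll part files on every checkpoint and the part file names are still generated by the sink, so this alone does not make a backfill idempotent.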


Re: PubSub source throwing grpc errors

2020-01-14 Thread Till Rohrmann
Hi Itamar,

for further debugging it would be helpful to get the full logs of Flink and
more information about your environment. Since I'm not too familiar with
Flink's PubSub connector, I have pulled in Richard (original author),
Becket and Robert (both helped with reviewing and merging this connector).
They might know what's going on.

The problem looks a bit similar to [1]. Maybe it would help to upgrade to a
newer google-cloud-pubsub version than 1.62.0. I assume that the others
might know more about it.

[1] https://github.com/googleapis/google-cloud-java/issues/3648

Cheers,
Till

On Mon, Jan 13, 2020 at 12:19 PM Itamar Syn-Hershko <
ita...@bigdataboutique.com> wrote:

> Hi all,
>
> We are trying to use the PubSub source with a very minimal and basic Flink
> application as a POC, and getting the following error consistently every
> couple of seconds. What am I missing?
>
> ```
> io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference
> cleanQueue
> SEVERE: *~*~*~ Channel ManagedChannelImpl{logId=5, target=
> pubsub.googleapis.com:443} was not shutdown properly!!! ~*~*~*
> Make sure to call shutdown()/shutdownNow() and wait until
> awaitTermination() returns true.
> java.lang.RuntimeException: ManagedChannel allocation site
> at
> io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:103)
> at
> io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:53)
> at
> io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:44)
> at
> io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:419)
> at
> org.apache.flink.streaming.connectors.gcp.pubsub.DefaultPubSubSubscriberFactory.getSubscriber(DefaultPubSubSubscriberFactory.java:55)
> at
> org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.createAndSetPubSubSubscriber(PubSubSource.java:178)
> at
> org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.open(PubSubSource.java:100)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.base/java.lang.Thread.run(Thread.java:834)
> ```
>
> Thanks!
>
> --
>
> Itamar Syn-Hershko
>
>
> ita...@bigdataboutique.com
> https://bigdataboutique.com
>


Re: [BULK]Re: Incorrect Physical Plan when unioning two different windows, giving incorrect SQL query results

2020-01-14 Thread Benoit Hanotte
Hello Till,
thanks for your reply!
I have been able to debug the issue and reported it in 
https://issues.apache.org/jira/browse/FLINK-15577.
It seems the old planner does not add the window specs to the Logical nodes' 
digests, leading the HepPlanner to consider the aggregations to be equivalent, 
when they are not because they use different time windows. I explained the 
issue in more detail in the ticket above, and have submitted a PR earlier 
today: https://github.com/apache/flink/pull/10854.
Best,
Benoit

From: Till Rohrmann 
Sent: Tuesday, January 14, 2020 7:13 PM
To: Benoit Hanotte 
Cc: user@flink.apache.org ; Jingsong Li 
; twal...@apache.org 
Subject: [BULK]Re: Incorrect Physical Plan when unioning two different windows, 
giving incorrect SQL query results

Hi Benoit,

thanks for reporting this issue. Since I'm not too familiar with the SQL 
component I've pulled in Timo and Jingsong who know much better what could be 
wrong than I do.

Cheers,
Till

On Mon, Jan 13, 2020 at 11:48 AM Benoit Hanotte <b.hano...@criteo.com> wrote:
Hello,

We seem to be facing an issue with Flink where the physical plan after planner 
optimization is not correct.
I have been able to reproduce the issue in the following "simplified" use case 
(it doesn't seem to happen in trivial cases):

  1.  We open 2 event streams ("clicks" and "displays")
  2.  We compute the click rate (ctr) over 2 hours and 6 hours sliding windows.
  3.  We then union to output one row per hour with the max value between the 
values computed over 2 and 6hrs.

You can find SQL query below [1].
After activating the debug logging for calcite, I can see that the original 
logical plan is valid: the top-level UNION is between two LogicalProjects, for 
the 2hr and 6hrs HOP windows [2].
However, in the final Physical plan, we can see that both sides of the UNION 
now have 6hrs HOP windows instead of one window over 2hr and one over 6hr [3].

I pushed a commit to my fork to reproduce the issue: 
https://github.com/BenoitHanotte/flink/commit/3d388f153b44bb35b57b8400407ff24a2e91661f,
 unfortunately simplifying the query seems to make the issue disappear.

Is there anything obvious I am missing, or do you have any pointer of what 
could trigger this issue? I looked at the different rules applied by the 
planner [4], but, as I am not familiar with them, I haven't yet been able to 
find the root cause.

Thanks a lot for your help!

Benoit Hanotte

* [1] SQL query 
*

WITH displays AS (
SELECT `timestamp`, 1 as nb_displays, 0 as nb_clicks FROM 
my_catalog.my_db.display
),

clicks AS (
SELECT `timestamp`, 0 as nb_displays, 1 as nb_clicks FROM 
my_catalog.my_db.click
),

counts_2h AS (
SELECT
SUM(nb_clicks) / SUM(nb_displays) as ctr,
HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR) as 
`timestamp`
FROM (
(SELECT * FROM displays)
UNION ALL
(SELECT * FROM clicks)
) t
GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
),

counts_6h AS (
SELECT
SUM(nb_clicks) / SUM(nb_displays) as ctr,
HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '6' HOUR) as 
`timestamp`
FROM (
(SELECT * FROM displays)
UNION ALL
(SELECT * FROM clicks)
) t
GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '6' HOUR)
)

SELECT
TUMBLE_END(`timestamp`, INTERVAL '1' HOUR) as `timestamp`,
MAX(ctr)
FROM (
(SELECT * FROM counts_6h)
UNION ALL
(SELECT * FROM counts_2h)
) t
GROUP BY TUMBLE(`timestamp`, INTERVAL '1' HOUR)


* [2] Logical plan (before optimization) 
***

LogicalProject(timestamp=[TUMBLE_END($0)], EXPR$1=[$1])
  LogicalAggregate(group=[{0}], EXPR$1=[MAX($1)])
LogicalProject($f0=[TUMBLE($1, 360:INTERVAL HOUR)], ctr=[$0])
  LogicalUnion(all=[true])
 

Re: Incorrect Physical Plan when unioning two different windows, giving incorrect SQL query results

2020-01-14 Thread Till Rohrmann
Hi Benoit,

thanks for reporting this issue. Since I'm not too familiar with the SQL
component I've pulled in Timo and Jingsong who know much better what could
be wrong than I do.

Cheers,
Till

On Mon, Jan 13, 2020 at 11:48 AM Benoit Hanotte 
wrote:

> Hello,
>
> We seem to be facing an issue with Flink where the physical plan after
> planner optimization is not correct.
> I have been able to reproduce the issue in the following "simplified" use
> case (it doesn't seem to happen in trivial cases):
>
>1. We open 2 event streams ("clicks" and "displays")
>2. We compute the click rate (ctr) over 2 hours and 6 hours sliding
>windows.
>3. We then union to output one row per hour with the max value between
>the values computed over 2 and 6hrs.
>
> You can find SQL query below [1].
> After activating the debug logging for calcite, I can see that the
> original logical plan is valid: the top-level UNION is between two
> LogicalProjects, for the 2hr and 6hrs HOP windows [2].
> However, in the final Physical plan, we can see that both sides of the
> UNION now have 6hrs HOP windows instead of one window over 2hr and one over
> 6hr [3].
>
> I pushed a commit to my fork to reproduce the issue:
> https://github.com/BenoitHanotte/flink/commit/3d388f153b44bb35b57b8400407ff24a2e91661f,
> unfortunately simplifying the query seems to make the issue disappear.
>
> Is there anything obvious I am missing, or do you have any pointer of what
> could trigger this issue? I looked at the different rules applied by the
> planner [4], but, as I am not familiar with them, I haven't yet been able
> to find the root cause.
>
> Thanks a lot for your help!
>
> Benoit Hanotte
>
> * [1] SQL query
> *
>
> WITH displays AS (
> SELECT `timestamp`, 1 as nb_displays, 0 as nb_clicks FROM
> my_catalog.my_db.display
> ),
>
> clicks AS (
> SELECT `timestamp`, 0 as nb_displays, 1 as nb_clicks FROM
> my_catalog.my_db.click
> ),
>
> counts_2h AS (
> SELECT
> SUM(nb_clicks) / SUM(nb_displays) as ctr,
> HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
> as `timestamp`
> FROM (
> (SELECT * FROM displays)
> UNION ALL
> (SELECT * FROM clicks)
> ) t
> GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
> ),
>
> counts_6h AS (
> SELECT
> SUM(nb_clicks) / SUM(nb_displays) as ctr,
> HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '6' HOUR)
> as `timestamp`
> FROM (
> (SELECT * FROM displays)
> UNION ALL
> (SELECT * FROM clicks)
> ) t
> GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '6' HOUR)
> )
>
> SELECT
> TUMBLE_END(`timestamp`, INTERVAL '1' HOUR) as `timestamp`,
> MAX(ctr)
> FROM (
> (SELECT * FROM counts_6h)
> UNION ALL
> (SELECT * FROM counts_2h)
> ) t
> GROUP BY TUMBLE(`timestamp`, INTERVAL '1' HOUR)
>
>
> * [2] Logical plan (before optimization)
> ***
>
> LogicalProject(timestamp=[TUMBLE_END($0)], EXPR$1=[$1])
>   LogicalAggregate(group=[{0}], EXPR$1=[MAX($1)])
> LogicalProject($f0=[TUMBLE($1, 360:INTERVAL HOUR)], ctr=[$0])
>   LogicalUnion(all=[true])
> LogicalProject(ctr=[$0], timestamp=[$1])
>   LogicalProject(ctr=[/($1, $2)], timestamp=[HOP_ROWTIME($0)])
> LogicalAggregate(group=[{0}], agg#0=[SUM($1)],
> agg#1=[SUM($2)])
>   LogicalProject($f0=[HOP($0, 360:INTERVAL HOUR,
> 2160:INTERVAL HOUR)], nb_clicks=[$2], nb_displays=[$1])
> LogicalProject(timestamp=[$0], nb_displays=[0],
> nb_clicks=[1])
>   LogicalTableScan(table=[[my_catalog, my_db, click]])
> LogicalProject(ctr=[$0], timestamp=[$1])
>   LogicalProject(ctr=[/($1, $2)], timestamp=[HOP_ROWTIME($0)])
> LogicalAggregate(group=[{0}], agg#0=[SUM($1)],
> agg#1=[SUM($2)])
>   LogicalProject($f0=[HOP($0, 360:INTERVAL HOUR,
> 720:INTERVAL HOUR)], nb_clicks=[$2], nb_displays=[$1])
> LogicalProject(timestamp=[$0], nb_displays=[1],
> nb_clicks=[0])
>   LogicalTableScan(table=[[my_catalog, my_db,
> display]])
>
>
> ** [3] Resulting physical plan (after optimization)
> 
>
> DataStreamCalc(select=[w$end AS timestamp, CAST(EXPR$1) AS EXPR$1]):
> rowcount = 400.0, cumulative cost = {3200.0 rows, 3600.0 cpu, 4800.0 io},
> id = 556
>   DataStreamGroupWindowAggregate(window=[TumblingGroupWindow('w$,
> 'timestamp, 360.millis)], select=[MAX(ctr) AS EXPR$1, start('w$) AS
> w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS
> w$proctime]): rowcount = 400.0, cumulative

Slots Leak Observed when

2020-01-14 Thread burgesschen
Hi guys,

Our team is observing a stability issue on our Standalone Flink clusters.

Background: The Kafka cluster our Flink jobs read from/write to has some
issues, and every 10 to 15 mins one of the partition leaders switches. This
causes jobs that write to/read from that topic to fail and restart. Usually
this is not a problem since the jobs can restart and work with the new
partition leader. However, one of those restarts can make the jobs enter a
failing state and never be able to recover.

In the failing state, the jobmanager has the exception:

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Could not allocate all requires slots within timeout of 30 ms. Slots
required: 24, slots allocated: 12

During that time, 2 of the taskmanagers are reporting that all the slots on
them are occupied; however, from the dashboard of the jobmanager, no job is
deployed to those 2 taskmanagers.

My guess is that since the jobs restart fairly frequently, at one of those
times the slots were not released properly when the jobs failed, resulting in
the jobmanager falsely believing that those 2 taskmanagers' slots are still
occupied.

It does sound like an issue mentioned in 
https://issues.apache.org/jira/browse/FLINK-9932
but we are using 1.6.2 and according to the jira ticket, this bug is fixed
in 1.6.2

Please let me know if you have any ideas or how we can prevent it. Thank you
so much!




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Is there a way to clean up the state based on custom/business logic?

2020-01-14 Thread Dawid Wysakowicz
Hi,

What you are asking is certainly possible with the DataStream API. Table
API/SQL is a declarative API, where you say what you want to compute, not how.

As a rule of thumb, I would say that whenever you need to manually handle your
state or timers, the DataStream API and ProcessFunction [1] will be a
better fit for you.
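
For example, here is a minimal sketch of such a business-logic-driven cleanup (the per-key threshold and the tuple input type are just assumptions for illustration):

```
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Accumulates a per-key sum and clears the state as soon as the business
// condition (sum >= threshold) is met, instead of relying on retention time.
class ThresholdCleanupFunction(threshold: Long)
    extends KeyedProcessFunction[String, (String, Long), (String, Long)] {

  private var sum: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    sum = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("sum", classOf[java.lang.Long]))
  }

  override def processElement(
      value: (String, Long),
      ctx: KeyedProcessFunction[String, (String, Long), (String, Long)]#Context,
      out: Collector[(String, Long)]): Unit = {
    val current = Option(sum.value()).map(_.longValue()).getOrElse(0L) + value._2
    if (current >= threshold) {
      // Condition reached: emit downstream and drop the state for this key.
      out.collect((value._1, current))
      sum.clear()
    } else {
      sum.update(current)
    }
  }
}

// Usage: stream.keyBy(_._1).process(new ThresholdCleanupFunction(100L))
```

Timers via ctx.timerService() can be registered in the same function if the cleanup should additionally be time-based.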

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/process_function.html

On 14/01/2020 11:33, kant kodali wrote:
> Hi All,
>
> I read through the doc below and I am wondering if I can clean up the
> state based on custom logic rather than min and max retention time?
>
> For example, I want to say: clean up all the state where the key = foo
> or, say, the value = bar. So until the key reaches a particular value,
> just keep accumulating (retaining), and when it does reach a particular
> value, then write to some sink and clean up! I am looking for something
> along those lines. Please let me know.
>
> Thanks 
>
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html





Is there a way to clean up the state based on custom/business logic?

2020-01-14 Thread kant kodali
Hi All,

I read through the doc below and I am wondering if I can clean up the state
based on custom logic rather than min and max retention time?

For example, I want to say: clean up all the state where the key = foo or,
say, the value = bar. So until the key reaches a particular value, just keep
accumulating (retaining), and when it does reach a particular value, then
write to some sink and clean up! I am looking for something along those
lines. Please let me know.

Thanks


https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/query_configuration.html