[jira] [Created] (FLINK-27466) JDBC metaspace leak fix is misleading

2022-05-01 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-27466:


 Summary: JDBC metaspace leak fix is misleading
 Key: FLINK-27466
 URL: https://issues.apache.org/jira/browse/FLINK-27466
 Project: Flink
  Issue Type: Bug
  Components: Connectors / JDBC, Documentation
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.16.0


{code}
To ensure that these classes are only loaded once you should either add the
driver jars to Flink’s lib/ folder, or add the driver classes to the list of
parent-first loaded classes via classloader.parent-first-patterns.additional.
{code}

This reads as if adding the driver classes to
classloader.parent-first-patterns.additional alone can solve the issue in all
cases, but that only works if the driver jar is already in lib/.
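
For illustration, the combination that actually works (the driver jar name and
class pattern below are hypothetical examples, not taken from the docs):

{code}
# 1) Put the driver jar into Flink's lib/ folder, e.g.:
#      lib/mysql-connector-java-8.0.28.jar
# 2) Additionally load the driver classes parent-first (flink-conf.yaml):
classloader.parent-first-patterns.additional: com.mysql.cj.jdbc.
{code}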



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


Re: [DISCUSS] FLIP-224: Blacklist Mechanism

2022-05-01 Thread Martijn Visser
Hi everyone,

Thanks for creating this FLIP. I can understand the problem and I see value
in the automatic detection and blocklisting. However, I have two concerns
with the ability to manually specify resources to be blocked:

* Most organizations explicitly have a separation of concerns, meaning that
there's a group who's responsible for managing a cluster and a user group
who uses that cluster. With the introduction of this mechanism, the latter
group can now influence the responsibility of the first group. Someone from
the user group could block a resource and cause an outage (which could
trigger paging mechanisms etc.) that impacts the first group.
* How big is the group of people who can go through the process of manually
identifying a node that isn't behaving as it should? I think this group is
relatively limited. Does it then make sense to introduce such a feature,
which would only be used by a really small part of Flink's user base? We
would still have to maintain, test and support such a feature.

I'm +1 for the autodetection features, but I'm leaning towards not exposing
this to the user group and instead making it available strictly to cluster
operators. They could then also set up their paging/metrics/logging systems
to take this into account.

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser


On Fri, 29 Apr 2022 at 09:39, Yangze Guo  wrote:

> Thanks for driving this, Zhu and Lijie.
>
> +1 for the overall proposal. Just share some cents here:
>
> - Why do we need to expose
> cluster.resource-blacklist.item.timeout-check-interval to the user?
> I think the semantics of `cluster.resource-blacklist.item.timeout` are
> sufficient for the user. How the timeout is enforced is Flink's internal
> implementation detail. Exposing it would be very confusing, and we do
> not need to expose it to users.
>
> - The ResourceManager can notify the `BlacklistHandler` of task manager
> exceptions as well.
> For example, slot allocation might fail because the target task
> manager is busy or experiencing network jitter. I don't mean we need to
> cover this case in this version, but we could also expose a
> `notifyException` method in `ResourceManagerBlacklistHandler`.
>
> - Before we sync the blocklist to the ResourceManager, will the slots of a
> blocked task manager continue to be released and allocated?
>
> Best,
> Yangze Guo
>
> On Thu, Apr 28, 2022 at 3:11 PM Lijie Wang 
> wrote:
> >
> > Hi Konstantin,
> >
> > Thanks for your feedback. I will respond to your 4 remarks:
> >
> >
> > 1) Thanks for reminding me of the controversy. I think “BlockList” is
> > good enough, and I will change it in the FLIP.
> >
> >
> > 2) Your suggestion for the REST API is a good idea. Based on that, I
> > would change the REST API as follows:
> >
> > POST/GET /blocklist/nodes
> >
> > POST/GET /blocklist/taskmanagers
> >
> > DELETE /blocklist/node/
> >
> > DELETE /blocklist/taskmanager/
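> >
> > For illustration, a hypothetical call sequence (the container ID and
> > payload fields are made up for the example, not part of the FLIP):
> >
> > POST /blocklist/taskmanagers
> > { "id": "container_01", "timeout": "1h" }
> >
> > DELETE /blocklist/taskmanager/container_01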
> >
> >
> > 3) If a node is blocklisted, it means that all task managers on this
> > node are blocklisted, and all slots on these TMs are unavailable. This
> > is a bit like losing the TMs, except that these TMs are not really
> > lost: they are in an unavailable status and still registered with this
> > Flink cluster. They will be available again once the corresponding
> > blocklist item is removed. This behavior is the same in active and
> > non-active clusters. However, in active clusters these TMs may be
> > released due to idle timeouts.
> >
> >
> > 4) For the item timeout, I prefer to keep it. The reasons are as
> > follows:
> >
> > a) The timeout will not affect users adding or removing items via the
> > REST API, and users can disable it by configuring it to Long.MAX_VALUE.
> >
> > b) Some node problems recover after a period of time (such as machine
> > hotspots), in which case users may prefer that Flink removes the item
> > automatically instead of requiring them to do it manually.
> >
> >
> > Best,
> >
> > Lijie
> >
Konstantin Knauf  wrote on Wed, Apr 27, 2022 at 19:23:
> >
> > > Hi Lijie,
> > >
> > > I think this makes sense, and +1 to only supporting manual blocking
> > > of taskmanagers and nodes. Maybe the different strategies can also be
> > > maintained outside of Apache Flink.
> > >
> > > A few remarks:
> > >
> > > 1) Can we use another term than "blacklist" due to the controversy
> > > around the term? [1] There was also a Jira ticket about this topic a
> > > while back, and there was general consensus to avoid the terms
> > > blacklist & whitelist [2]. We could use "blocklist", "denylist" or
> > > "quarantined".
> > > 2) For the REST API, I'd prefer a slightly different design, as verbs
> > > like add/remove are often considered an anti-pattern for REST APIs.
> > > POST on a list resource is generally the standard way to add items,
> > > and DELETE on the individual resource is the standard way to remove
> > > one.
> > >
> > > POST /quarantine/items
> > > DELETE /quarant

Re: Failed Unit Test on Master Branch

2022-05-01 Thread Thomas Weise
I reproduced the issue.

The inconsistent test result is due to a time zone dependency in the test.

The underlying issue is that convertToTimestamp attempts to set a negative
value with java.sql.Timestamp.setNanos.

https://issues.apache.org/jira/browse/FLINK-27465

Thanks,
Thomas


On Thu, Apr 28, 2022 at 7:23 PM Guowei Ma  wrote:

> Hi Haizhou,
>
> I ran the test and there was no problem.
> The commit was "d940af688be90c92ce4f8b9ca883f6753c94aa0f".
>
> Best,
> Guowei
>
>
> On Fri, Apr 29, 2022 at 5:39 AM Haizhou Zhao 
> wrote:
>
> > Hello Flink Community,
> >
> > I encountered a unit test failure in the flink-avro sub-module when
> > I pulled down the master branch and built it.
> >
> > Here is the command I ran:
> >
> > mvn clean package -pl flink-formats/flink-avro
> >
> > Here is the test that fails:
> >
> >
> https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java#L178
> >
> > Here is the exception that was thrown:
> > [ERROR]
> >
> >
> org.apache.flink.formats.avro.AvroRowDeSerializationSchemaTest.testGenericDeserializeSeveralTimes
> >  Time elapsed: 0.008 s  <<< ERROR!
> > java.io.IOException: Failed to deserialize Avro record.
> > ...
> >
> > Here is the latest commit of the HEAD I pulled:
> > commit c5430e2e5d4eeb0aba14ce3ea8401747afe0182d (HEAD -> master,
> > oss/master)
> >
> > Can someone confirm this is indeed a problem on the master branch? If
> yes,
> > any suggestions on fixing it?
> >
> > Thank you,
> > Haizhou Zhao
> >
>


[jira] [Created] (FLINK-27465) AvroRowDeserializationSchema.convertToTimestamp fails with negative nanoseconds

2022-05-01 Thread Thomas Weise (Jira)
Thomas Weise created FLINK-27465:


 Summary: AvroRowDeserializationSchema.convertToTimestamp fails
with negative nanoseconds
 Key: FLINK-27465
 URL: https://issues.apache.org/jira/browse/FLINK-27465
 Project: Flink
  Issue Type: Bug
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.15.0
Reporter: Thomas Weise
Assignee: Thomas Weise


The issue is exposed by a time zone dependency in
AvroRowDeSerializationSchemaTest.

The root cause is that convertToTimestamp attempts to set a negative value
with java.sql.Timestamp.setNanos.
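
A minimal sketch of the failure and one possible remedy (hypothetical helper,
assuming an input of microseconds since epoch; not the actual Flink patch):

{code}
import java.sql.Timestamp;

public class NegativeNanosRepro {

    public static void main(String[] args) {
        // For pre-epoch timestamps, Java's % yields a negative remainder:
        long micros = -1_500_001L; // ~1.5 s before the epoch
        int badNanos = (int) (micros % 1_000L) * 1_000; // -1000
        // new Timestamp(micros / 1_000L).setNanos(badNanos) throws
        // java.lang.IllegalArgumentException: nanos > 999999999 or < 0

        // Remedy: floor division/modulo keep the nano component non-negative.
        long millis = Math.floorDiv(micros, 1_000L);              // -1501
        int microsOfMilli = (int) Math.floorMod(micros, 1_000L);  // 999
        Timestamp ts = new Timestamp(millis);
        // getNanos() already holds the millisecond fraction (non-negative),
        // so the result stays within [0, 999_999_999].
        ts.setNanos(ts.getNanos() + microsOfMilli * 1_000);
        System.out.println(ts);
    }
}
{code}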



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


Re: Source alignment for Iceberg

2022-05-01 Thread Steven Wu
Adding the dev@ group to the thread, as Thomas suggested.

Arvid,

Scenario 3 (dynamic assignment + temporarily no split) in FLIP-180
(idleness) can happen with Iceberg source alignment, as readers can be
temporarily starved when the enumerator holds back new split assignments
upon request.
Totally agree that we should decouple this discussion from FLIP-217, which
addresses the split-level watermark alignment problem as a follow-up to
FLIP-182.

Becket,

Yes, the Iceberg source currently implements the alignment by leveraging
the dynamic split assignment from the FLIP-27 design. Basically, the
enumerator can hold back split assignments to readers when necessary.
Everything is centralized in the enumerator: (1) watermark extraction and
aggregation, (2) alignment decision and execution.
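
For illustration, a trimmed sketch of the hold-back pattern (class and field
names are made up; Flink wiring, error handling and split discovery omitted):

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical enumerator core: only hand out a split if it would not run
// too far ahead of the watermark aggregated from the readers.
class AligningEnumerator {
    static class Split { long minTimestamp; }

    private final Queue<Split> pendingSplits = new ArrayDeque<>();
    private final List<Integer> pendingRequests = new ArrayList<>();
    private long globalWatermark;              // aggregated reader watermark
    private final long maxAllowedLagMs = 60_000L;

    // Mirrors SplitEnumerator#handleSplitRequest.
    void handleSplitRequest(int subtaskId) {
        Split next = pendingSplits.peek();
        if (next != null && next.minTimestamp <= globalWatermark + maxAllowedLagMs) {
            assignTo(subtaskId, pendingSplits.poll());
        } else {
            pendingRequests.add(subtaskId); // hold back until watermark advances
        }
    }

    // Called when the aggregated watermark advances: retry held-back requests.
    void onWatermarkAdvanced(long newWatermark) {
        globalWatermark = newWatermark;
        for (Integer subtask : new ArrayList<>(pendingRequests)) {
            pendingRequests.remove(subtask);
            handleSplitRequest(subtask);
        }
    }

    private void assignTo(int subtaskId, Split split) {
        // In a real source: SplitEnumeratorContext#assignSplit(split, subtaskId)
    }
}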

The motivation of this discussion is to see if the Iceberg source can
leverage some of the watermark alignment solutions (like FLIP-182) from the
Flink framework. E.g., as mentioned in the doc, the Iceberg source could
potentially leverage the FLIP-182 framework for watermark extraction and
aggregation, while keeping the alignment decision and execution in the
centralized enumerator.

Thanks,
Steven

On Thu, Apr 28, 2022 at 2:05 AM Becket Qin  wrote:

> Hi Steven,
>
> Thanks for pulling me into this thread. I think the timestamp
> alignment use case here is a good example of what FLIP-27 was designed for.
>
> Technically speaking, the Iceberg source can already implement timestamp
> alignment in the new Flink source even without FLIP-182, and I understand
> the rationale here because timestamp alignment is also trying to
> orchestrate the consumption of splits. However, it looks like FLIP-182 was
> not designed in a way that can be easily extended to other use cases. It
> is probably worth thinking of a more general mechanism to answer the
> following questions:
>
> 1. What information whose source of truth is the Flink framework should be
> exposed to the SplitEnumerator and SourceReader? And how?
> 2. What control actions in the Flink framework are worth exposing to the
> SplitEnumerators and SourceReaders? And how?
>
> In the context of timestamp alignment, the first question is more
> relevant. For example, instead of hardcoding the ReportWatermarkEvent
> handling logic in the SourceCoordinator, should we expose it to the
> SplitEnumerator? Basically, there is some information, such as the
> subtask-local watermark, whose source of truth is the Flink runtime but
> which is useful to the user-provided pluggables.
>
> I think there are a few control flow patterns to make a complete design:
>
> a. Framework space information (e.g. watermark) --> User space Pluggables
> (e.g. SplitEnumerator) --> User space Actions (e.g. Pause reading a split).
> b. Framework space information (e.g. task failure) --> User space
> pluggables (e.g. SplitEnumerator) --> Framework space actions (e.g. exit
> the job)
> c. User space information (e.g. a custom workload metric) --> User space
> pluggables (e.g. SplitEnumerator) --> User space actions (e.g. rebalance
> the workload across the source readers).
> d. User space information (e.g. a custom stopping event in the stream) -->
> User space pluggables (e.g. SplitEnumerator) --> Framework space actions
> (e.g. stop the job).
>
> So basically, for any user-provided pluggable, the input information may
> come either from other user-provided logic or from the framework, and
> after receiving that information, the pluggable may want either the
> framework or another pluggable to take an action. This gives the above 4
> combinations.
>
> In our case, when the pluggables are the SplitEnumerator and SourceReader,
> the control flows that only involve user space actions are fully
> supported. But it seems that when it comes to control flows involving
> framework space information, some of the information has not been exposed
> to the pluggables, and some framework actions might also be missing.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Thu, Apr 28, 2022 at 3:44 PM Arvid Heise  wrote:
>
>> Hi folks,
>>
>> quick input from my side. I think this is, from the implementation
>> perspective, what Piotr and I had in mind for a global min watermark that
>> helps in idleness cases. See also point 3 in
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
>> .
>>
>> Basically, we would like to empower source enumerators to determine the
>> global min watermark for all source readers, factoring in even future
>> splits. Not all sources can supply that information (think of a general
>> file source), but most should be able to. In short, Flink should know,
>> for a given source at a given point in time, what the min watermark
>> across all source subtasks is.
>>
>> Here is some background:
>> In the context of idleness, we can deterministically advance the
>> watermark. In the pre-FLIP-27 era, we had heuristic approaches in sources
>> to switch to idleness and t

[jira] [Created] (FLINK-27464) java.lang.NoClassDefFoundError

2022-05-01 Thread zhangxin (Jira)
zhangxin created FLINK-27464:


 Summary: java.lang.NoClassDefFoundError
 Key: FLINK-27464
 URL: https://issues.apache.org/jira/browse/FLINK-27464
 Project: Flink
  Issue Type: Bug
  Components: Connectors / JDBC
Affects Versions: 1.13.6
Reporter: zhangxin


Used config: classloader.resolve-order: parent-first

Stack trace:

Caused by: org.apache.flink.util.FlinkException: Global failure triggered by 
OperatorCoordinator for 'Source: Flink-IMS -> Map -> Sink: Unnamed' (operator 
cbc357ccb763df2852fee8c4fc7d55f2).
    at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:553)
    at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:223)
    at 
org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:285)
    at 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:133)
    at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.resetAndStart(RecreateOnResetOperatorCoordinator.java:381)
    at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambda$resetToCheckpoint$6(RecreateOnResetOperatorCoordinator.java:136)
    at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719)
    at 
java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:701)
    at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at 
org.apache.flink.runtime.operators.coordination.ComponentClosingUtils.lambda$closeAsyncWithTimeout$0(ComponentClosingUtils.java:71)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoClassDefFoundError: com/zaxxer/hikari/HikariConfig
    at 
com.ververica.cdc.connectors.mysql.source.connection.PooledDataSourceFactory.createPooledDataSource(PooledDataSourceFactory.java:38)
    at 
com.ververica.cdc.connectors.mysql.source.connection.JdbcConnectionPools.getOrCreateConnectionPool(JdbcConnectionPools.java:51)
    at 
com.ververica.cdc.connectors.mysql.source.connection.JdbcConnectionFactory.connect(JdbcConnectionFactory.java:53)
    at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:872)
    at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:867)
    at io.debezium.jdbc.JdbcConnection.connect(JdbcConnection.java:413)
    at 
com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.openJdbcConnection(DebeziumUtils.java:62)
    at 
com.ververica.cdc.connectors.mysql.MySqlValidator.validate(MySqlValidator.java:68)
    at 
com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:156)
    at 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:129)
    ... 8 more
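
A possible remedy, as a sketch (assumption; not confirmed as the fix for
this report): make the HikariCP classes visible to the classloader that
resolves the connector, e.g. by also placing the HikariCP jar in lib/, or
revert to the default resolve order:

{code}
# flink-conf.yaml
classloader.resolve-order: child-first   # Flink's default
{code}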



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27463) The JDBC connector should support an option for ‘upsert’ or ‘insert’

2022-05-01 Thread Liam (Jira)
Liam created FLINK-27463:


 Summary: The JDBC connector should support an option for ‘upsert’
or ‘insert’
 Key: FLINK-27463
 URL: https://issues.apache.org/jira/browse/FLINK-27463
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / JDBC
Affects Versions: 1.14.4
Reporter: Liam


If a primary key is set on the table, the connector will use an upsert
query to sink data.

However, an upsert query is not always desired, because some databases
don't support upsert well, like ClickHouse. (Even where databases don't
support it officially, I believe many users extend the dialect themselves.)

So I suggest adding an option like 'sink.append-only' to populate the data
with plain insert queries, as sketched below.
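
For illustration, how the proposed option might look in a DDL (the option
name is the proposal, not an existing Flink option; connection properties
are placeholders):

{code}
CREATE TABLE jdbc_sink (
  id BIGINT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydb',
  'table-name' = 'events',
  -- proposed: force plain INSERTs even though a primary key is declared
  'sink.append-only' = 'true'
);
{code}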

 

If we can reach a consensus, I can implement it.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)