[jira] [Created] (FLINK-28148) Unable to load jar connector to a Python Table API app

2022-06-20 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-28148:
--

 Summary: Unable to load jar connector to a Python Table API app
 Key: FLINK-28148
 URL: https://issues.apache.org/jira/browse/FLINK-28148
 Project: Flink
  Issue Type: Bug
  Components: API / Python, Connectors / Common, Table SQL / API
Affects Versions: 1.16.0
Reporter: Zichen Liu


Reproduction steps:
 # Clone the latest Flink from the master branch.
 # Follow the Flink [recommended steps|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/] to build Flink and install PyFlink. Note: the tutorial recommends Maven 3.2.x and Python 3.6-3.9; Maven 3.2.5 and Python 3.7 were used here.
 # Create a new Python Table API app that loads in a jar, similar to:

 
{code:python}
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

env_settings = EnvironmentSettings.in_streaming_mode()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)
t_env.get_config().set("pipeline.classpaths", "file:///path/to/your/jar.jar")
{code}
The jar loaded here can be any jar, and the following message will appear:

 
{code:java}
Traceback (most recent call last):
  File "pyflink_table_api_firehose.py", line 48, in <module>
    log_processing()
  File "pyflink_table_api_firehose.py", line 14, in log_processing
t_env.get_config().set("pipeline.classpaths", 
"file:///home/YOUR_USER/pyflink-table-api/flink/flink-connectors/flink-sql-connector-aws-kinesis-firehose/target/flink-sql-connector-aws-kinesis-firehose-1.16-SNAPSHOT.jar")
  File 
"/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/table/table_config.py",
 line 109, in set
add_jars_to_context_class_loader(value.split(";"))
  File 
"/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/util/java_utils.py",
 line 169, in add_jars_to_context_class_loader
addURL.invoke(loader, to_jarray(get_gateway().jvm.Object, [url]))
  File 
"/home/YOUR_USER/.local/lib/python3.7/site-packages/py4j/java_gateway.py", line 
1322, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File 
"/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/util/exceptions.py",
 line 146, in deco
return f(*a, **kw)
  File "/home/YOUR_USER/.local/lib/python3.7/site-packages/py4j/protocol.py", 
line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o45.invoke.
: java.lang.IllegalArgumentException: object is not an instance of declaring 
class
   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
   at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
   at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
   at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
   at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
   at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
   at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
   at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
   at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
   at java.base/java.lang.Thread.run(Thread.java:829) {code}
Reproduced on Mac and Amazon Linux 2.

Next, downgrade to the 1.15 release:
{code:bash}
pip uninstall apache-flink
pip install apache-flink
{code}

The jar then loads successfully, even when it is the same connector jar built
from master (reproduced with the Kafka and Kinesis Firehose connectors).



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-28027) Initialise Async Sink maximum number of in flight messages to low number for rate limiting strategy

2022-06-13 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-28027:
--

 Summary: Initialise Async Sink maximum number of in flight 
messages to low number for rate limiting strategy
 Key: FLINK-28027
 URL: https://issues.apache.org/jira/browse/FLINK-28027
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Common, Connectors / Kinesis
Affects Versions: 1.15.0
Reporter: Zichen Liu
 Fix For: 1.16.0


*Background*

In the AsyncSinkWriter, we implement a rate limiting strategy.

The initial value for the maximum number of in-flight messages is set extremely
high ({{maxBatchSize * maxInFlightRequests}}).

However, experience with the AIMD strategy in TCP congestion control suggests
that [starting from a small value is
better|https://en.wikipedia.org/wiki/TCP_congestion_control#Slow_start].

*Suggestion*

A better default might be either:
 * {{maxBatchSize}}, or
 * {{maxBatchSize / parallelism}}
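
The AIMD scheme this suggestion borrows from TCP can be sketched as follows. The class, parameter names, and constants here are illustrative only, not the actual AsyncSinkWriter code:

```python
class AimdRateLimiter:
    """Illustrative AIMD (additive-increase / multiplicative-decrease)
    limit on in-flight messages, starting from a small window as TCP
    slow start does, instead of maxBatchSize * maxInFlightRequests."""

    def __init__(self, max_batch_size, max_in_flight_requests, parallelism,
                 increase_step=10, decrease_factor=0.5):
        # Suggested small starting point: maxBatchSize / parallelism.
        self.limit = max(1, max_batch_size // parallelism)
        # Hard ceiling: the current (very high) initial value.
        self.ceiling = max_batch_size * max_in_flight_requests
        self.increase_step = increase_step
        self.decrease_factor = decrease_factor

    def on_success(self):
        # Additive increase after a successful batch.
        self.limit = min(self.ceiling, self.limit + self.increase_step)

    def on_throttled(self):
        # Multiplicative decrease when the destination throttles.
        self.limit = max(1, int(self.limit * self.decrease_factor))
```

With {{maxBatchSize}} = 500 and parallelism 10, the limiter starts at 50 in-flight messages and probes upward, rather than starting at the ceiling.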





[jira] [Created] (FLINK-28007) Tests for AWS Connectors Using SDK v2 to use Synchronous Clients

2022-06-10 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-28007:
--

 Summary: Tests for AWS Connectors Using SDK v2 to use Synchronous 
Clients
 Key: FLINK-28007
 URL: https://issues.apache.org/jira/browse/FLINK-28007
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kinesis
Reporter: Zichen Liu
Assignee: Zichen Liu
 Fix For: 1.16.0


h3. Background

AWS SDK v2 async clients use a Netty async client for Kinesis Data 
Streams/Firehose sink and Kinesis Data Streams EFO consumer. The SDK creates a 
shared thread pool for Netty to use for network operations when one is not 
configured. The thread pool is managed by a shared ELG (event loop group), and 
this is stored in a static field. We do not configure this for the AWS 
connectors in the Flink codebase. 

When threads are spawned within the ELG, they inherit the context classloader 
from the current thread. If the ELG is created from a shared classloader, for 
instance Flink parent classloader, or MiniCluster parent classloader, multiple 
Flink jobs can share the same ELG. When an ELG thread is spawned from a Flink 
job, it inherits the Flink user classloader. When that job completes or fails,
the classloader is destroyed; however, the Netty thread still references it,
which leads to the exception below.

h3. Impact

This issue *does not* impact jobs deployed to a TaskManager when AWS SDK v2 is
loaded via the Flink user classloader, which is expected to be the standard
deployment configuration.

This issue is known to impact:
- Flink mini cluster, for example in integration tests (FLINK-26064)
- Flink cluster loading AWS SDK v2 via parent classloader

h3. Suggested solution

There are a few possible solutions, as discussed in
https://github.com/apache/flink/pull/18733:
1. Create a separate ELG per Flink job
2. Create a separate ELG per subtask
3. Attach the correct classloader to ELG-spawned threads
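
The third option amounts to capturing the right context at spawn time and installing it inside the worker thread. A minimal, language-neutral sketch of that idea in Python, with a ContextVar standing in for the thread's context classloader (all names here are illustrative, not the connector code):

```python
import contextvars
import threading

# Stand-in for a thread's "context classloader"; the default mimics the
# shared parent classloader a pooled thread would otherwise see.
current_loader = contextvars.ContextVar("current_loader",
                                        default="parent-classloader")

def spawn_with_context(fn, results):
    """Capture the spawning thread's context and run the new thread's
    work inside a copy of it, so the thread sees the correct "classloader"
    even when it is created from a shared pool."""
    ctx = contextvars.copy_context()
    t = threading.Thread(target=lambda: results.append(ctx.run(fn)))
    t.start()
    return t
```

A plain `threading.Thread` started without this wrapper only sees the default value, the analogue of a shared ELG thread holding a stale or parent classloader.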

h3. Error Stack

(shortened stack trace, as full is too large)
{noformat}
Feb 09 20:05:04 java.util.concurrent.ExecutionException: 
software.amazon.awssdk.core.exception.SdkClientException: Unable to execute 
HTTP request: Trying to access closed classloader. Please check if you store 
classloaders directly or indirectly in static fields. If the stacktrace 
suggests that the leak occurs in a third party library and cannot be fixed 
immediately, you can disable this check with the configuration 
'classloader.check-leaked-classloader'.
Feb 09 20:05:04 at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
Feb 09 20:05:04 at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
(...)
Feb 09 20:05:04 Caused by: 
software.amazon.awssdk.core.exception.SdkClientException: Unable to execute 
HTTP request: Trying to access closed classloader. Please check if you store 
classloaders directly or indirectly in static fields. If the stacktrace 
suggests that the leak occurs in a third party library and cannot be fixed 
immediately, you can disable this check with the configuration 
'classloader.check-leaked-classloader'.
Feb 09 20:05:04 at 
software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:98)
Feb 09 20:05:04 at 
software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:43)
Feb 09 20:05:04 at 
software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:204)
Feb 09 20:05:04 at 
software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:200)
Feb 09 20:05:04 at 
software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:179)
Feb 09 20:05:04 at 
software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:159)
(...)
Feb 09 20:05:04 Caused by: java.lang.IllegalStateException: Trying to access 
closed classloader. Please check if you store classloaders directly or 
indirectly in static fields. If the stacktrace suggests that the leak occurs in 
a third party library and cannot be fixed immediately, you can disable this 
check with the configuration 'classloader.check-leaked-classloader'.
Feb 09 20:05:04 at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:164)
Feb 09 20:05:04 at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResources(FlinkUserCodeClassLoaders.java:188)
Feb 09 20:05:04 at 
java.util.ServiceLoader$LazyIterator.hasNextService(ServiceLoader.java:348)
Feb 09 20:05:04 at 

[jira] [Created] (FLINK-27670) Python wrappers for Kinesis Sinks

2022-05-17 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-27670:
--

 Summary: Python wrappers for Kinesis Sinks
 Key: FLINK-27670
 URL: https://issues.apache.org/jira/browse/FLINK-27670
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kinesis
Reporter: Zichen Liu
 Fix For: 1.15.1


Create Python Wrappers for the new Kinesis Streams sink and the Kinesis 
Firehose sink.

An example implementation may be found here 
[https://github.com/apache/flink/pull/15491/files] for the old Kinesis sink.





[jira] [Created] (FLINK-27537) Remove requirement for Async Sink's RequestEntryT to be serializable

2022-05-06 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-27537:
--

 Summary: Remove requirement for Async Sink's RequestEntryT to be 
serializable
 Key: FLINK-27537
 URL: https://issues.apache.org/jira/browse/FLINK-27537
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Common
Affects Versions: 1.15.0
Reporter: Zichen Liu


Currently, in `AsyncSinkBase` and its dependent classes, e.g. the sink writer,
element converter etc., the `RequestEntryT` generic type is required to be
`Serializable`.

However, this requirement no longer holds: nothing actually requires it.

Proposed approach:

* Remove the `extends Serializable` bound from the generic type `RequestEntryT`





[jira] [Created] (FLINK-27536) Rename method parameter in AsyncSinkWriter

2022-05-06 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-27536:
--

 Summary: Rename method parameter in AsyncSinkWriter
 Key: FLINK-27536
 URL: https://issues.apache.org/jira/browse/FLINK-27536
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Common
Affects Versions: 1.15.0
Reporter: Zichen Liu


Change the abstract method's parameter naming in AsyncSinkWriter.

From

  Consumer<List<RequestEntryT>> requestResult

to

  Consumer<List<RequestEntryT>> requestToRetry

or something similar.

 

This is because the consumer here is supposed to accept a list of requests that 
need to be retried.





[jira] [Created] (FLINK-27505) Add javadoc comments to AsyncSinkBase

2022-05-05 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-27505:
--

 Summary: Add javadoc comments to AsyncSinkBase
 Key: FLINK-27505
 URL: https://issues.apache.org/jira/browse/FLINK-27505
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Common
Affects Versions: 1.15.0
Reporter: Zichen Liu


Currently, the javadocs describing each constructor parameter of AsyncSinkBase
live in AsyncSinkBaseBuilder. Since use of the builder is not enforced, it
makes more sense to have these descriptions in AsyncSinkBase itself.





[jira] [Created] (FLINK-26589) Update the logging level of Kinesis Streams and Firehose sinks

2022-03-10 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-26589:
--

 Summary: Update the logging level of Kinesis Streams and Firehose 
sinks
 Key: FLINK-26589
 URL: https://issues.apache.org/jira/browse/FLINK-26589
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kinesis
Reporter: Zichen Liu
Assignee: Ahmed Hamdy
 Fix For: 1.15.0


h2. Bug:

The Async Sink Base is not limiting throughput to the destination and is
therefore exceeding its rate limits.

*Cause:*

We are not throttling our requests downstream at all.

We should monitor for requests that fail with ThroughputExceeded exceptions
and reduce the throughput accordingly.





[jira] [Created] (FLINK-26309) Add a polling strategy to determine whether Localstack test container has started

2022-02-22 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-26309:
--

 Summary: Add a polling strategy to determine whether Localstack 
test container has started
 Key: FLINK-26309
 URL: https://issues.apache.org/jira/browse/FLINK-26309
 Project: Flink
  Issue Type: Technical Debt
  Components: Connectors / Common
Reporter: Zichen Liu
Assignee: Zichen Liu


The Firehose sink is an at-least-once sink, but duplicates are only expected
during failures and reloads from save/checkpoints. During
`KinesisFirehoseSinkITCase` there is no such action, and yet we occasionally
get duplicates in the test result. The test originally asserted exactly-once
semantics erroneously; this was fixed in #18876 to assert at least once.
However, a curiosity remains: why were there duplicates?

That is the purpose of this investigation.
{code:java}
Feb 22 02:47:37 [ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time 
elapsed: 83.215 s <<< FAILURE! - in 
org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkITCase
Feb 22 02:47:37 [ERROR] 
org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkITCase.test  Time 
elapsed: 50.712 s  <<< FAILURE!
Feb 22 02:47:37 org.opentest4j.AssertionFailedError: 
Feb 22 02:47:37 
Feb 22 02:47:37 expected: 92
Feb 22 02:47:37  but was: 93
Feb 22 02:47:37 at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
Feb 22 02:47:37 at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
Feb 22 02:47:37 at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
Feb 22 02:47:37 at 
org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkITCase.test(KinesisFirehoseSinkITCase.java:133)
Feb 22 02:47:37 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
Feb 22 02:47:37 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
Feb 22 02:47:37 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
Feb 22 02:47:37 at java.lang.reflect.Method.invoke(Method.java:498)
Feb 22 02:47:37 at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
Feb 22 02:47:37 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
Feb 22 02:47:37 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
Feb 22 02:47:37 at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
Feb 22 02:47:37 at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
Feb 22 02:47:37 at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
Feb 22 02:47:37 at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
Feb 22 02:47:37 at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
Feb 22 02:47:37 at 
org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
Feb 22 02:47:37 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
Feb 22 02:47:37 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
Feb 22 02:47:37 at 
org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
Feb 22 02:47:37 at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
Feb 22 02:47:37 at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
Feb 22 02:47:37 at 
org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
Feb 22 02:47:37 at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
Feb 22 02:47:37 at 
org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:30)
Feb 22 02:47:37 at org.junit.rules.RunRules.evaluate(RunRules.java:20)
Feb 22 02:47:37 at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
Feb 22 02:47:37 at 
org.junit.runners.ParentRunner.run(ParentRunner.java:413)
Feb 22 02:47:37 at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
Feb 22 02:47:37 at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
Feb 22 02:47:37 at 
org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
Feb 22 02:47:37 at 
org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
 {code}
[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=31983=logs=d44f43ce-542c-597d-bf94-b0718c71e5e8=ed165f3f-d0f6-524b-5279-86f8ee7d0e2d=44249]
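
For reference, the difference between the erroneous exactly-once assertion and the corrected at-least-once assertion can be sketched as follows; the helper is illustrative, not the actual test code:

```python
def assert_at_least_once(expected, actual):
    """At-least-once delivery: every expected record is delivered and
    nothing unexpected appears, but duplicates are allowed. An
    exactly-once assertion would additionally require equal counts,
    which is what the 'expected: 92 but was: 93' failure was checking."""
    assert set(actual) == set(expected), "missing or unexpected records"
    assert len(actual) >= len(expected), "fewer records than expected"
```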





[jira] [Created] (FLINK-26305) KinesisFirehoseSinkITCase writes duplicates to Localstack

2022-02-22 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-26305:
--

 Summary: KinesisFirehoseSinkITCase writes duplicates to Localstack
 Key: FLINK-26305
 URL: https://issues.apache.org/jira/browse/FLINK-26305
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kinesis
Affects Versions: 1.15.0
Reporter: Zichen Liu
Assignee: Zichen Liu


{code:java}
Feb 22 02:47:37 [ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time 
elapsed: 83.215 s <<< FAILURE! - in 
org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkITCase
Feb 22 02:47:37 [ERROR] 
org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkITCase.test  Time 
elapsed: 50.712 s  <<< FAILURE!
Feb 22 02:47:37 org.opentest4j.AssertionFailedError: 
Feb 22 02:47:37 
Feb 22 02:47:37 expected: 92
Feb 22 02:47:37  but was: 93
Feb 22 02:47:37 at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
Feb 22 02:47:37 at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
Feb 22 02:47:37 at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
Feb 22 02:47:37 at 
org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkITCase.test(KinesisFirehoseSinkITCase.java:133)
Feb 22 02:47:37 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
Feb 22 02:47:37 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
Feb 22 02:47:37 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
Feb 22 02:47:37 at java.lang.reflect.Method.invoke(Method.java:498)
Feb 22 02:47:37 at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
Feb 22 02:47:37 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
Feb 22 02:47:37 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
Feb 22 02:47:37 at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
Feb 22 02:47:37 at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
Feb 22 02:47:37 at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
Feb 22 02:47:37 at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
Feb 22 02:47:37 at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
Feb 22 02:47:37 at 
org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
Feb 22 02:47:37 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
Feb 22 02:47:37 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
Feb 22 02:47:37 at 
org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
Feb 22 02:47:37 at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
Feb 22 02:47:37 at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
Feb 22 02:47:37 at 
org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
Feb 22 02:47:37 at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
Feb 22 02:47:37 at 
org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:30)
Feb 22 02:47:37 at org.junit.rules.RunRules.evaluate(RunRules.java:20)
Feb 22 02:47:37 at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
Feb 22 02:47:37 at 
org.junit.runners.ParentRunner.run(ParentRunner.java:413)
Feb 22 02:47:37 at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
Feb 22 02:47:37 at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
Feb 22 02:47:37 at 
org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
Feb 22 02:47:37 at 
org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
 {code}
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=31983=logs=d44f43ce-542c-597d-bf94-b0718c71e5e8=ed165f3f-d0f6-524b-5279-86f8ee7d0e2d=44249





[jira] [Created] (FLINK-26300) KinesisFirehoseSinkITCase failed on azure IOException

2022-02-22 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-26300:
--

 Summary: KinesisFirehoseSinkITCase failed on azure IOException
 Key: FLINK-26300
 URL: https://issues.apache.org/jira/browse/FLINK-26300
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kinesis
Affects Versions: 1.15.0
Reporter: Zichen Liu


{code:java}
Feb 22 02:47:37 [ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time 
elapsed: 83.215 s <<< FAILURE! - in 
org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkITCase
Feb 22 02:47:37 [ERROR] 
org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkITCase.test  Time 
elapsed: 50.712 s  <<< FAILURE!
Feb 22 02:47:37 org.opentest4j.AssertionFailedError: 
Feb 22 02:47:37 
Feb 22 02:47:37 expected: 92
Feb 22 02:47:37  but was: 93
Feb 22 02:47:37 at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
Feb 22 02:47:37 at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
Feb 22 02:47:37 at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
Feb 22 02:47:37 at 
org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkITCase.test(KinesisFirehoseSinkITCase.java:133)
Feb 22 02:47:37 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
Feb 22 02:47:37 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
Feb 22 02:47:37 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
Feb 22 02:47:37 at java.lang.reflect.Method.invoke(Method.java:498)
Feb 22 02:47:37 at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
Feb 22 02:47:37 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
Feb 22 02:47:37 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
Feb 22 02:47:37 at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
Feb 22 02:47:37 at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
Feb 22 02:47:37 at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
Feb 22 02:47:37 at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
Feb 22 02:47:37 at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
Feb 22 02:47:37 at 
org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
Feb 22 02:47:37 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
Feb 22 02:47:37 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
Feb 22 02:47:37 at 
org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
Feb 22 02:47:37 at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
Feb 22 02:47:37 at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
Feb 22 02:47:37 at 
org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
Feb 22 02:47:37 at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
Feb 22 02:47:37 at 
org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:30)
Feb 22 02:47:37 at org.junit.rules.RunRules.evaluate(RunRules.java:20)
Feb 22 02:47:37 at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
Feb 22 02:47:37 at 
org.junit.runners.ParentRunner.run(ParentRunner.java:413)
Feb 22 02:47:37 at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
Feb 22 02:47:37 at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
Feb 22 02:47:37 at 
org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
Feb 22 02:47:37 at 
org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
 {code}
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=31983=logs=d44f43ce-542c-597d-bf94-b0718c71e5e8=ed165f3f-d0f6-524b-5279-86f8ee7d0e2d=44249





[jira] [Created] (FLINK-25977) Close sink client and sink http client for KDS/KDF Sinks

2022-02-07 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-25977:
--

 Summary: Close sink client and sink http client for KDS/KDF Sinks
 Key: FLINK-25977
 URL: https://issues.apache.org/jira/browse/FLINK-25977
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kinesis
Reporter: Zichen Liu
Assignee: Ahmed Hamdy
 Fix For: 1.15.0


h2. Update:

Set {{DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50}} to match the default thread count
of the AWS SDK v2 HTTP client.





[jira] [Created] (FLINK-25976) Update the KDS and KDF Sink's defaults & update the docs

2022-02-07 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-25976:
--

 Summary: Update the KDS and KDF Sink's defaults & update the docs
 Key: FLINK-25976
 URL: https://issues.apache.org/jira/browse/FLINK-25976
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kinesis
Reporter: Zichen Liu
Assignee: Ahmed Hamdy
 Fix For: 1.15.0


h2. Bug:

The Async Sink Base is being flushed too frequently, resulting in backpressure
even when the buffer is near empty.

*Cause:*

During a write(), flushIfAble() is called, which checks whether the number of
buffered elements exceeds the batch size. If so, it insists that the sink
flush immediately, even if the number of in-flight requests already exceeds
the maximum allowed; this yields the current mailbox thread and hence blocks.

Notice that this can occur even when the buffer is near empty, so the blocking
behaviour is unnecessary and undesirable: we would like the element to be
written to the buffer with no blocking.
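
The desired non-blocking flush decision can be sketched as a single predicate; the names are illustrative, not the actual AsyncSinkWriter code:

```python
def should_flush(buffered, in_flight, max_batch_size, max_in_flight_requests):
    # Only trigger a flush when a full batch is buffered AND an in-flight
    # slot is free. The reported bug is flushing on the first condition
    # alone, which blocks the mailbox thread whenever in_flight is
    # already at its cap.
    return buffered >= max_batch_size and in_flight < max_in_flight_requests
```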





[jira] [Created] (FLINK-25948) KDS / KDF Sink should call .close() to clean up resources

2022-02-03 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-25948:
--

 Summary: KDS / KDF Sink should call .close() to clean up resources
 Key: FLINK-25948
 URL: https://issues.apache.org/jira/browse/FLINK-25948
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Common
Affects Versions: 1.15.0
Reporter: Zichen Liu
Assignee: Ahmed Hamdy
 Fix For: 1.15.0


Intermittent failures introduced as part of merge (PR#18314: 
[FLINK-24228[connectors/firehose] - Unified Async Sink for Kinesis 
Firehose|https://github.com/apache/flink/pull/18314]):
 # Failures are intermittent, affecting c. 1 in 7 builds on
{{flink-ci.flink}} and {{flink-ci.flink-master-mirror}}.
 # The issue looks identical to the KinesaliteContainer startup issue
(Appendix 1).
 # I have managed to reproduce the issue locally: if I start some parallel
containers, keep them running, and then run {{KinesisFirehoseSinkITCase}},
c. 1 in 6 runs gives the error.
 # The errors have a slightly different appearance on
{{flink-ci.flink-master-mirror}} vs {{flink-ci.flink}}, which looks the same
as the local failure. I hope this is only a difference in logging/environment
(and that there aren't two distinct issues).

Appendix 1:
{code:java}
org.testcontainers.containers.ContainerLaunchException: Container startup failed

at 
org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:336)
at 
org.testcontainers.containers.GenericContainer.start(GenericContainer.java:317)
at 
org.testcontainers.containers.GenericContainer.starting(GenericContainer.java:1066)
at 
... 11 more
Caused by: org.testcontainers.containers.ContainerLaunchException: Could not 
create/start container
at 
org.testcontainers.containers.GenericContainer.tryStart(GenericContainer.java:525)
at 
org.testcontainers.containers.GenericContainer.lambda$doStart$0(GenericContainer.java:331)
at 
org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:81)
... 12 more
Caused by: org.rnorth.ducttape.TimeoutException: Timeout waiting for result 
with exception
at 
org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:54)
at
{code}





[jira] [Created] (FLINK-25945) Intermittent Failures on KDF AZP 2

2022-02-03 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-25945:
--

 Summary: Intermittent Failures on KDF AZP 2
 Key: FLINK-25945
 URL: https://issues.apache.org/jira/browse/FLINK-25945
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Kinesis
Reporter: Zichen Liu
Assignee: Ahmed Hamdy
 Fix For: 1.15.0


Problem: CI fails on [#18553|https://github.com/apache/flink/pull/18553] and
the issue is not reproducible locally. Furthermore, the failure is
intermittent, with a frequency of c. 1 in 5 (?). Resolution: unknown; one
theory is to use HTTP/1.1 as the protocol.





[jira] [Created] (FLINK-25944) Intermittent Failures on KDF AZP

2022-02-03 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-25944:
--

 Summary: Intermittent Failures on KDF AZP
 Key: FLINK-25944
 URL: https://issues.apache.org/jira/browse/FLINK-25944
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Kinesis
Reporter: Zichen Liu
Assignee: Ahmed Hamdy
 Fix For: 1.15.0


AsyncSinkWriter implements snapshotState to write the pending requests into state, but neither implementation (Kinesis, Firehose) provides a state serializer or interacts with the recovered state.

 * Implement {{getWriterStateSerializer}} for the Kinesis/Firehose sinks
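For illustration, the serializer's job amounts to length-prefixing each buffered request entry so the pending requests survive a checkpoint/restore cycle. Below is a minimal, Flink-free sketch of that idea; the class and method names are hypothetical, not the actual Flink serializer API (a real implementation would typically return a versioned serializer from {{getWriterStateSerializer}}).

{code:java}
import java.io.*;
import java.util.*;

// Hypothetical sketch: length-prefix encoding of buffered request entries,
// the kind of logic a writer-state serializer would supply.
class SimpleRequestStateSerializer {

    // Serialize pending entries as: entry count, then (length, bytes) per entry.
    static byte[] serialize(List<byte[]> entries) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(entries.size());
        for (byte[] entry : entries) {
            out.writeInt(entry.length);
            out.write(entry);
        }
        out.flush();
        return bos.toByteArray();
    }

    // Restore the pending entries on recovery, in their original order.
    static List<byte[]> deserialize(byte[] state) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(state));
        int count = in.readInt();
        List<byte[]> entries = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            byte[] entry = new byte[in.readInt()];
            in.readFully(entry);
            entries.add(entry);
        }
        return entries;
    }
}
{code}

A restored writer would then put the deserialized entries back at the head of its buffer before accepting new elements.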



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25943) New Kinesis, Firehose to provide a state serializer

2022-02-03 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-25943:
--

 Summary: New Kinesis, Firehose to provide a state serializer
 Key: FLINK-25943
 URL: https://issues.apache.org/jira/browse/FLINK-25943
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Kinesis
Reporter: Zichen Liu
Assignee: Ahmed Hamdy
 Fix For: 1.15.0


h2. Motivation

*User stories:*
As a Flink user, I’d like to use the Table API for the new Kinesis Data Streams 
 sink.

*Scope:*
 * Introduce {{AsyncDynamicTableSink}}, which enables sinking tables into async sink implementations.
 * Implement a new {{KinesisDynamicTableSink}} that uses the {{KinesisDataStreamSink}} async implementation and implements {{AsyncDynamicTableSink}}.
 * The implementation introduces the Async Sink configurations as optional options in the table definition, with default values derived from the {{KinesisDataStream}} defaults.
 * Unit/integration testing: modify the KinesisTableAPI tests for the new implementation; add unit tests for {{AsyncDynamicTableSink}}, {{KinesisDynamicTableSink}} and {{KinesisDynamicTableSinkFactory}}.
 * Java / code-level docs.

h2. References

More details to be found 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]

 

*Update:*

^^ Status Update ^^
__List of all work outstanding for 1.15 release__

[Merged] https://github.com/apache/flink/pull/18165 - KDS DataStream Docs
[Merged] https://github.com/apache/flink/pull/18396 - [hotfix] for infinite loop if not flushing during commit
[Merged] https://github.com/apache/flink/pull/18421 - Mark Kinesis Producer as 
deprecated (Prod: FLINK-24227)
[Merged] https://github.com/apache/flink/pull/18348 - KDS Table API Sink & Docs
[Merged] https://github.com/apache/flink/pull/18488 - base sink retry entries 
in order not in reverse
[Merged] https://github.com/apache/flink/pull/18512 - changing failed requests 
handler to accept List in AsyncSinkWriter
[Merged] https://github.com/apache/flink/pull/18483 - Do not expose the element 
converter
[Merged] https://github.com/apache/flink/pull/18468 - Adding Kinesis data 
streams sql uber-jar

Ready for review:
[SUCCESS ] https://github.com/apache/flink/pull/18314 - KDF DataStream Sink & 
Docs
[BLOCKED on ^^ ] https://github.com/apache/flink/pull/18426 - rename 
flink-connector-aws-kinesis-data-* to flink-connector-aws-kinesis-* (module 
names) and KinesisData*Sink to Kinesis*Sink (class names)

Pending PR:
* Firehose Table API Sink & Docs
* KDF Table API SQL jar

TBD:
* FLINK-25846: Not shutting down
* FLINK-25848: Validation during start up
* FLINK-25792: flush() bug
* FLINK-25793: throughput exceeded
* Update the defaults of KDS sink and update the docs + do the same for KDF
* add an `AsyncSinkCommonConfig` class (to wrap the 6 params) to `flink-connector-base` and propagate it to the two connectors
- feature freeze
* KDS performance testing
* KDF performance testing
* Clone the new docs to .../contents.zh/... and add the location to the 
corresponding Chinese translation jira - KDS -
* Rename [Amazon AWS Kinesis Streams] to [Amazon Kinesis Data Streams] in docs 
(legacy issue)
- Flink 1.15 release
* KDS end to end sanity test - hits aws apis rather than local docker images
* KDS Python wrappers
* FLINK-25733 - Create A migration guide for Kinesis Table API connector - can 
happen after 1.15
* If `endpoint` is provided, `region` should not be required (currently it is)
* Test if Localstack container requires the 1ms timeout
* Adaptive level of logging (in discussion)

FYI:
* FLINK-25661 - Add Custom Fatal Exception handler in AsyncSinkWriter - 
https://github.com/apache/flink/pull/18449
* https://issues.apache.org/jira/browse/FLINK-24229 DDB Sink

Chinese translation:
https://issues.apache.org/jira/browse/FLINK-25735 - KDS DataStream Sink
https://issues.apache.org/jira/browse/FLINK-25736 - KDS Table API Sink
https://issues.apache.org/jira/browse/FLINK-25737 - KDF DataStream Sink



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25924) KDF Integration tests intermittently fails

2022-02-02 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-25924:
--

 Summary: KDF Integration tests intermittently fails
 Key: FLINK-25924
 URL: https://issues.apache.org/jira/browse/FLINK-25924
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Kinesis
Reporter: Zichen Liu
Assignee: Ahmed Hamdy
 Fix For: 1.15.0


h2. Motivation

*User stories:*
As a Flink user, I’d like to use the Table API for the new Kinesis Data Streams 
 sink.

*Scope:*
 * Introduce {{AsyncDynamicTableSink}}, which enables sinking tables into async sink implementations.
 * Implement a new {{KinesisDynamicTableSink}} that uses the {{KinesisDataStreamSink}} async implementation and implements {{AsyncDynamicTableSink}}.
 * The implementation introduces the Async Sink configurations as optional options in the table definition, with default values derived from the {{KinesisDataStream}} defaults.
 * Unit/integration testing: modify the KinesisTableAPI tests for the new implementation; add unit tests for {{AsyncDynamicTableSink}}, {{KinesisDynamicTableSink}} and {{KinesisDynamicTableSinkFactory}}.
 * Java / code-level docs.

h2. References

More details to be found 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]

 

*Update:*

^^ Status Update ^^
__List of all work outstanding for 1.15 release__

[Merged] https://github.com/apache/flink/pull/18165 - KDS DataStream Docs
[Merged] https://github.com/apache/flink/pull/18396 - [hotfix] for infinite loop if not flushing during commit
[Merged] https://github.com/apache/flink/pull/18421 - Mark Kinesis Producer as 
deprecated (Prod: FLINK-24227)
[Merged] https://github.com/apache/flink/pull/18348 - KDS Table API Sink & Docs
[Merged] https://github.com/apache/flink/pull/18488 - base sink retry entries 
in order not in reverse
[Merged] https://github.com/apache/flink/pull/18512 - changing failed requests 
handler to accept List in AsyncSinkWriter
[Merged] https://github.com/apache/flink/pull/18483 - Do not expose the element 
converter
[Merged] https://github.com/apache/flink/pull/18468 - Adding Kinesis data 
streams sql uber-jar

Ready for review:
[SUCCESS ] https://github.com/apache/flink/pull/18314 - KDF DataStream Sink & 
Docs
[BLOCKED on ^^ ] https://github.com/apache/flink/pull/18426 - rename 
flink-connector-aws-kinesis-data-* to flink-connector-aws-kinesis-* (module 
names) and KinesisData*Sink to Kinesis*Sink (class names)

Pending PR:
* Firehose Table API Sink & Docs
* KDF Table API SQL jar

TBD:
* FLINK-25846: Not shutting down
* FLINK-25848: Validation during start up
* FLINK-25792: flush() bug
* FLINK-25793: throughput exceeded
* Update the defaults of KDS sink and update the docs + do the same for KDF
* add an `AsyncSinkCommonConfig` class (to wrap the 6 params) to `flink-connector-base` and propagate it to the two connectors
- feature freeze
* KDS performance testing
* KDF performance testing
* Clone the new docs to .../contents.zh/... and add the location to the 
corresponding Chinese translation jira - KDS -
* Rename [Amazon AWS Kinesis Streams] to [Amazon Kinesis Data Streams] in docs 
(legacy issue)
- Flink 1.15 release
* KDS end to end sanity test - hits aws apis rather than local docker images
* KDS Python wrappers
* FLINK-25733 - Create A migration guide for Kinesis Table API connector - can 
happen after 1.15
* If `endpoint` is provided, `region` should not be required (currently it is)
* Test if Localstack container requires the 1ms timeout
* Adaptive level of logging (in discussion)

FYI:
* FLINK-25661 - Add Custom Fatal Exception handler in AsyncSinkWriter - 
https://github.com/apache/flink/pull/18449
* https://issues.apache.org/jira/browse/FLINK-24229 DDB Sink

Chinese translation:
https://issues.apache.org/jira/browse/FLINK-25735 - KDS DataStream Sink
https://issues.apache.org/jira/browse/FLINK-25736 - KDS Table API Sink
https://issues.apache.org/jira/browse/FLINK-25737 - KDF DataStream Sink



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25793) Kinesis Data Streams sink is not limiting throughput to the destination and therefore exceeding rate limits

2022-01-24 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-25793:
--

 Summary: Kinesis Data Streams sink is not limiting throughput to 
the destination and therefore exceeding rate limits
 Key: FLINK-25793
 URL: https://issues.apache.org/jira/browse/FLINK-25793
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kinesis
Reporter: Zichen Liu
Assignee: Ahmed Hamdy
 Fix For: 1.15.0


h2. Bug:

The Async Sink Base is being flushed too frequently, resulting in backpressure even when the buffer is near empty.

*Cause:*

During a write(), flushIfAble() is called. It checks whether the number of buffered elements exceeds the batch size and, if so, insists that the sink flush immediately, even if the number of in-flight requests already exceeds the maximum allowed. This yields the current mailbox thread and hence blocks.

Note that this can occur even when the buffer is near empty, so the blocking behaviour is unnecessary and undesirable: the element should simply be written to the buffer without any blocking.
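The flush decision described above can be modeled as plain logic. This is an illustrative sketch of the report, not Flink's actual code; the method and parameter names are hypothetical.

{code:java}
// Hypothetical model of the flush decision inside write().
class FlushDecision {

    // Reported behavior: a full batch forces a flush even when the in-flight
    // limit is already reached, so write() blocks the mailbox thread.
    static boolean blocksToday(int buffered, int batchSize, int inFlight, int maxInFlight) {
        return buffered >= batchSize && inFlight >= maxInFlight;
    }

    // Desired behavior: only flush when an in-flight slot is free; otherwise
    // buffer the element and defer the flush instead of blocking.
    static boolean shouldFlushNonBlocking(int buffered, int batchSize, int inFlight, int maxInFlight) {
        return buffered >= batchSize && inFlight < maxInFlight;
    }
}
{code}

The fix direction, under this reading, is to gate the eager flush on the in-flight request count rather than on the buffered element count alone.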



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25792) Async Sink Base is being flushed too frequently resulting in backpressure even when buffer is near empty

2022-01-24 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-25792:
--

 Summary: Async Sink Base is being flushed too frequently 
resulting in backpressure even when buffer is near empty
 Key: FLINK-25792
 URL: https://issues.apache.org/jira/browse/FLINK-25792
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Kinesis
Reporter: Zichen Liu
Assignee: Ahmed Hamdy
 Fix For: 1.15.0


h2. Motivation

*User stories:*
As a Flink user, I’d like to use the Table API for the new Kinesis Data Streams 
 sink.

*Scope:*
 * Introduce {{AsyncDynamicTableSink}}, which enables sinking tables into async sink implementations.
 * Implement a new {{KinesisDynamicTableSink}} that uses the {{KinesisDataStreamSink}} async implementation and implements {{AsyncDynamicTableSink}}.
 * The implementation introduces the Async Sink configurations as optional options in the table definition, with default values derived from the {{KinesisDataStream}} defaults.
 * Unit/integration testing: modify the KinesisTableAPI tests for the new implementation; add unit tests for {{AsyncDynamicTableSink}}, {{KinesisDynamicTableSink}} and {{KinesisDynamicTableSinkFactory}}.
 * Java / code-level docs.

h2. References

More details to be found 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25737) Chinese Translation - Add documentation for Firehose Async Sink

2022-01-20 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-25737:
--

 Summary: Chinese Translation - Add documentation for Firehose 
Async Sink
 Key: FLINK-25737
 URL: https://issues.apache.org/jira/browse/FLINK-25737
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.15.0
Reporter: Zichen Liu
 Fix For: 1.15.0


h2. Translate:
 * Connectors/Kinesis page to deprecate old sink 
(docs/content/docs/connectors/datastream/kinesis.md)
 * Metrics page with new sink metrics (docs/content/docs/ops/metrics.md) - only 
the section about Kinesis

into Chinese.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25736) Chinese Translation - Update documentation for Kinesis Table Api

2022-01-20 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-25736:
--

 Summary: Chinese Translation - Update documentation for Kinesis 
Table Api
 Key: FLINK-25736
 URL: https://issues.apache.org/jira/browse/FLINK-25736
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.15.0
Reporter: Zichen Liu
 Fix For: 1.15.0


h2. Translate:
 * Connectors/Kinesis page to deprecate old sink 
(docs/content.zh/docs/connectors/table/kinesis.md)

into Chinese.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25735) Chinese Translation - Add documentation for KDS Async Sink

2022-01-20 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-25735:
--

 Summary: Chinese Translation - Add documentation for KDS Async Sink
 Key: FLINK-25735
 URL: https://issues.apache.org/jira/browse/FLINK-25735
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.15.0
Reporter: Zichen Liu
 Fix For: 1.15.0


h2. Translate:
 * Connectors/Kinesis page to deprecate old sink 
(docs/content/docs/connectors/datastream/kinesis.md)
 * Metrics page with new sink metrics (docs/content/docs/ops/metrics.md) - only 
the section about Kinesis

into Chinese.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25731) Mark FlinkKinesisProducer/FlinkKinesisConsumer as deprecated

2022-01-20 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-25731:
--

 Summary: Mark FlinkKinesisProducer/FlinkKinesisConsumer as 
deprecated
 Key: FLINK-25731
 URL: https://issues.apache.org/jira/browse/FLINK-25731
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Common
Reporter: Zichen Liu
Assignee: Zichen Liu
 Fix For: 1.15.0


h2. Motivation

*User stories:*
 As a Flink user, I’d like to use DynamoDB as a sink for my data pipeline.

*Scope:*
 * Implement an asynchronous sink for DynamoDB by inheriting the AsyncSinkBase 
class. The implementation can for now reside in its own module in 
flink-connectors.
 * Implement an asynchronous sink writer for DynamoDB by extending the 
AsyncSinkWriter. The implementation must deal with failed requests and retry
them using the {{requeueFailedRequestEntry}} method. If possible, the 
implementation should batch multiple requests (PutRecordsRequestEntry objects) 
to Firehose for increased throughput. The implemented Sink Writer will be used 
by the Sink class that will be created as part of this story.
 * Java / code-level docs.
 * End to end testing: add tests that hit a real AWS instance. (How best to donate resources to the Flink project to allow this to happen?)

h2. References

More details to be found 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25729) Replace the deprecated FlinkKinesisConsumer with a Kinesis Consumer based on the AWS SDK for Java 2.x

2022-01-20 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-25729:
--

 Summary: Replace the deprecated FlinkKinesisConsumer with a 
Kinesis Consumer based on the AWS SDK for Java 2.x
 Key: FLINK-25729
 URL: https://issues.apache.org/jira/browse/FLINK-25729
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Common
Affects Versions: 1.15.0
Reporter: Zichen Liu
 Fix For: 1.15.0


h2. Motivation

*User stories:*
As a Flink user, I’d like to configure a custom fatal exception handler for the AsyncSinkWriter.

*Scope:*
 * Create a new fatal exception handler
 * Users of the AsyncSink should be able to create an implementation and pass 
it to the sink



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25692) Documentation for Kinesis Firehose Sink

2022-01-18 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-25692:
--

 Summary: Documentation for Kinesis Firehose Sink
 Key: FLINK-25692
 URL: https://issues.apache.org/jira/browse/FLINK-25692
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Kinesis
Reporter: Zichen Liu
Assignee: Ahmed Hamdy
 Fix For: 1.15.0


h2. Motivation

*User stories:*
As a Flink user, I’d like to use the Table API for the new Kinesis Firehose 
async sink.

*Scope:*
 * Add a new module for {{flink-connector-kinesis-data-firehose-table}}.
 * Implement a new {{KinesisFirehoseDynamicTableSink}} that uses the {{KinesisDataFirehoseSink}} async implementation and implements {{AsyncDynamicTableSink}}.
 * The implementation introduces the Async Sink configurations as optional options in the table definition, with default values derived from the {{KinesisDataFirehose}} defaults.
 * Unit/integration testing: modify the KinesisTableAPI tests for the new implementation; add unit tests for {{AsyncDynamicTableSink}}, {{KinesisDynamicTableSink}} and {{KinesisDynamicTableSinkFactory}}.
 * Java / code-level docs.
 * Add documentation for the Table API connector for Kinesis Data Firehose.

h2. References

More details to be found 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25408) Chinese Translation - Add documentation for KDS Async Sink

2021-12-21 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-25408:
--

 Summary: Chinese Translation - Add documentation for KDS Async Sink
 Key: FLINK-25408
 URL: https://issues.apache.org/jira/browse/FLINK-25408
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Kinesis, Documentation
Reporter: Zichen Liu
Assignee: Zichen Liu
 Fix For: 1.15.0


h2. Motivation

_FLINK-24227 introduces a new sink for Kinesis Data Streams that supersedes the 
existing one based on KPL._

*Scope:*
 * Deprecate the current section in the docs for the Kinesis KPL sink and write 
documentation and usage guide for the new sink.

h2. References

More details to be found 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-24905) KDS implementation of Async Sink Table API

2021-11-15 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-24905:
--

 Summary: KDS implementation of Async Sink Table API
 Key: FLINK-24905
 URL: https://issues.apache.org/jira/browse/FLINK-24905
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Common
Reporter: Zichen Liu
Assignee: Zichen Liu
 Fix For: 1.15.0


h2. Motivation

*User stories:*
 As a Flink user, I’d like to use Kinesis Data Streams as a sink for my data 
pipeline.

*Scope:*
 * Implement an asynchronous sink for Kinesis Data Streams (KDS) by inheriting 
the AsyncSinkBase class. The implementation can for now reside in its own 
module in flink-connectors. The module and package name can be anything 
reasonable e.g. {{flink-connector-aws-kinesis}} for the module name and 
{{org.apache.flink.connector.aws.kinesis}} for the package name.
 * The implementation must use [the Kinesis Java 
Client|https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html].
 * The implementation must allow users to configure the Kinesis Client, with 
reasonable default settings.
 * Implement an asynchronous sink writer for KDS by extending the 
AsyncSinkWriter. The implementation must deal with failed requests and retry
them using the {{requeueFailedRequestEntry}} method. If possible, the 
implementation should batch multiple requests (PutRecordsRequestEntry objects) 
to KDS for increased throughput. The implemented Sink Writer will be used by 
the Sink class that will be created as part of this story.
 * Unit/Integration testing. Use Kinesalite (in-memory Kinesis simulation). We 
already use this in {{KinesisTableApiITCase}}.
 * Java / code-level docs.
 * End to end testing: add tests that hit a real AWS instance. (How best to donate resources to the Flink project to allow this to happen?)
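The retry contract above, resubmitting failed entries via {{requeueFailedRequestEntry}}, amounts to splitting a partial-failure batch response into successes and entries to requeue. A minimal, Flink-free sketch of that step (helper name and shape are hypothetical, chosen for illustration):

{code:java}
import java.util.*;

// Hypothetical helper: given the submitted batch and the indices the service
// reported as failed (as Kinesis PutRecords partial failures do), collect the
// entries that must be requeued for retry, preserving their order.
class PartialFailureHandler {
    static List<byte[]> failedEntries(List<byte[]> batch, Set<Integer> failedIndices) {
        List<byte[]> requeue = new ArrayList<>();
        for (int i = 0; i < batch.size(); i++) {
            if (failedIndices.contains(i)) {
                requeue.add(batch.get(i));
            }
        }
        return requeue;
    }
}
{code}

A sink writer would pass each returned entry back to the base class so it is retried with the next batch rather than dropped.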

h2. References

More details to be found 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-24904) Add documentation for KDS Async Sink

2021-11-15 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-24904:
--

 Summary: Add documentation for KDS Async Sink
 Key: FLINK-24904
 URL: https://issues.apache.org/jira/browse/FLINK-24904
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Common
Reporter: Zichen Liu
Assignee: Zichen Liu
 Fix For: 1.15.0


h2. Motivation

*User stories:*
 As a Flink user, I’d like to use Kinesis Data Streams as a sink for my data 
pipeline.

*Scope:*
 * Implement an asynchronous sink for Kinesis Data Streams (KDS) by inheriting 
the AsyncSinkBase class. The implementation can for now reside in its own 
module in flink-connectors. The module and package name can be anything 
reasonable e.g. {{flink-connector-aws-kinesis}} for the module name and 
{{org.apache.flink.connector.aws.kinesis}} for the package name.
 * The implementation must use [the Kinesis Java 
Client|https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html].
 * The implementation must allow users to configure the Kinesis Client, with 
reasonable default settings.
 * Implement an asynchronous sink writer for KDS by extending the 
AsyncSinkWriter. The implementation must deal with failed requests and retry
them using the {{requeueFailedRequestEntry}} method. If possible, the 
implementation should batch multiple requests (PutRecordsRequestEntry objects) 
to KDS for increased throughput. The implemented Sink Writer will be used by 
the Sink class that will be created as part of this story.
 * Unit/Integration testing. Use Kinesalite (in-memory Kinesis simulation). We 
already use this in {{KinesisTableApiITCase}}.
 * Java / code-level docs.
 * End to end testing: add tests that hit a real AWS instance. (How best to donate resources to the Flink project to allow this to happen?)

h2. References

More details to be found 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-24602) Include diffs for configuration documentation page

2021-10-20 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-24602:
--

 Summary: Include diffs for configuration documentation page
 Key: FLINK-24602
 URL: https://issues.apache.org/jira/browse/FLINK-24602
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Reporter: Zichen Liu


We could include it as an extra column on this page:

[https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/]

Or have another page dedicated to diffs on what the default configuration for 
each key has been throughout historical versions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24370) [FLIP-171] Documentation for Generic AsyncSinkBase

2021-09-24 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-24370:
--

 Summary: [FLIP-171] Documentation for Generic AsyncSinkBase
 Key: FLINK-24370
 URL: https://issues.apache.org/jira/browse/FLINK-24370
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Common
Reporter: Zichen Liu
Assignee: Zichen Liu
 Fix For: 1.15.0


h2. Motivation

Apache Flink has a rich connector ecosystem that can persist data in various 
destinations. Flink natively supports Apache Kafka, Amazon Kinesis Data 
Streams, Elasticsearch, HBase, and many more destinations. Additional 
connectors are maintained in Apache Bahir or directly on GitHub. The basic 
functionality of these sinks is quite similar. They batch events according to 
user defined buffering hints, sign requests and send them to the respective 
endpoint, retry unsuccessful or throttled requests, and participate in 
checkpointing. They primarily just differ in the way they interface with the 
destination. Yet, all the above-mentioned sinks are developed and maintained 
independently.

We hence propose to create a sink that abstracts away this common functionality 
into a generic sink. Adding support for a new destination then just means 
creating a lightweight shim that only implements the specific interfaces of the 
destination using a client that supports async requests. Having a common 
abstraction will reduce the effort required to maintain all these individual 
sinks. It will also make it much easier and faster to create integrations with 
additional destinations. Moreover, improvements or bug fixes to the core of the 
sink will benefit all implementations that are based on it.

The design of the sink focusses on extensibility and a broad support of 
destinations. The core of the sink is kept generic and free of any connector 
specific dependencies. The sink is designed to participate in checkpointing to 
provide at-least once semantics, but it is limited to destinations that provide 
a client that supports async requests. 
h2. References

More details to be found 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24278) [FLIP-171] Async Sink Base Sink Developer Guide for Documentation

2021-09-14 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-24278:
--

 Summary: [FLIP-171] Async Sink Base Sink Developer Guide for 
Documentation
 Key: FLINK-24278
 URL: https://issues.apache.org/jira/browse/FLINK-24278
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Common
Reporter: Zichen Liu
Assignee: Zichen Liu
 Fix For: 1.15.0


*User stories*
 * As a Sink user, I’d like to configure the batch size for items to send to 
the destination at once (e.g. “flush if there are x number of items in the 
batch”)
 * As a Sink user, I’d like to configure the batching logic so that I can flush 
the batch of requests based on time period (e.g. “flush every 2 seconds”)
 * As a Sink user I’d like to specify the number of bytes for the batch of 
requests to be flushed (e.g. ”submit the batch after the total number of bytes 
in it is above 1KB”)
 * As a Sink developer, I’d like to use the configuration mechanism provided to 
allow Sink users to configure my Sink implementation

*Scope*
 * Allow Sink developers and users to pass batch size config to the 
AsyncSinkWriter
 * Add support for time-based flushing (e.g. “flush after x milliseconds”) using 
the ProcessingTimeService which is part of the Sink interface
 * Add support for byte-based flushing
 * Consider the combination of time-based and byte-based flushing: if the batch accumulated within the time window contains more bytes than configured, the last few items (however many necessary) should go into the next batch to satisfy the byte-size requirement.
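The flushing rules above can be sketched as plain batching logic: flush when any trigger fires (count, bytes, or elapsed time), and when draining a batch, stop before the entry that would exceed the byte limit so it rolls into the next batch. This is an illustrative, Flink-free sketch under those assumptions; the class and method names are hypothetical.

{code:java}
import java.util.*;

// Hypothetical sketch of the combined flush triggers and byte-overflow handling.
class BatchTrigger {

    // True when any configured trigger fires: batch count, total bytes, or age.
    static boolean shouldFlush(int count, long bytes, long ageMillis,
                               int maxCount, long maxBytes, long maxAgeMillis) {
        return count >= maxCount || bytes >= maxBytes || ageMillis >= maxAgeMillis;
    }

    // Drain one batch from the buffer, leaving entries that would exceed the
    // byte limit for the next batch.
    static List<byte[]> takeBatch(Deque<byte[]> buffer, int maxCount, long maxBytes) {
        List<byte[]> batch = new ArrayList<>();
        long bytes = 0;
        while (!buffer.isEmpty() && batch.size() < maxCount) {
            byte[] next = buffer.peekFirst();
            // Byte overflow: defer this and later entries to the next batch,
            // unless the batch is still empty (a single oversized entry must
            // still be sent to avoid stalling).
            if (!batch.isEmpty() && bytes + next.length > maxBytes) {
                break;
            }
            bytes += next.length;
            batch.add(buffer.pollFirst());
        }
        return batch;
    }
}
{code}

In the real sink, the time trigger would come from the ProcessingTimeService mentioned above rather than from wall-clock polling.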

*References*

More details to be found 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24234) [FLIP-171] Byte Based & Time Based Flushing for AsyncSinkBase

2021-09-09 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-24234:
--

 Summary: [FLIP-171] Byte Based & Time Based Flushing for 
AsyncSinkBase
 Key: FLINK-24234
 URL: https://issues.apache.org/jira/browse/FLINK-24234
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Common
Reporter: Zichen Liu
Assignee: Zichen Liu
 Fix For: 1.15.0


h2. Motivation

Apache Flink has a rich connector ecosystem that can persist data in various 
destinations. Flink natively supports Apache Kafka, Amazon Kinesis Data 
Streams, Elasticsearch, HBase, and many more destinations. Additional 
connectors are maintained in Apache Bahir or directly on GitHub. The basic 
functionality of these sinks is quite similar. They batch events according to 
user defined buffering hints, sign requests and send them to the respective 
endpoint, retry unsuccessful or throttled requests, and participate in 
checkpointing. They primarily just differ in the way they interface with the 
destination. Yet, all the above-mentioned sinks are developed and maintained 
independently.

We hence propose to create a sink that abstracts away this common functionality 
into a generic sink. Adding support for a new destination then just means 
creating a lightweight shim that only implements the specific interfaces of the 
destination using a client that supports async requests. Having a common 
abstraction will reduce the effort required to maintain all these individual 
sinks. It will also make it much easier and faster to create integrations with 
additional destinations. Moreover, improvements or bug fixes to the core of the 
sink will benefit all implementations that are based on it.

The design of the sink focusses on extensibility and a broad support of 
destinations. The core of the sink is kept generic and free of any connector 
specific dependencies. The sink is designed to participate in checkpointing to 
provide at-least once semantics, but it is limited to destinations that provide 
a client that supports async requests. 
h2. References

More details to be found 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24229) [FLIP-171] DynamoDB implementation of Async Sink

2021-09-09 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-24229:
--

 Summary: [FLIP-171] DynamoDB implementation of Async Sink
 Key: FLINK-24229
 URL: https://issues.apache.org/jira/browse/FLINK-24229
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Common
Reporter: Zichen Liu
Assignee: Zichen Liu
 Fix For: 1.15.0


h2. Motivation

*User stories:*
 As a Flink user, I’d like to use Kinesis Firehose as a sink for my data pipeline.

*Scope:*
 * Implement an asynchronous sink for Kinesis Firehose by inheriting the 
AsyncSinkBase class. The implementation can for now reside in its own module in 
flink-connectors. The module and package name can be anything reasonable e.g. 
{{flink-connector-aws-kinesis}} for the module name and 
{{org.apache.flink.connector.aws.kinesis}} for the package name.
 * The implementation must use [the Kinesis Java 
Client|https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html].
 * The implementation must allow users to configure the Kinesis Client, with 
reasonable default settings.
 * Implement an asynchronous sink writer for Firehose by extending the 
AsyncSinkWriter. The implementation must deal with failed requests and retry
them using the {{requeueFailedRequestEntry}} method. If possible, the 
implementation should batch multiple requests (PutRecordsRequestEntry objects) 
to Firehose for increased throughput. The implemented Sink Writer will be used 
by the Sink class that will be created as part of this story.
 * Unit/Integration testing. Use Kinesalite (in-memory Kinesis simulation). We 
already use this in {{KinesisTableApiITCase}}.
 * Java / code-level docs.
 * End to end testing: add tests that hit a real AWS instance. (How best to donate resources to the Flink project to allow this to happen?)

h2. References

More details to be found 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24228) [FLIP-171] Firehose implementation of Async Sink

2021-09-09 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-24228:
--

 Summary: [FLIP-171] Firehose implementation of Async Sink
 Key: FLINK-24228
 URL: https://issues.apache.org/jira/browse/FLINK-24228
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Common
Reporter: Zichen Liu
Assignee: Zichen Liu
 Fix For: 1.15.0


h2. Motivation

*User stories:*
 As a Flink user, I’d like to use Kinesis Data Streams as a sink for my data 
pipeline.

*Scope:*
 * Implement an asynchronous sink for Kinesis Data Streams (KDS) by inheriting 
the AsyncSinkBase class. The implementation can for now reside in its own 
module in flink-connectors. The module and package name can be anything 
reasonable e.g. {{flink-connector-aws-kinesis}} for the module name and 
{{org.apache.flink.connector.aws.kinesis}} for the package name.
 * The implementation must use [the Kinesis Java 
Client|https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html].
 * The implementation must allow users to configure the Kinesis Client, with 
reasonable default settings.
 * Implement an asynchronous sink writer for KDS by extending the 
AsyncSinkWriter. The implementation must deal with failed requests and retry 
them using the {{requeueFailedRequestEntry}} method. If possible, the 
implementation should batch multiple requests (PutRecordsRequestEntry objects) 
to KDS for increased throughput. The implemented Sink Writer will be used by 
the Sink class that will be created as part of this story.
 * Unit/Integration testing. Use Kinesalite (in-memory Kinesis simulation). We 
already use this in {{KinesisTableApiITCase}}.
 * Java / code-level docs.
 * End-to-end testing: add tests that hit a real AWS instance. (How can we best 
donate resources to the Flink project to allow this to happen?)

h2. References

More details can be found at 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]



--


[jira] [Created] (FLINK-24227) [FLIP-171] KDS implementation of Async Sink

2021-09-09 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-24227:
--

 Summary: [FLIP-171] KDS implementation of Async Sink
 Key: FLINK-24227
 URL: https://issues.apache.org/jira/browse/FLINK-24227
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Common
Reporter: Zichen Liu
Assignee: Zichen Liu
 Fix For: 1.15.0


h2. Motivation

Apache Flink has a rich connector ecosystem that can persist data in various 
destinations. Flink natively supports Apache Kafka, Amazon Kinesis Data 
Streams, Elasticsearch, HBase, and many more destinations. Additional 
connectors are maintained in Apache Bahir or directly on GitHub. The basic 
functionality of these sinks is quite similar. They batch events according to 
user-defined buffering hints, sign requests and send them to the respective 
endpoint, retry unsuccessful or throttled requests, and participate in 
checkpointing. They primarily just differ in the way they interface with the 
destination. Yet, all the above-mentioned sinks are developed and maintained 
independently.

We hence propose to create a sink that abstracts away this common functionality 
into a generic sink. Adding support for a new destination then just means 
creating a lightweight shim that only implements the specific interfaces of the 
destination using a client that supports async requests. Having a common 
abstraction will reduce the effort required to maintain all these individual 
sinks. It will also make it much easier and faster to create integrations with 
additional destinations. Moreover, improvements or bug fixes to the core of the 
sink will benefit all implementations that are based on it.

The design of the sink focuses on extensibility and broad support for 
destinations. The core of the sink is kept generic and free of any connector 
specific dependencies. The sink is designed to participate in checkpointing to 
provide at-least-once semantics, but it is limited to destinations that provide 
a client that supports async requests. 
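The "lightweight shim" idea above can be sketched as a tiny generic base class: the base owns batching and the failed-entry buffer, and a destination-specific subclass implements only the submission step. This is an illustration of the design, not the actual Flink {{AsyncSinkBase}} class hierarchy; all names are made up for the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Generic base: owns buffering and retry; destination-agnostic.
abstract class GenericAsyncSink<T> {
    private final List<T> buffer = new ArrayList<>();
    private final int batchSize;

    GenericAsyncSink(int batchSize) {
        this.batchSize = batchSize;
    }

    // The only destination-specific piece: submit a batch and report
    // failed entries back through the callback.
    protected abstract void submitBatch(List<T> entries, Consumer<List<T>> failedEntries);

    public final void add(T entry) {
        buffer.add(entry);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    public final void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> toSend = new ArrayList<>(buffer);
        buffer.clear();
        // Failed entries flow back into the buffer for a later retry.
        submitBatch(toSend, buffer::addAll);
    }
}

// A "shim" for an in-memory destination, standing in for e.g. a Kinesis client.
class InMemorySink extends GenericAsyncSink<String> {
    final List<String> delivered = new ArrayList<>();

    InMemorySink(int batchSize) {
        super(batchSize);
    }

    @Override
    protected void submitBatch(List<String> entries, Consumer<List<String>> failedEntries) {
        delivered.addAll(entries); // this toy destination never fails
    }
}
```

Adding a new destination then means writing only a subclass like {{InMemorySink}}; batching, buffering, and retry live once in the generic base, so fixes there benefit every implementation.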
h2. References

More details can be found at 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink



--


[jira] [Created] (FLINK-24041) [FLIP-171] Generic AsyncSinkBase

2021-08-29 Thread Zichen Liu (Jira)
Zichen Liu created FLINK-24041:
--

 Summary: [FLIP-171] Generic AsyncSinkBase
 Key: FLINK-24041
 URL: https://issues.apache.org/jira/browse/FLINK-24041
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Common
Reporter: Zichen Liu


h2. Motivation

Apache Flink has a rich connector ecosystem that can persist data in various 
destinations. Flink natively supports Apache Kafka, Amazon Kinesis Data 
Streams, Elasticsearch, HBase, and many more destinations. Additional 
connectors are maintained in Apache Bahir or directly on GitHub. The basic 
functionality of these sinks is quite similar. They batch events according to 
user-defined buffering hints, sign requests and send them to the respective 
endpoint, retry unsuccessful or throttled requests, and participate in 
checkpointing. They primarily just differ in the way they interface with the 
destination. Yet, all the above-mentioned sinks are developed and maintained 
independently.

We hence propose to create a sink that abstracts away this common functionality 
into a generic sink. Adding support for a new destination then just means 
creating a lightweight shim that only implements the specific interfaces of the 
destination using a client that supports async requests. Having a common 
abstraction will reduce the effort required to maintain all these individual 
sinks. It will also make it much easier and faster to create integrations with 
additional destinations. Moreover, improvements or bug fixes to the core of the 
sink will benefit all implementations that are based on it.

The design of the sink focuses on extensibility and broad support for 
destinations. The core of the sink is kept generic and free of any connector 
specific dependencies. The sink is designed to participate in checkpointing to 
provide at-least-once semantics, but it is limited to destinations that provide 
a client that supports async requests. 
h2. References

More details can be found at 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink



--