[jira] [Created] (FLINK-28148) Unable to load jar connector to a Python Table API app
Zichen Liu created FLINK-28148:
--
Summary: Unable to load jar connector to a Python Table API app
Key: FLINK-28148
URL: https://issues.apache.org/jira/browse/FLINK-28148
Project: Flink
Issue Type: Bug
Components: API / Python, Connectors / Common, Table SQL / API
Affects Versions: 1.16.0
Reporter: Zichen Liu

Reproduction steps:
# Clone the latest Flink from the master branch.
# Follow the Flink [recommended steps|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/] to build Flink & install PyFlink. Notes: the tutorial recommends Maven 3.2.x and Python 3.6-3.9; actual versions used: Maven 3.2.5, Python 3.7.
# Create a new Python Table API app that loads in a jar, similar to:
{code:python}
from pyflink.table import TableEnvironment, StreamTableEnvironment, EnvironmentSettings

env_settings = EnvironmentSettings.in_streaming_mode()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)
t_env.get_config().set("pipeline.classpaths", "file:///path/to/your/jar.jar")
{code}
The jar loaded here can be any jar, and the following message will appear:
{code}
Traceback (most recent call last):
  File "pyflink_table_api_firehose.py", line 48, in <module>
    log_processing()
  File "pyflink_table_api_firehose.py", line 14, in log_processing
    t_env.get_config().set("pipeline.classpaths", "file:///home/YOUR_USER/pyflink-table-api/flink/flink-connectors/flink-sql-connector-aws-kinesis-firehose/target/flink-sql-connector-aws-kinesis-firehose-1.16-SNAPSHOT.jar")
  File "/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/table/table_config.py", line 109, in set
    add_jars_to_context_class_loader(value.split(";"))
  File "/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/util/java_utils.py", line 169, in add_jars_to_context_class_loader
    addURL.invoke(loader, to_jarray(get_gateway().jvm.Object, [url]))
  File "/home/YOUR_USER/.local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1322, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File
"/home/YOUR_USER/.local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 146, in deco
    return f(*a, **kw)
  File "/home/YOUR_USER/.local/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o45.invoke.
: java.lang.IllegalArgumentException: object is not an instance of declaring class
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
{code}
Reproduced on Mac and Amazon Linux 2.

Next, downgrade to the 1.15 release:
{code}
pip uninstall apache-flink
pip install apache-flink
{code}
The loading of the jar should then be successful.
The load succeeds even if you load the same connector jar built from master (reproduced with the Kafka and Kinesis Firehose connectors). -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-28027) Initialise Async Sink maximum number of in flight messages to low number for rate limiting strategy
Zichen Liu created FLINK-28027:
--
Summary: Initialise Async Sink maximum number of in flight messages to low number for rate limiting strategy
Key: FLINK-28027
URL: https://issues.apache.org/jira/browse/FLINK-28027
Project: Flink
Issue Type: Bug
Components: Connectors / Common, Connectors / Kinesis
Affects Versions: 1.15.0
Reporter: Zichen Liu
Fix For: 1.16.0

*Background*
In the AsyncSinkWriter, we implement a rate limiting strategy. The initial value for the maximum number of in-flight messages is set extremely high ({{maxBatchSize * maxInFlightRequests}}). However, in accordance with the AIMD strategy, TCP congestion control has found that starting with a small value [is better|https://en.wikipedia.org/wiki/TCP_congestion_control#Slow_start] (slow start).

*Suggestion*
A better default might be:
* maxBatchSize * maxBatchSize / parallelism

-- This message was sent by Atlassian Jira (v8.20.7#820007)
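The AIMD behaviour described above can be sketched as a small simulation. This is illustrative only, not the actual AsyncSinkWriter code; all names ({{AimdRateLimiter}}, {{increase_step}}, {{decrease_factor}}) are hypothetical:

```python
class AimdRateLimiter:
    """Additive-increase / multiplicative-decrease limit on in-flight messages."""

    def __init__(self, initial_limit, max_limit, increase_step=10, decrease_factor=0.5):
        self.limit = initial_limit          # start low, per TCP slow start
        self.max_limit = max_limit
        self.increase_step = increase_step
        self.decrease_factor = decrease_factor

    def on_success(self):
        # Additive increase: grow the in-flight budget linearly on success.
        self.limit = min(self.limit + self.increase_step, self.max_limit)

    def on_throttled(self):
        # Multiplicative decrease: back off sharply on a throughput error.
        self.limit = max(1, int(self.limit * self.decrease_factor))


# Starting low (e.g. the suggested maxBatchSize * maxBatchSize / parallelism)
# probes the destination's capacity instead of assuming it.
limiter = AimdRateLimiter(initial_limit=100, max_limit=10_000)
for _ in range(5):
    limiter.on_success()   # limit grows to 150
limiter.on_throttled()     # limit halves to 75
```

The point of the low initial value is that the first throttling response arrives before the sink has committed to a large in-flight volume.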
[jira] [Created] (FLINK-28007) Tests for AWS Connectors Using SDK v2 to use Synchronous Clients
Zichen Liu created FLINK-28007:
--
Summary: Tests for AWS Connectors Using SDK v2 to use Synchronous Clients
Key: FLINK-28007
URL: https://issues.apache.org/jira/browse/FLINK-28007
Project: Flink
Issue Type: Bug
Components: Connectors / Kinesis
Reporter: Zichen Liu
Assignee: Zichen Liu
Fix For: 1.16.0

h3. Background
AWS SDK v2 async clients use a Netty async client for the Kinesis Data Streams/Firehose sinks and the Kinesis Data Streams EFO consumer. The SDK creates a shared thread pool for Netty to use for network operations when one is not configured. The thread pool is managed by a shared ELG (event loop group), which is stored in a static field. We do not configure this for the AWS connectors in the Flink codebase. When threads are spawned within the ELG, they inherit the context classloader from the current thread. If the ELG is created from a shared classloader, for instance the Flink parent classloader or the MiniCluster parent classloader, multiple Flink jobs can share the same ELG. When an ELG thread is spawned from a Flink job, it will inherit the Flink user classloader. When this job completes/fails, the classloader is destroyed; however, the Netty thread still references it, and this leads to the exception below.

h3. Impact
This issue *does not* impact jobs deployed to a TM when AWS SDK v2 is loaded via the Flink user classloader. It is expected that this is the standard deployment configuration.

This issue is known to impact:
- Flink mini cluster, for example in integration tests (FLINK-26064)
- Flink clusters loading AWS SDK v2 via the parent classloader

h3. Suggested solution
There are a few possible solutions, as discussed in https://github.com/apache/flink/pull/18733
1. Create a separate ELG per Flink job
2. Create a separate ELG per subtask
3. Attach the correct classloader to ELG spawned threads

h3.
Error Stack (shortened stack trace, as the full one is too large)
{noformat}
Feb 09 20:05:04 java.util.concurrent.ExecutionException: software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
Feb 09 20:05:04 	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
Feb 09 20:05:04 	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
(...)
Feb 09 20:05:04 Caused by: software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
Feb 09 20:05:04 	at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:98)
Feb 09 20:05:04 	at software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:43)
Feb 09 20:05:04 	at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:204)
Feb 09 20:05:04 	at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:200)
Feb 09 20:05:04 	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:179)
Feb 09 20:05:04 	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:159)
(...)
Feb 09 20:05:04 Caused by: java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
Feb 09 20:05:04 	at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:164)
Feb 09 20:05:04 	at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResources(FlinkUserCodeClassLoaders.java:188)
Feb 09 20:05:04 	at java.util.ServiceLoader$LazyIterator.hasNextService(ServiceLoader.java:348)
Feb 09 20:05:04 	at
{noformat}
[jira] [Created] (FLINK-27670) Python wrappers for Kinesis Sinks
Zichen Liu created FLINK-27670:
--
Summary: Python wrappers for Kinesis Sinks
Key: FLINK-27670
URL: https://issues.apache.org/jira/browse/FLINK-27670
Project: Flink
Issue Type: Improvement
Components: Connectors / Kinesis
Reporter: Zichen Liu
Fix For: 1.15.1

Create Python wrappers for the new Kinesis Streams sink and the Kinesis Firehose sink. An example implementation for the old Kinesis sink may be found at [https://github.com/apache/flink/pull/15491/files].

-- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27537) Remove requirement for Async Sink's RequestEntryT to be serializable
Zichen Liu created FLINK-27537:
--
Summary: Remove requirement for Async Sink's RequestEntryT to be serializable
Key: FLINK-27537
URL: https://issues.apache.org/jira/browse/FLINK-27537
Project: Flink
Issue Type: Improvement
Components: Connectors / Common
Affects Versions: 1.15.0
Reporter: Zichen Liu

Currently, in `AsyncSinkBase` and its dependent classes (e.g. the sink writer, element converter, etc.), the `RequestEntryT` generic type is required to be `Serializable`. However, this requirement no longer holds: nothing actually requires it.

Proposed approach:
* Remove the `extends Serializable` bound from the generic type `RequestEntryT`

-- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27536) Rename method parameter in AsyncSinkWriter
Zichen Liu created FLINK-27536:
--
Summary: Rename method parameter in AsyncSinkWriter
Key: FLINK-27536
URL: https://issues.apache.org/jira/browse/FLINK-27536
Project: Flink
Issue Type: Improvement
Components: Connectors / Common
Affects Versions: 1.15.0
Reporter: Zichen Liu

Change the abstract method's parameter naming in AsyncSinkWriter, from Consumer<List<RequestEntryT>> requestResult to Consumer<List<RequestEntryT>> requestToRetry or something similar. This is because the consumer here is supposed to accept a list of requests that need to be retried.

-- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-27505) Add javadoc comments to AsyncSinkBase
Zichen Liu created FLINK-27505:
--
Summary: Add javadoc comments to AsyncSinkBase
Key: FLINK-27505
URL: https://issues.apache.org/jira/browse/FLINK-27505
Project: Flink
Issue Type: Improvement
Components: Connectors / Common
Affects Versions: 1.15.0
Reporter: Zichen Liu

Currently, the javadocs describing each of the parameters of the AsyncSinkBase constructor live in AsyncSinkBaseBuilder. Since we are not enforcing the use of the builder, it makes more sense to have these descriptions in AsyncSinkBase.

-- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Created] (FLINK-26589) Update the logging level of Kinesis Streams and Firehose sinks
Zichen Liu created FLINK-26589:
--
Summary: Update the logging level of Kinesis Streams and Firehose sinks
Key: FLINK-26589
URL: https://issues.apache.org/jira/browse/FLINK-26589
Project: Flink
Issue Type: Bug
Components: Connectors / Kinesis
Reporter: Zichen Liu
Assignee: Ahmed Hamdy
Fix For: 1.15.0

h2. Bug: The Async Sink Base is not limiting throughput to the destination and is therefore exceeding rate limits
*Cause:* We are not throttling our requests downstream at all. We should monitor for requests that have failed with ThroughputExceeded exceptions and reduce the throughput accordingly.

-- This message was sent by Atlassian Jira (v8.20.1#820001)
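To illustrate what "throttling our requests downstream" means, here is a minimal token-bucket sketch. This is a generic illustration of capping request throughput, not the strategy the connector actually implements; the class and parameter names are hypothetical:

```python
class TokenBucket:
    """Allow at most `rate_per_s` requests per second, with bursts up to `capacity`."""

    def __init__(self, rate_per_s, capacity, clock):
        self.rate = rate_per_s
        self.capacity = capacity
        self.tokens = capacity      # start full: an initial burst is allowed
        self.clock = clock          # injectable clock (e.g. time.monotonic)
        self.last = clock()

    def try_acquire(self, n=1):
        # Refill tokens proportionally to elapsed time, capped at capacity.
        now = self.clock()
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False                # caller should back off instead of sending


# Usage with a fake clock so the behaviour is deterministic:
t = {"now": 0.0}
bucket = TokenBucket(rate_per_s=1.0, capacity=2, clock=lambda: t["now"])
```

On a ThroughputExceeded response, the sender would lower `rate_per_s` before retrying, which is the "reduce the throughput accordingly" step the ticket asks for.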
[jira] [Created] (FLINK-26309) Add a polling strategy to determine whether Localstack test container has started
Zichen Liu created FLINK-26309:
--
Summary: Add a polling strategy to determine whether Localstack test container has started
Key: FLINK-26309
URL: https://issues.apache.org/jira/browse/FLINK-26309
Project: Flink
Issue Type: Technical Debt
Components: Connectors / Common
Reporter: Zichen Liu
Assignee: Zichen Liu

The Firehose sink is an at-least-once sink, so we only expect duplicates during failures and restores from save-/checkpoints. During `KinesisFirehoseSinkITCase` there is no such action, and yet we occasionally get duplicates in the test result. The test was originally asserting exactly-once erroneously; this has been fixed in #18876 to assert at-least-once. However, a curiosity remains: why were there duplicates? That is the purpose of this investigation.
{code:java}
Feb 22 02:47:37 [ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 83.215 s <<< FAILURE! - in org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkITCase Feb 22 02:47:37 [ERROR] org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkITCase.test Time elapsed: 50.712 s <<< FAILURE!
Feb 22 02:47:37 org.opentest4j.AssertionFailedError: Feb 22 02:47:37 Feb 22 02:47:37 expected: 92 Feb 22 02:47:37 but was: 93 Feb 22 02:47:37 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) Feb 22 02:47:37 at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) Feb 22 02:47:37 at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) Feb 22 02:47:37 at org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkITCase.test(KinesisFirehoseSinkITCase.java:133) Feb 22 02:47:37 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) Feb 22 02:47:37 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) Feb 22 02:47:37 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) Feb 22 02:47:37 at java.lang.reflect.Method.invoke(Method.java:498) Feb 22 02:47:37 at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) Feb 22 02:47:37 at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) Feb 22 02:47:37 at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) Feb 22 02:47:37 at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) Feb 22 02:47:37 at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) Feb 22 02:47:37 at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) Feb 22 02:47:37 at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) Feb 22 02:47:37 at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) Feb 22 02:47:37 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) Feb 22 02:47:37 at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) Feb 22 02:47:37 at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) 
Feb 22 02:47:37 at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) Feb 22 02:47:37 at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) Feb 22 02:47:37 at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) Feb 22 02:47:37 at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) Feb 22 02:47:37 at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) Feb 22 02:47:37 at org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:30) Feb 22 02:47:37 at org.junit.rules.RunRules.evaluate(RunRules.java:20) Feb 22 02:47:37 at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) Feb 22 02:47:37 at org.junit.runners.ParentRunner.run(ParentRunner.java:413) Feb 22 02:47:37 at org.junit.runner.JUnitCore.run(JUnitCore.java:137) Feb 22 02:47:37 at org.junit.runner.JUnitCore.run(JUnitCore.java:115) Feb 22 02:47:37 at org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42) Feb 22 02:47:37 at org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80) {code} [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=31983=logs=d44f43ce-542c-597d-bf94-b0718c71e5e8=ed165f3f-d0f6-524b-5279-86f8ee7d0e2d=44249] -- This message was sent by Atlassian Jira (v8.20.1#820001)
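The polling strategy named in the ticket title can be sketched as a generic poll-until-ready loop. This is illustrative only (plain Python, not the Testcontainers API); `probe` stands for whatever readiness check the Localstack container exposes:

```python
import time

def wait_until(probe, timeout_s=30.0, interval_s=0.5,
               clock=time.monotonic, sleep=time.sleep):
    """Poll `probe()` until it returns True or `timeout_s` elapses.

    `clock` and `sleep` are injectable so tests can run without real waiting.
    Returns True if the probe succeeded within the deadline, False otherwise.
    """
    deadline = clock() + timeout_s
    while clock() < deadline:
        if probe():
            return True
        sleep(interval_s)
    return False
```

Compared with a fixed sleep, a polling loop both starts the test as soon as the container is ready and fails fast with a clear timeout when it never becomes ready.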
[jira] [Created] (FLINK-26305) KinesisFirehoseSinkITCase writes duplicates to Localstack
Zichen Liu created FLINK-26305: -- Summary: KinesisFirehoseSinkITCase writes duplicates to Localstack Key: FLINK-26305 URL: https://issues.apache.org/jira/browse/FLINK-26305 Project: Flink Issue Type: Bug Components: Connectors / Kinesis Affects Versions: 1.15.0 Reporter: Zichen Liu Assignee: Zichen Liu {code:java} Feb 22 02:47:37 [ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 83.215 s <<< FAILURE! - in org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkITCase Feb 22 02:47:37 [ERROR] org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkITCase.test Time elapsed: 50.712 s <<< FAILURE! Feb 22 02:47:37 org.opentest4j.AssertionFailedError: Feb 22 02:47:37 Feb 22 02:47:37 expected: 92 Feb 22 02:47:37 but was: 93 Feb 22 02:47:37 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) Feb 22 02:47:37 at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) Feb 22 02:47:37 at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) Feb 22 02:47:37 at org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkITCase.test(KinesisFirehoseSinkITCase.java:133) Feb 22 02:47:37 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) Feb 22 02:47:37 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) Feb 22 02:47:37 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) Feb 22 02:47:37 at java.lang.reflect.Method.invoke(Method.java:498) Feb 22 02:47:37 at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) Feb 22 02:47:37 at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) Feb 22 02:47:37 at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) Feb 22 02:47:37 at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) Feb 22 02:47:37 at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) Feb 22 02:47:37 at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) Feb 22 02:47:37 at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) Feb 22 02:47:37 at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) Feb 22 02:47:37 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) Feb 22 02:47:37 at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) Feb 22 02:47:37 at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) Feb 22 02:47:37 at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) Feb 22 02:47:37 at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) Feb 22 02:47:37 at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) Feb 22 02:47:37 at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) Feb 22 02:47:37 at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) Feb 22 02:47:37 at org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:30) Feb 22 02:47:37 at org.junit.rules.RunRules.evaluate(RunRules.java:20) Feb 22 02:47:37 at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) Feb 22 02:47:37 at org.junit.runners.ParentRunner.run(ParentRunner.java:413) Feb 22 02:47:37 at org.junit.runner.JUnitCore.run(JUnitCore.java:137) Feb 22 02:47:37 at org.junit.runner.JUnitCore.run(JUnitCore.java:115) Feb 22 02:47:37 at org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42) Feb 22 02:47:37 at org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80) {code} https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=31983=logs=d44f43ce-542c-597d-bf94-b0718c71e5e8=ed165f3f-d0f6-524b-5279-86f8ee7d0e2d=44249 -- This message was sent by Atlassian Jira 
(v8.20.1#820001)
[jira] [Created] (FLINK-26300) KinesisFirehoseSinkITCase failed on azure IOException
Zichen Liu created FLINK-26300: -- Summary: KinesisFirehoseSinkITCase failed on azure IOException Key: FLINK-26300 URL: https://issues.apache.org/jira/browse/FLINK-26300 Project: Flink Issue Type: Bug Components: Connectors / Kinesis Affects Versions: 1.15.0 Reporter: Zichen Liu {code:java} Feb 22 02:47:37 [ERROR] Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 83.215 s <<< FAILURE! - in org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkITCase Feb 22 02:47:37 [ERROR] org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkITCase.test Time elapsed: 50.712 s <<< FAILURE! Feb 22 02:47:37 org.opentest4j.AssertionFailedError: Feb 22 02:47:37 Feb 22 02:47:37 expected: 92 Feb 22 02:47:37 but was: 93 Feb 22 02:47:37 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) Feb 22 02:47:37 at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) Feb 22 02:47:37 at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) Feb 22 02:47:37 at org.apache.flink.connector.firehose.sink.KinesisFirehoseSinkITCase.test(KinesisFirehoseSinkITCase.java:133) Feb 22 02:47:37 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) Feb 22 02:47:37 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) Feb 22 02:47:37 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) Feb 22 02:47:37 at java.lang.reflect.Method.invoke(Method.java:498) Feb 22 02:47:37 at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) Feb 22 02:47:37 at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) Feb 22 02:47:37 at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) Feb 22 02:47:37 at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) Feb 22 02:47:37 at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) Feb 22 02:47:37 at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) Feb 22 02:47:37 at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) Feb 22 02:47:37 at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) Feb 22 02:47:37 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) Feb 22 02:47:37 at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) Feb 22 02:47:37 at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) Feb 22 02:47:37 at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) Feb 22 02:47:37 at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) Feb 22 02:47:37 at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) Feb 22 02:47:37 at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) Feb 22 02:47:37 at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) Feb 22 02:47:37 at org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:30) Feb 22 02:47:37 at org.junit.rules.RunRules.evaluate(RunRules.java:20) Feb 22 02:47:37 at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) Feb 22 02:47:37 at org.junit.runners.ParentRunner.run(ParentRunner.java:413) Feb 22 02:47:37 at org.junit.runner.JUnitCore.run(JUnitCore.java:137) Feb 22 02:47:37 at org.junit.runner.JUnitCore.run(JUnitCore.java:115) Feb 22 02:47:37 at org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42) Feb 22 02:47:37 at org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80) {code} https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=31983=logs=d44f43ce-542c-597d-bf94-b0718c71e5e8=ed165f3f-d0f6-524b-5279-86f8ee7d0e2d=44249 -- This message was sent by Atlassian Jira 
(v8.20.1#820001)
[jira] [Created] (FLINK-25977) Close sink client and sink http client for KDS/KDF Sinks
Zichen Liu created FLINK-25977:
--
Summary: Close sink client and sink http client for KDS/KDF Sinks
Key: FLINK-25977
URL: https://issues.apache.org/jira/browse/FLINK-25977
Project: Flink
Issue Type: Improvement
Components: Connectors / Kinesis
Reporter: Zichen Liu
Assignee: Ahmed Hamdy
Fix For: 1.15.0

h2. Update:
DEFAULT_MAX_IN_FLIGHT_REQUESTS=50, to match the default thread count of the AWS SDK v2 HTTP client.

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25976) Update the KDS and KDF Sink's defaults & update the docs
Zichen Liu created FLINK-25976:
--
Summary: Update the KDS and KDF Sink's defaults & update the docs
Key: FLINK-25976
URL: https://issues.apache.org/jira/browse/FLINK-25976
Project: Flink
Issue Type: Bug
Components: Connectors / Kinesis
Reporter: Zichen Liu
Assignee: Ahmed Hamdy
Fix For: 1.15.0

h2. Bug: Async Sink Base is being flushed too frequently, resulting in backpressure even when the buffer is near empty
*Cause:* During a write(), flushIfAble() is called, which checks whether the number of buffered elements exceeds the batch size and, if so, insists that the sink flush immediately, even if the number of inFlightRequests is at the maximum allowed. This yields the current mailbox thread and hence blocks. Notice that this can occur even when the buffer is near empty, so the blocking behaviour is unnecessary and undesirable: we would like the element to be written to the buffer with no blocking.

-- This message was sent by Atlassian Jira (v8.20.1#820001)
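The desired non-blocking write path can be sketched as follows. This is a simplified illustration of the fix's intent, not the AsyncSinkWriter implementation; all names are hypothetical:

```python
def write_element(entry, buffer, max_batch_size, in_flight_count, max_in_flight, flush):
    """Buffer `entry`; flush only when a full batch is ready AND in-flight budget remains.

    If the sink is already at max_in_flight, we simply keep buffering and return,
    instead of blocking the mailbox thread waiting for in-flight capacity.
    """
    buffer.append(entry)
    if len(buffer) >= max_batch_size and in_flight_count < max_in_flight:
        batch = buffer[:max_batch_size]
        del buffer[:max_batch_size]
        flush(batch)


# Usage: a recording `flush` makes the behaviour visible.
flushed = []
buf = []
for i in range(3):
    # At the in-flight cap: elements are buffered, nothing flushes, no blocking.
    write_element(i, buf, max_batch_size=3, in_flight_count=5, max_in_flight=5,
                  flush=flushed.append)
# In-flight budget available again: the next write triggers a batch flush.
write_element(3, buf, max_batch_size=3, in_flight_count=0, max_in_flight=5,
              flush=flushed.append)
```

The key difference from the buggy behaviour is that the in-flight check gates the flush itself, rather than letting the flush proceed and then blocking on in-flight capacity.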
[jira] [Created] (FLINK-25948) KDS / KDF Sink should call .close() to clean up resources
Zichen Liu created FLINK-25948:
--
Summary: KDS / KDF Sink should call .close() to clean up resources
Key: FLINK-25948
URL: https://issues.apache.org/jira/browse/FLINK-25948
Project: Flink
Issue Type: Bug
Components: Connectors / Common
Affects Versions: 1.15.0
Reporter: Zichen Liu
Assignee: Ahmed Hamdy
Fix For: 1.15.0

Intermittent failures introduced as part of merge (PR#18314: [FLINK-24228 [connectors/firehose] - Unified Async Sink for Kinesis Firehose|https://github.com/apache/flink/pull/18314]):
# Failures are intermittent, affecting c. 1 in 7 builds on {{flink-ci.flink}} and {{flink-ci.flink-master-mirror}}.
# The issue looks identical to the KinesaliteContainer startup issue (Appendix 1).
# I have managed to reproduce the issue locally: if I start some parallel containers, keep them running, and then run {{KinesisFirehoseSinkITCase}}, c. 1 in 6 runs gives the error.
# The errors have a slightly different appearance on {{flink-ci.flink-master-mirror}} vs {{flink-ci.flink}}, which has the same appearance as local. I only hope this is a difference in logging/killing environment variables (and that there aren't 2 distinct issues).

Appendix 1:
{code:java}
org.testcontainers.containers.ContainerLaunchException: Container startup failed
	at org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:336)
	at org.testcontainers.containers.GenericContainer.start(GenericContainer.java:317)
	at org.testcontainers.containers.GenericContainer.starting(GenericContainer.java:1066)
	at ... 11 more
Caused by: org.testcontainers.containers.ContainerLaunchException: Could not create/start container
	at org.testcontainers.containers.GenericContainer.tryStart(GenericContainer.java:525)
	at org.testcontainers.containers.GenericContainer.lambda$doStart$0(GenericContainer.java:331)
	at org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:81)
	... 12 more
Caused by: org.rnorth.ducttape.TimeoutException: Timeout waiting for result with exception
	at org.rnorth.ducttape.unreliables.Unreliables.retryUntilSuccess(Unreliables.java:54)
	at
{code}

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25945) Intermittent Failures on KDF AZP 2
Zichen Liu created FLINK-25945:
--
Summary: Intermittent Failures on KDF AZP 2
Key: FLINK-25945
URL: https://issues.apache.org/jira/browse/FLINK-25945
Project: Flink
Issue Type: New Feature
Components: Connectors / Kinesis
Reporter: Zichen Liu
Assignee: Ahmed Hamdy
Fix For: 1.15.0

Problem: CI fails on [#18553|https://github.com/apache/flink/pull/18553] and the issue is not reproducible locally. Furthermore, the failure is intermittent, with a frequency of c. 1 in 5 (?).

Resolution: unknown; one theory is to use HTTP/1.1 as the protocol.

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25944) Intermittent Failures on KDF AZP
Zichen Liu created FLINK-25944:
--
Summary: Intermittent Failures on KDF AZP
Key: FLINK-25944
URL: https://issues.apache.org/jira/browse/FLINK-25944
Project: Flink
Issue Type: New Feature
Components: Connectors / Kinesis
Reporter: Zichen Liu
Assignee: Ahmed Hamdy
Fix For: 1.15.0

AsyncSinkWriter implements snapshotState to write the pending requests into state, but none of the implementations (Kinesis, Firehose) provide a state serializer or interact with the recovered state.
* Implement {code:java}getWriterStateSerializer{code} for the Kinesis/Firehose sinks

-- This message was sent by Atlassian Jira (v8.20.1#820001)
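The job of such a writer-state serializer is to turn the buffered request entries into bytes on snapshot and back into entries on restore. A minimal length-prefixed sketch, in Python for brevity rather than the actual Java `SimpleVersionedSerializer` the sinks would implement (all names hypothetical):

```python
import struct

def serialize_writer_state(entries):
    """Encode a list of buffered request entries (each already bytes):
    a big-endian count, then each entry length-prefixed."""
    parts = [struct.pack(">i", len(entries))]
    for e in entries:
        parts.append(struct.pack(">i", len(e)))
        parts.append(e)
    return b"".join(parts)

def deserialize_writer_state(data):
    """Inverse of serialize_writer_state: recover the list of entries."""
    n = struct.unpack_from(">i", data, 0)[0]
    off = 4
    entries = []
    for _ in range(n):
        length = struct.unpack_from(">i", data, off)[0]
        off += 4
        entries.append(bytes(data[off:off + length]))
        off += length
    return entries
```

The essential property is a lossless round trip, so that requests pending at checkpoint time are re-sent after recovery rather than dropped.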
[jira] [Created] (FLINK-25943) New Kinesis, Firehose to provide a state serializer
Zichen Liu created FLINK-25943:
--
Summary: New Kinesis, Firehose to provide a state serializer
Key: FLINK-25943
URL: https://issues.apache.org/jira/browse/FLINK-25943
Project: Flink
Issue Type: New Feature
Components: Connectors / Kinesis
Reporter: Zichen Liu
Assignee: Ahmed Hamdy
Fix For: 1.15.0

h2. Motivation
*User stories:*
As a Flink user, I'd like to use the Table API for the new Kinesis Data Streams sink.

*Scope:*
* Introduce {{AsyncDynamicTableSink}}, which enables sinking tables into async sink implementations.
* Implement a new {{KinesisDynamicTableSink}} that uses the {{KinesisDataStreamSink}} async implementation and implements {{AsyncDynamicTableSink}}.
* The implementation introduces Async Sink configurations as optional options in the table definition, with default values derived from the {{KinesisDataStream}} default values.
* Unit/integration testing: modify KinesisTableAPI tests for the new implementation; add unit tests for {{AsyncDynamicTableSink}}, {{KinesisDynamicTableSink}} and {{KinesisDynamicTableSinkFactory}}.
* Java / code-level docs.

h2.
References More details to be found [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] *Update:* ^^ Status Update ^^ __List of all work outstanding for 1.15 release__ [Merged] https://github.com/apache/flink/pull/18165 - KDS DataStream Docs [Merged] https://github.com/apache/flink/pull/18396 - [hotfix] for infinite loop if not flushing during commit [Merged] https://github.com/apache/flink/pull/18421 - Mark Kinesis Producer as deprecated (Prod: FLINK-24227) [Merged] https://github.com/apache/flink/pull/18348 - KDS Table API Sink & Docs [Merged] https://github.com/apache/flink/pull/18488 - base sink retry entries in order, not in reverse [Merged] https://github.com/apache/flink/pull/18512 - changing failed requests handler to accept List in AsyncSinkWriter [Merged] https://github.com/apache/flink/pull/18483 - Do not expose the element converter [Merged] https://github.com/apache/flink/pull/18468 - Adding Kinesis Data Streams SQL uber-jar Ready for review: [SUCCESS] https://github.com/apache/flink/pull/18314 - KDF DataStream Sink & Docs [BLOCKED on ^^] https://github.com/apache/flink/pull/18426 - rename flink-connector-aws-kinesis-data-* to flink-connector-aws-kinesis-* (module names) and KinesisData*Sink to Kinesis*Sink (class names) Pending PR: * Firehose Table API Sink & Docs * KDF Table API SQL jar TBD: * FLINK-25846: Not shutting down * FLINK-25848: Validation during start up * FLINK-25792: flush() bug * FLINK-25793: throughput exceeded * Update the defaults of the KDS sink and update the docs + do the same for KDF * add an `AsyncSinkCommonConfig` class (to wrap the 6 params) to `flink-connector-base` and propagate it to the two connectors - feature freeze * KDS performance testing * KDF performance testing * Clone the new docs to .../contents.zh/... 
and add the location to the corresponding Chinese translation jira - KDS - * Rename [Amazon AWS Kinesis Streams] to [Amazon Kinesis Data Streams] in docs (legacy issue) - Flink 1.15 release * KDS end-to-end sanity test - hits AWS APIs rather than local docker images * KDS Python wrappers * FLINK-25733 - Create a migration guide for the Kinesis Table API connector - can happen after 1.15 * If `endpoint` is provided, `region` should not be required as it currently is * Test if the Localstack container requires the 1ms timeout * Adaptive level of logging (in discussion) FYI: * FLINK-25661 - Add Custom Fatal Exception handler in AsyncSinkWriter - https://github.com/apache/flink/pull/18449 * https://issues.apache.org/jira/browse/FLINK-24229 DDB Sink Chinese translation: https://issues.apache.org/jira/browse/FLINK-25735 - KDS DataStream Sink https://issues.apache.org/jira/browse/FLINK-25736 - KDS Table API Sink https://issues.apache.org/jira/browse/FLINK-25737 - KDF DataStream Sink -- This message was sent by Atlassian Jira (v8.20.1#820001)
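One status-update item above ("base sink retry entries in order, not in reverse", PR 18488) describes a subtle ordering bug: failed entries are put back at the head of the buffer for priority retry, but prepending them one at a time reverses their relative order. A minimal sketch of the fix idea, with hypothetical names (`SketchRequeue` is illustrative, not Flink's code):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

/** Hypothetical sketch: requeue failed entries at the head of the buffer
 *  while preserving their original relative order. Naively calling
 *  addFirst() in forward order would reverse them, so iterate backwards. */
final class SketchRequeue {
    static <T> void requeueFailedEntries(List<T> failed, Deque<T> buffer) {
        for (int i = failed.size() - 1; i >= 0; i--) {
            buffer.addFirst(failed.get(i)); // backwards iteration keeps a,b,c in order
        }
    }
}
```

Retried entries thus still leave the sink before newer elements, and in the order they were first written, which matters for destinations where per-key ordering is observable.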
[jira] [Created] (FLINK-25924) KDF Integration tests intermittently fail
Zichen Liu created FLINK-25924: -- Summary: KDF Integration tests intermittently fail Key: FLINK-25924 URL: https://issues.apache.org/jira/browse/FLINK-25924 Project: Flink Issue Type: New Feature Components: Connectors / Kinesis Reporter: Zichen Liu Assignee: Ahmed Hamdy Fix For: 1.15.0 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25793) Kinesis Data Streams sink is not limiting throughput to the destination and therefore exceeding rate limits
Zichen Liu created FLINK-25793: -- Summary: Kinesis Data Streams sink is not limiting throughput to the destination and therefore exceeding rate limits Key: FLINK-25793 URL: https://issues.apache.org/jira/browse/FLINK-25793 Project: Flink Issue Type: Bug Components: Connectors / Kinesis Reporter: Zichen Liu Assignee: Ahmed Hamdy Fix For: 1.15.0 h2. Bug: Async Sink Base is being flushed too frequently, resulting in backpressure even when the buffer is near empty *Cause:* During a write(), flushIfAble() is called, which checks whether the number of buffered elements exceeds the batch size and, if so, insists that the sink flush immediately, even if the number of inFlightRequests is already at the maximum allowed, causing the current mailbox thread to yield and hence block. Note that this can occur even when the buffer is near empty, so the blocking behaviour is unnecessary and undesirable: we would like the element to be written to the buffer without any blocking. -- This message was sent by Atlassian Jira (v8.20.1#820001)
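The fix direction implied by the cause analysis above can be sketched as: a write() should only ever block when the buffer itself is completely full; merely reaching the batch size while all in-flight slots are taken should defer the flush, not stall the mailbox thread. This is a simplified, self-contained model with hypothetical names (`SketchNonBlockingBuffer`), not the actual AsyncSinkWriter code.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical model of the non-blocking flush decision: flush on batch
 *  size only when an in-flight slot is free; block only on a full buffer. */
final class SketchNonBlockingBuffer {
    private final Deque<String> buffer = new ArrayDeque<>();
    private final int batchSize;
    private final int maxBufferedRequests;
    private final int maxInFlightRequests;
    private int inFlightRequests;

    SketchNonBlockingBuffer(int batchSize, int maxBufferedRequests, int maxInFlightRequests) {
        this.batchSize = batchSize;
        this.maxBufferedRequests = maxBufferedRequests;
        this.maxInFlightRequests = maxInFlightRequests;
    }

    /** Returns true only if the caller would have to block (buffer truly full). */
    boolean write(String element) {
        buffer.add(element);
        if (buffer.size() >= batchSize && inFlightRequests < maxInFlightRequests) {
            flushOneBatch(); // capacity available: flush without blocking
        }
        return buffer.size() >= maxBufferedRequests;
    }

    private void flushOneBatch() {
        for (int i = 0; i < batchSize && !buffer.isEmpty(); i++) {
            buffer.poll(); // hand a batch to an (elided) async request
        }
        inFlightRequests++;
    }

    void completeOneRequest() { inFlightRequests = Math.max(0, inFlightRequests - 1); }

    int buffered() { return buffer.size(); }
    int inFlight() { return inFlightRequests; }
}
```

In this model, hitting the batch size with no free in-flight slot simply leaves the elements buffered for the next opportunity, which is exactly the behaviour the ticket says a near-empty buffer should get.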
[jira] [Created] (FLINK-25792) Async Sink Base is being flushed too frequently resulting in backpressure even when buffer is near empty
Zichen Liu created FLINK-25792: -- Summary: Async Sink Base is being flushed too frequently resulting in backpressure even when buffer is near empty Key: FLINK-25792 URL: https://issues.apache.org/jira/browse/FLINK-25792 Project: Flink Issue Type: New Feature Components: Connectors / Kinesis Reporter: Zichen Liu Assignee: Ahmed Hamdy Fix For: 1.15.0 h2. References More details to be found [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25737) Chinese Translation - Add documentation for Firehose Async Sink
Zichen Liu created FLINK-25737: -- Summary: Chinese Translation - Add documentation for Firehose Async Sink Key: FLINK-25737 URL: https://issues.apache.org/jira/browse/FLINK-25737 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 1.15.0 Reporter: Zichen Liu Fix For: 1.15.0 h2. Translate: * Connectors/Kinesis page to deprecate old sink (docs/content/docs/connectors/datastream/kinesis.md) * Metrics page with new sink metrics (docs/content/docs/ops/metrics.md) - only the section about Kinesis into Chinese. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25736) Chinese Translation - Update documentation for Kinesis Table Api
Zichen Liu created FLINK-25736: -- Summary: Chinese Translation - Update documentation for Kinesis Table Api Key: FLINK-25736 URL: https://issues.apache.org/jira/browse/FLINK-25736 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 1.15.0 Reporter: Zichen Liu Fix For: 1.15.0 h2. Translate: * Connectors/Kinesis page to deprecate old sink (docs/content.zh/docs/connectors/table/kinesis.md) into Chinese. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25735) Chinese Translation - Add documentation for KDS Async Sink
Zichen Liu created FLINK-25735: -- Summary: Chinese Translation - Add documentation for KDS Async Sink Key: FLINK-25735 URL: https://issues.apache.org/jira/browse/FLINK-25735 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 1.15.0 Reporter: Zichen Liu Fix For: 1.15.0 h2. Translate: * Connectors/Kinesis page to deprecate old sink (docs/content/docs/connectors/datastream/kinesis.md) * Metrics page with new sink metrics (docs/content/docs/ops/metrics.md) - only the section about Kinesis into Chinese. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25731) Mark FlinkKinesisProducer/FlinkKinesisConsumer as deprecated
Zichen Liu created FLINK-25731: -- Summary: Mark FlinkKinesisProducer/FlinkKinesisConsumer as deprecated Key: FLINK-25731 URL: https://issues.apache.org/jira/browse/FLINK-25731 Project: Flink Issue Type: New Feature Components: Connectors / Common Reporter: Zichen Liu Assignee: Zichen Liu Fix For: 1.15.0 h2. Motivation *User stories:* As a Flink user, I’d like to use DynamoDB as a sink for my data pipeline. *Scope:* * Implement an asynchronous sink for DynamoDB by inheriting the AsyncSinkBase class. The implementation can for now reside in its own module in flink-connectors. * Implement an asynchronous sink writer for DynamoDB by extending the AsyncSinkWriter. The implementation must deal with failed requests and retry them using the {{requeueFailedRequestEntry}} method. If possible, the implementation should batch multiple requests (PutRecordsRequestEntry objects) to Firehose for increased throughput. The implemented Sink Writer will be used by the Sink class that will be created as part of this story. * Java / code-level docs. * End-to-end testing: add tests that hit a real AWS instance. (How best to donate resources to the Flink project to allow this to happen?) h2. References More details to be found [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25729) Replace the deprecated FlinkKinesisConsumer with a Kinesis Consumer based on the AWS SDK for Java 2.x
Zichen Liu created FLINK-25729: -- Summary: Replace the deprecated FlinkKinesisConsumer with a Kinesis Consumer based on the AWS SDK for Java 2.x Key: FLINK-25729 URL: https://issues.apache.org/jira/browse/FLINK-25729 Project: Flink Issue Type: Improvement Components: Connectors / Common Affects Versions: 1.15.0 Reporter: Zichen Liu Fix For: 1.15.0 h2. Motivation *User stories:* As a Flink user, I’d like to configure a custom fatal exception handler for the AsyncSinkWriter *Scope:* * Create a new fatal exception handler * Users of the AsyncSink should be able to create an implementation and pass it to the sink -- This message was sent by Atlassian Jira (v8.20.1#820001)
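The scope above asks for a user-pluggable fatal exception handler. One way such a contract could look is sketched below; the interface and class names are hypothetical stand-ins for illustration, not Flink's actual API (the real work is tracked in FLINK-25661).

```java
import java.util.function.Consumer;

/** Hypothetical pluggable handler: the writer asks the handler whether an
 *  error is fatal instead of hard-coding that decision. */
interface SketchFatalExceptionHandler {
    boolean isFatal(Throwable error);
}

/** Hypothetical writer fragment that consults the handler on each error. */
final class SketchWriter {
    private final SketchFatalExceptionHandler handler;
    private final Consumer<Throwable> failJob; // e.g. fails the running job

    SketchWriter(SketchFatalExceptionHandler handler, Consumer<Throwable> failJob) {
        this.handler = handler;
        this.failJob = failJob;
    }

    /** Returns true if the error was treated as fatal and the job was failed. */
    boolean onError(Throwable error) {
        if (handler.isFatal(error)) {
            failJob.accept(error);
            return true;
        }
        return false; // non-fatal: the caller may retry the request instead
    }
}
```

Because the interface has a single method, users could supply the policy as a lambda when constructing the sink.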
[jira] [Created] (FLINK-25692) Documentation for Kinesis Firehose Sink
Zichen Liu created FLINK-25692: -- Summary: Documentation for Kinesis Firehose Sink Key: FLINK-25692 URL: https://issues.apache.org/jira/browse/FLINK-25692 Project: Flink Issue Type: New Feature Components: Connectors / Kinesis Reporter: Zichen Liu Assignee: Ahmed Hamdy Fix For: 1.15.0 h2. Motivation *User stories:* As a Flink user, I’d like to use the Table API for the new Kinesis Firehose async sink. *Scope:* * Add a new module for {{flink-connector-kinesis-data-firehose-table}}. * Implement a new {{KinesisFirehoseDynamicTableSink}} that uses the {{KinesisDataFirehoseSink}} async implementation and implements {{AsyncDynamicTableSink}}. * The implementation introduces Async Sink configurations as optional options in the table definition, with default values derived from the {{KinesisDataFirehose}} default values. * Unit/Integration testing: modify KinesisTableAPI tests for the new implementation; add unit tests for {{AsyncDynamicTableSink}}, {{KinesisDynamicTableSink}} and {{KinesisDynamicTableSinkFactory}}. * Java / code-level docs. * Add documentation for the Table API connector for Kinesis Data Firehose. h2. References More details to be found [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25408) Chinese Translation - Add documentation for KDS Async Sink
Zichen Liu created FLINK-25408: -- Summary: Chinese Translation - Add documentation for KDS Async Sink Key: FLINK-25408 URL: https://issues.apache.org/jira/browse/FLINK-25408 Project: Flink Issue Type: New Feature Components: Connectors / Kinesis, Documentation Reporter: Zichen Liu Assignee: Zichen Liu Fix For: 1.15.0 h2. Motivation _FLINK-24227 introduces a new sink for Kinesis Data Streams that supersedes the existing one based on KPL._ *Scope:* * Deprecate the current section in the docs for the Kinesis KPL sink and write documentation and usage guide for the new sink. h2. References More details to be found [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-24905) KDS implementation of Async Sink Table API
Zichen Liu created FLINK-24905: -- Summary: KDS implementation of Async Sink Table API Key: FLINK-24905 URL: https://issues.apache.org/jira/browse/FLINK-24905 Project: Flink Issue Type: New Feature Components: Connectors / Common Reporter: Zichen Liu Assignee: Zichen Liu Fix For: 1.15.0 h2. Motivation *User stories:* As a Flink user, I’d like to use Kinesis Data Streams as a sink for my data pipeline. *Scope:* * Implement an asynchronous sink for Kinesis Data Streams (KDS) by inheriting the AsyncSinkBase class. The implementation can for now reside in its own module in flink-connectors. The module and package name can be anything reasonable, e.g. {{flink-connector-aws-kinesis}} for the module name and {{org.apache.flink.connector.aws.kinesis}} for the package name. * The implementation must use [the Kinesis Java Client|https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html]. * The implementation must allow users to configure the Kinesis Client, with reasonable default settings. * Implement an asynchronous sink writer for KDS by extending the AsyncSinkWriter. The implementation must deal with failed requests and retry them using the {{requeueFailedRequestEntry}} method. If possible, the implementation should batch multiple requests (PutRecordsRequestEntry objects) to KDS for increased throughput. The implemented Sink Writer will be used by the Sink class that will be created as part of this story. * Unit/Integration testing: use Kinesalite (in-memory Kinesis simulation). We already use this in {{KinesisTableApiITCase}}. * Java / code-level docs. * End-to-end testing: add tests that hit a real AWS instance. (How best to donate resources to the Flink project to allow this to happen?) h2. References More details to be found [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] -- This message was sent by Atlassian Jira (v8.20.1#820001)
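The retry contract described in the scope above (submit a batch, then hand only the rejected entries back for retry via requeueFailedRequestEntry) can be modelled in a few lines. The sketch below is self-contained and uses hypothetical names; the predicate stands in for the per-entry result in the service response, not a real Kinesis API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Hypothetical model of per-batch partial-failure handling: accepted
 *  entries complete, rejected entries go to a requeue callback that stands
 *  in for requeueFailedRequestEntry. */
final class SketchBatchSubmitter<T> {
    private final Predicate<T> accepted;      // stand-in for the service's per-entry result
    private final Consumer<T> requeueFailed;  // stand-in for requeueFailedRequestEntry

    SketchBatchSubmitter(Predicate<T> accepted, Consumer<T> requeueFailed) {
        this.accepted = accepted;
        this.requeueFailed = requeueFailed;
    }

    /** Returns the accepted entries; each rejected entry is requeued for a later flush. */
    List<T> submit(List<T> batch) {
        List<T> ok = new ArrayList<>();
        for (T entry : batch) {
            if (accepted.test(entry)) {
                ok.add(entry);
            } else {
                requeueFailed.accept(entry);
            }
        }
        return ok;
    }
}
```

This mirrors why batching matters for throughput: one request carries many entries, and only the individually throttled or failed entries pay the retry cost.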
[jira] [Created] (FLINK-24904) Add documentation for KDS Async Sink
Zichen Liu created FLINK-24904: -- Summary: Add documentation for KDS Async Sink Key: FLINK-24904 URL: https://issues.apache.org/jira/browse/FLINK-24904 Project: Flink Issue Type: New Feature Components: Connectors / Common Reporter: Zichen Liu Assignee: Zichen Liu Fix For: 1.15.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24602) Include diffs for configuration documentation page
Zichen Liu created FLINK-24602: -- Summary: Include diffs for configuration documentation page Key: FLINK-24602 URL: https://issues.apache.org/jira/browse/FLINK-24602 Project: Flink Issue Type: Improvement Components: Documentation Reporter: Zichen Liu We could include it as an extra column on this page: [https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/] Or have another page dedicated to diffs on what the default configuration for each key has been throughout historical versions. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24370) [FLIP-171] Documentation for Generic AsyncSinkBase
Zichen Liu created FLINK-24370: -- Summary: [FLIP-171] Documentation for Generic AsyncSinkBase Key: FLINK-24370 URL: https://issues.apache.org/jira/browse/FLINK-24370 Project: Flink Issue Type: New Feature Components: Connectors / Common Reporter: Zichen Liu Assignee: Zichen Liu Fix For: 1.15.0 h2. Motivation Apache Flink has a rich connector ecosystem that can persist data in various destinations. Flink natively supports Apache Kafka, Amazon Kinesis Data Streams, Elasticsearch, HBase, and many more destinations. Additional connectors are maintained in Apache Bahir or directly on GitHub. The basic functionality of these sinks is quite similar. They batch events according to user defined buffering hints, sign requests and send them to the respective endpoint, retry unsuccessful or throttled requests, and participate in checkpointing. They primarily just differ in the way they interface with the destination. Yet, all the above-mentioned sinks are developed and maintained independently. We hence propose to create a sink that abstracts away this common functionality into a generic sink. Adding support for a new destination then just means creating a lightweight shim that only implements the specific interfaces of the destination using a client that supports async requests. Having a common abstraction will reduce the effort required to maintain all these individual sinks. It will also make it much easier and faster to create integrations with additional destinations. Moreover, improvements or bug fixes to the core of the sink will benefit all implementations that are based on it. The design of the sink focusses on extensibility and a broad support of destinations. The core of the sink is kept generic and free of any connector specific dependencies. The sink is designed to participate in checkpointing to provide at-least once semantics, but it is limited to destinations that provide a client that supports async requests. h2. 
References More details to be found https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24278) [FLIP-171] Async Sink Base Sink Developer Guide for Documentation
Zichen Liu created FLINK-24278: -- Summary: [FLIP-171] Async Sink Base Sink Developer Guide for Documentation Key: FLINK-24278 URL: https://issues.apache.org/jira/browse/FLINK-24278 Project: Flink Issue Type: New Feature Components: Connectors / Common Reporter: Zichen Liu Assignee: Zichen Liu Fix For: 1.15.0 *User stories* * As a Sink user, I’d like to configure the batch size for items to send to the destination at once (e.g. “flush if there are x number of items in the batch”) * As a Sink user, I’d like to configure the batching logic so that I can flush the batch of requests based on a time period (e.g. “flush every 2 seconds”) * As a Sink user, I’d like to specify the number of bytes for the batch of requests to be flushed (e.g. ”submit the batch after the total number of bytes in it is above 1KB”) * As a Sink developer, I’d like to use the configuration mechanism provided to allow Sink users to configure my Sink implementation *Scope* * Allow Sink developers and users to pass batch size config to the AsyncSinkWriter * Add support for time-based flushing (e.g. “flush after x milliseconds”) using the ProcessingTimeService which is part of the Sink interface * Add support for byte-based flushing * Consider the combination of time-based and byte-based flushing: if there are more bytes than configured in the time-based batch, then the last few (however many necessary) items should go in the next batch to satisfy the requirement on the number of bytes. *References* More details to be found [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] -- This message was sent by Atlassian Jira (v8.3.4#803005)
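The three flush triggers in the stories above (batch count, byte budget, batch age) combine into a single predicate. The sketch below models that decision only; the class name and thresholds are illustrative, and the real AsyncSinkWriter drives the time trigger via the ProcessingTimeService rather than being polled like this.

```java
/** Hypothetical model of the combined flush decision: flush when the batch
 *  reaches a count limit, a byte budget, or an age limit (and is non-empty). */
final class SketchFlushTrigger {
    private final int maxBatchSize;
    private final long maxBatchBytes;
    private final long maxAgeMillis;

    SketchFlushTrigger(int maxBatchSize, long maxBatchBytes, long maxAgeMillis) {
        this.maxBatchSize = maxBatchSize;
        this.maxBatchBytes = maxBatchBytes;
        this.maxAgeMillis = maxAgeMillis;
    }

    boolean shouldFlush(int bufferedCount, long bufferedBytes, long oldestEntryAgeMillis) {
        return bufferedCount >= maxBatchSize
                || bufferedBytes >= maxBatchBytes
                // an empty buffer never flushes, however old the timer
                || (bufferedCount > 0 && oldestEntryAgeMillis >= maxAgeMillis);
    }
}
```

The last scope bullet adds a refinement not modelled here: when a time-triggered batch would exceed the byte budget, the trailing items spill into the next batch instead of oversizing the request.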
[jira] [Created] (FLINK-24234) [FLIP-171] Byte Based & Time Based Flushing for AsyncSinkBase
Zichen Liu created FLINK-24234: -- Summary: [FLIP-171] Byte Based & Time Based Flushing for AsyncSinkBase Key: FLINK-24234 URL: https://issues.apache.org/jira/browse/FLINK-24234 Project: Flink Issue Type: New Feature Components: Connectors / Common Reporter: Zichen Liu Assignee: Zichen Liu Fix For: 1.15.0 h2. 
References More details to be found https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24229) [FLIP-171] DynamoDB implementation of Async Sink
Zichen Liu created FLINK-24229: -- Summary: [FLIP-171] DynamoDB implementation of Async Sink Key: FLINK-24229 URL: https://issues.apache.org/jira/browse/FLINK-24229 Project: Flink Issue Type: New Feature Components: Connectors / Common Reporter: Zichen Liu Assignee: Zichen Liu Fix For: 1.15.0 h2. Motivation *User stories:* As a Flink user, I’d like to use Kinesis Firehose as a sink for my data pipeline. *Scope:* * Implement an asynchronous sink for Kinesis Firehose by inheriting the AsyncSinkBase class. The implementation can for now reside in its own module in flink-connectors. The module and package name can be anything reasonable, e.g. {{flink-connector-aws-kinesis}} for the module name and {{org.apache.flink.connector.aws.kinesis}} for the package name. * The implementation must use [the Kinesis Java Client|https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/AmazonKinesisClient.html]. * The implementation must allow users to configure the Kinesis Client, with reasonable default settings. * Implement an asynchronous sink writer for Firehose by extending the AsyncSinkWriter. The implementation must deal with failed requests and retry them using the {{requeueFailedRequestEntry}} method. If possible, the implementation should batch multiple requests (PutRecordsRequestEntry objects) to Firehose for increased throughput. The implemented Sink Writer will be used by the Sink class that will be created as part of this story. * Unit/Integration testing: use Kinesalite (in-memory Kinesis simulation). We already use this in {{KinesisTableApiITCase}}. * Java / code-level docs. * End-to-end testing: add tests that hit a real AWS instance. (How best to donate resources to the Flink project to allow this to happen?) h2. References More details to be found [https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24228) [FLIP-171] Firehose implementation of Async Sink
Zichen Liu created FLINK-24228: -- Summary: [FLIP-171] Firehose implementation of Async Sink Key: FLINK-24228 URL: https://issues.apache.org/jira/browse/FLINK-24228 Project: Flink Issue Type: New Feature Components: Connectors / Common Reporter: Zichen Liu Assignee: Zichen Liu Fix For: 1.15.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24227) [FLIP-171] KDS implementation of Async Sink
Zichen Liu created FLINK-24227: -- Summary: [FLIP-171] KDS implementation of Async Sink Key: FLINK-24227 URL: https://issues.apache.org/jira/browse/FLINK-24227 Project: Flink Issue Type: New Feature Components: Connectors / Common Reporter: Zichen Liu Assignee: Zichen Liu Fix For: 1.15.0 h2. 
References More details to be found https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24041) [FLIP-171] Generic AsyncSinkBase
Zichen Liu created FLINK-24041:
--
Summary: [FLIP-171] Generic AsyncSinkBase
Key: FLINK-24041
URL: https://issues.apache.org/jira/browse/FLINK-24041
Project: Flink
Issue Type: New Feature
Components: Connectors / Common
Reporter: Zichen Liu

h2. Motivation

Apache Flink has a rich connector ecosystem that can persist data in various destinations. Flink natively supports Apache Kafka, Amazon Kinesis Data Streams, Elasticsearch, HBase, and many more destinations. Additional connectors are maintained in Apache Bahir or directly on GitHub.

The basic functionality of these sinks is quite similar. They batch events according to user-defined buffering hints, sign requests and send them to the respective endpoint, retry unsuccessful or throttled requests, and participate in checkpointing. They differ primarily in the way they interface with the destination. Yet all the above-mentioned sinks are developed and maintained independently.

We therefore propose to create a sink that abstracts this common functionality into a generic sink. Adding support for a new destination then just means creating a lightweight shim that implements the destination-specific interfaces using a client that supports async requests.

Having a common abstraction will reduce the effort required to maintain all these individual sinks. It will also make it much easier and faster to create integrations with additional destinations. Moreover, improvements or bug fixes to the core of the sink will benefit all implementations based on it.

The design of the sink focuses on extensibility and broad support for destinations. The core of the sink is kept generic and free of any connector-specific dependencies. The sink is designed to participate in checkpointing to provide at-least-once semantics, but it is limited to destinations that provide a client that supports async requests.

h2. References

More details can be found at https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink
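The batch-buffer-retry core that both issues describe can be sketched as a small generic class. This is an illustrative sketch only, not Flink's actual AsyncSinkBase API; the names (`AsyncBatchBuffer`, `submitBatch`, `pendingCount`) are hypothetical, and the submit callback returning the entries to retry stands in for the connector-specific shim that talks to the destination:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of the generic batching/retry core described in FLIP-171.
// Class and method names are illustrative, not the real Flink API.
public class AsyncBatchBuffer<T> {
    private final int maxBatchSize;                       // user-defined buffering hint
    private final Function<List<T>, List<T>> submitBatch; // sends a batch, returns entries to retry
    private final List<T> buffer = new ArrayList<>();

    public AsyncBatchBuffer(int maxBatchSize, Function<List<T>, List<T>> submitBatch) {
        this.maxBatchSize = maxBatchSize;
        this.submitBatch = submitBatch;
    }

    // Buffer one element; flush once the buffering hint is reached.
    public void add(T element) {
        buffer.add(element);
        if (buffer.size() >= maxBatchSize) {
            flush();
        }
    }

    // Send the current batch; unsuccessful or throttled entries are re-buffered
    // so a later flush retries them. A real sink would also flush on checkpoint
    // barriers to provide at-least-once semantics.
    public void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        buffer.addAll(submitBatch.apply(batch));
    }

    // Number of buffered entries not yet sent to the destination.
    public int pendingCount() {
        return buffer.size();
    }
}
```

Under this sketch, a destination-specific shim only supplies the `submitBatch` function (wrapping an async client call), while batching and retry live in the shared core, which is the maintenance benefit the proposal argues for.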