[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector
[ https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668116#comment-16668116 ]

ASF GitHub Bot commented on FLINK-9083:
---

jparkie commented on issue #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#issuecomment-434168230

I rebased my PR, and I'll be working on the new changes to address the PR comments.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Add async backpressure support to Cassandra Connector
> -
>
> Key: FLINK-9083
> URL: https://issues.apache.org/jira/browse/FLINK-9083
> Project: Flink
> Issue Type: Improvement
> Components: Cassandra Connector
> Reporter: Jacob Park
> Assignee: Jacob Park
> Priority: Minor
> Labels: pull-request-available
>
> As the CassandraSinkBase derivatives utilize async writes, they do not block
> the task to introduce any backpressure.
> I am currently using a semaphore to provide backpressure support by blocking
> at a maximum concurrent requests limit like how DataStax's Spark Cassandra
> Connector functions:
> [https://github.com/datastax/spark-cassandra-connector/blob/v2.0.7/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/AsyncExecutor.scala#L18]
> This improvement has greatly improved the fault-tolerance of our Cassandra
> Sink Connector implementation on Apache Flink in production. I would like to
> contribute this feature back upstream.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
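The semaphore approach described in the issue can be sketched in isolation as follows. This is a hedged illustration, not the actual CassandraSinkBase code: the class name, the simulated executor, and the `write()` method are hypothetical, and a real sink would call Cassandra's `session.executeAsync(...)` instead of scheduling a dummy task.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Sketch: each write must acquire a permit before it is issued; the
// completion callback releases the permit, so at most maxConcurrentRequests
// writes are ever in flight and callers block once the limit is reached.
class BackpressuredAsyncWriter {
    private final Semaphore permits;
    private final long timeout;
    private final TimeUnit unit;
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

    BackpressuredAsyncWriter(int maxConcurrentRequests, long timeout, TimeUnit unit) {
        this.permits = new Semaphore(maxConcurrentRequests);
        this.timeout = timeout;
        this.unit = unit;
    }

    CompletableFuture<Void> write(String record) throws InterruptedException {
        // Blocking here is what introduces backpressure on the caller.
        if (!permits.tryAcquire(timeout, unit)) {
            throw new IllegalStateException("Timed out acquiring a request permit");
        }
        CompletableFuture<Void> result = new CompletableFuture<>();
        // Simulated asynchronous write completing after a short delay.
        executor.schedule(() -> result.complete(null), 10, TimeUnit.MILLISECONDS);
        // Release the permit whether the write succeeds or fails.
        result.whenComplete((ignored, error) -> permits.release());
        return result;
    }

    void shutdown() {
        executor.shutdown();
    }
}
```

A production sink would additionally record any failure for the main thread to rethrow, which is the role of the `setAsyncErrors`/`checkAsyncErrors` pair discussed in the PR.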
[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector
[ https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668098#comment-16668098 ]

ASF GitHub Bot commented on FLINK-9083:
---

jparkie commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r229170161

## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java

## @@ -127,35 +144,112 @@
 	public void close() throws Exception {
 	}

 	@Override
-	public void initializeState(FunctionInitializationContext context) throws Exception {
-	}
+	public void initializeState(FunctionInitializationContext context) throws Exception {}

 	@Override
-	public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
 		checkAsyncErrors();
-		waitForPendingUpdates();
+		flush();
 		checkAsyncErrors();
 	}

-	private void waitForPendingUpdates() throws InterruptedException {
-		synchronized (updatesPending) {
-			while (updatesPending.get() > 0) {
-				updatesPending.wait();
+	@Override
+	public void invoke(IN value) throws Exception {
+		checkAsyncErrors();
+		tryAcquire();
+		final ListenableFuture<V> result = send(value);
+		Futures.addCallback(result, new FutureCallback<V>() {
+			@Override
+			public void onSuccess(V ignored) {
+				release();
 			}
+
+			@Override
+			public void onFailure(Throwable currentError) {
+				setAsyncErrors(currentError);
+				release();
+			}
+		});
+	}
+
+	// --- User-Defined Sink Methods --
+
+	public abstract ListenableFuture<V> send(IN value);
+
+	// - Configuration Methods
+
+	/**
+	 * Sets the maximum allowed number of concurrent requests for this sink.
+	 *
+	 * @param maxConcurrentRequests maximum number of concurrent requests allowed
+	 * @param timeout timeout duration when acquiring a permit to execute
+	 * @param unit timeout unit when acquiring a permit to execute
+	 */
+	public void setMaxConcurrentRequests(int maxConcurrentRequests, long timeout, TimeUnit unit) {
+		Preconditions.checkArgument(maxConcurrentRequests >= 0, "maxConcurrentRequests cannot be negative.");
+		Preconditions.checkArgument(timeout >= 0, "timeout cannot be negative.");
+		this.maxConcurrentRequests = maxConcurrentRequests;
+		this.maxConcurrentRequestsTimeout = timeout;
+		this.maxConcurrentRequestsTimeoutUnit = unit;
+	}
+
+	// --- Cassandra Methods --
+
+	protected Cluster createCluster() {

Review comment:
The `CassandraPojoSink` relies on it.
- https://github.com/apache/flink/blob/release-1.6.2/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java#L92
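The `snapshotState()` change above drains all in-flight writes via `flush()` before a checkpoint completes. A minimal way to build such a `flush()` on top of the same semaphore is to acquire every permit at once; this is a sketch with hypothetical names, not the PR's actual Phaser-based implementation:

```java
import java.util.concurrent.Semaphore;

// Sketch: flush() blocks until no writes are in flight by acquiring all
// permits, then releases them so subsequent writes can proceed.
// Hypothetical helper, not the connector's real code.
class SemaphoreFlusher {
    private final int maxConcurrentRequests;
    private final Semaphore permits;

    SemaphoreFlusher(int maxConcurrentRequests) {
        this.maxConcurrentRequests = maxConcurrentRequests;
        this.permits = new Semaphore(maxConcurrentRequests);
    }

    void acquireForWrite() throws InterruptedException {
        permits.acquire();
    }

    void releaseAfterWrite() {
        permits.release();
    }

    /** Blocks until every permit is free, i.e. no writes are in flight. */
    void flush() throws InterruptedException {
        permits.acquire(maxConcurrentRequests); // waits for all in-flight writes
        permits.release(maxConcurrentRequests); // allow writes to resume
    }

    int availablePermits() {
        return permits.availablePermits();
    }
}
```

The catch is that nothing stops a backlogged write from grabbing a freshly released permit mid-flush; the Phaser discussed later in this thread exists precisely to close that race.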
[jira] [Commented] (FLINK-10231) Add a view SQL DDL
[ https://issues.apache.org/jira/browse/FLINK-10231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668093#comment-16668093 ]

winifredtang commented on FLINK-10231:
--

[~fhueske] Hello, could you give more details about it? I am trying to implement a view SQL DDL, and I wonder whether it would be better to use TableEnvironment.registerTableInternal(name: String, table: AbstractTable) instead of TableEnvironment.registerTable(name: String, table: Table). Thanks a lot.

> Add a view SQL DDL
> --
>
> Key: FLINK-10231
> URL: https://issues.apache.org/jira/browse/FLINK-10231
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Timo Walther
> Priority: Major
>
> FLINK-10163 added initial view support for the SQL Client. However, for
> supporting the [full definition of
> views|https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-Create/Drop/AlterView]
> (with schema, comments, etc.) we need native support for views in
> the Table API.
[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector
[ https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668091#comment-16668091 ]

ASF GitHub Bot commented on FLINK-9083:
---

jparkie commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r229169780

## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java

## @@ -43,70 +48,82 @@
  */
 public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> implements CheckpointedFunction {
 	protected final Logger log = LoggerFactory.getLogger(getClass());
-	protected transient Cluster cluster;
-	protected transient Session session;
-	protected transient volatile Throwable exception;
-	protected transient FutureCallback<V> callback;
+	// Default Configurations
+
+	/**
+	 * The default maximum number of concurrent requests. By default, {@code Integer.MAX_VALUE}.
+	 */
+	public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = Integer.MAX_VALUE;
+
+	/**
+	 * The default timeout duration when acquiring a permit to execute. By default, {@code Long.MAX_VALUE}.
+	 */
+	public static final long DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = Long.MAX_VALUE;
+
+	/**
+	 * The default timeout unit when acquiring a permit to execute. By default, milliseconds.
+	 */
+	public static final TimeUnit DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT = TimeUnit.MILLISECONDS;
+
+	// - Configuration Fields -
+
+	private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS;
+	private long maxConcurrentRequestsTimeout = DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT;
+	private TimeUnit maxConcurrentRequestsTimeoutUnit = DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT;

Review comment:
I'm seeing quite a few existing connectors relying on `Map<String, String>` configuration:
- https://github.com/apache/flink/blob/release-1.6.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java#L204
- https://github.com/apache/flink/blob/release-1.6.2/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java#L87

Do you recommend following the same scheme? I can then use `ParameterTool` to enforce parameter types.
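For comparison, the `Map`-based scheme referenced in the comment can be sketched as below. The key names and the `SinkConfig` class are hypothetical, and plain JDK parsing stands in for Flink's `ParameterTool`:

```java
import java.util.Map;

// Hypothetical sketch of Map-based configuration with typed defaults,
// in the spirit of the ElasticsearchSinkBase scheme linked above.
class SinkConfig {
    static final String KEY_MAX_CONCURRENT_REQUESTS = "sink.max-concurrent-requests";
    static final String KEY_TIMEOUT_MS = "sink.max-concurrent-requests.timeout-ms";

    final int maxConcurrentRequests;
    final long timeoutMs;

    SinkConfig(Map<String, String> userConfig) {
        // Parse with defaults; malformed numbers fail fast at construction time.
        this.maxConcurrentRequests = Integer.parseInt(
            userConfig.getOrDefault(KEY_MAX_CONCURRENT_REQUESTS, String.valueOf(Integer.MAX_VALUE)));
        this.timeoutMs = Long.parseLong(
            userConfig.getOrDefault(KEY_TIMEOUT_MS, String.valueOf(Long.MAX_VALUE)));
        if (maxConcurrentRequests <= 0) {
            throw new IllegalArgumentException("max-concurrent-requests must be positive.");
        }
    }
}
```

The trade-off versus dedicated setters is that a string map pushes type checking to runtime, which is why the thread also weighs a dedicated config class.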
[jira] [Assigned] (FLINK-10231) Add a view SQL DDL
[ https://issues.apache.org/jira/browse/FLINK-10231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

winifredtang reassigned FLINK-10231:
--
Assignee: winifredtang
[GitHub] jparkie commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector
jparkie commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r229168212

## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java

## @@ -43,70 +48,82 @@
  */
 public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> implements CheckpointedFunction {
 	protected final Logger log = LoggerFactory.getLogger(getClass());
-	protected transient Cluster cluster;
-	protected transient Session session;
-	protected transient volatile Throwable exception;
-	protected transient FutureCallback<V> callback;
+	// Default Configurations
+
+	/**
+	 * The default maximum number of concurrent requests. By default, {@code Integer.MAX_VALUE}.
+	 */
+	public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = Integer.MAX_VALUE;
+
+	/**
+	 * The default timeout duration when acquiring a permit to execute. By default, {@code Long.MAX_VALUE}.
+	 */
+	public static final long DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = Long.MAX_VALUE;
+
+	/**
+	 * The default timeout unit when acquiring a permit to execute. By default, milliseconds.
+	 */
+	public static final TimeUnit DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT = TimeUnit.MILLISECONDS;
+
+	// - Configuration Fields -
+
+	private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS;
+	private long maxConcurrentRequestsTimeout = DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT;
+	private TimeUnit maxConcurrentRequestsTimeoutUnit = DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT;
+
+	// --- Cassandra Fields ---
+
 	private final ClusterBuilder builder;
-	private final AtomicInteger updatesPending = new AtomicInteger();
+	protected transient Cluster cluster;
+	protected transient Session session;
+
+	// Synchronization Fields
+
+	private AtomicReference<Throwable> throwable;
+	private Semaphore semaphore;
+	private Phaser phaser;

 	CassandraSinkBase(ClusterBuilder builder) {
 		this.builder = builder;
 		ClosureCleaner.clean(builder, true);
 	}

-	@Override
-	public void open(Configuration configuration) {
-		this.callback = new FutureCallback<V>() {
-			@Override
-			public void onSuccess(V ignored) {
-				int pending = updatesPending.decrementAndGet();
-				if (pending == 0) {
-					synchronized (updatesPending) {
-						updatesPending.notifyAll();
-					}
-				}
-			}
+	// - Sink Methods -
+
+	@Override
+	public void open(Configuration parameters) {
+		cluster = createCluster();
+		session = createSession();
+
+		throwable = new AtomicReference<>();
+		semaphore = new Semaphore(maxConcurrentRequests);
+		/*
+		 * A Phaser is a flexible and reusable synchronization barrier similar to CyclicBarrier and CountDownLatch.
+		 *
+		 * This Phaser is configured to support "1 + N" parties.
+		 * - "1" for the CassandraSinkBase to arrive and to await at the Phaser during a flush() call.
+		 * - "N" for the varying number of invoke() calls that register and de-register with the Phaser.
+		 *
+		 * The Phaser awaits the completion of the advancement of a phase prior to returning from a register() call.
+		 * This behavior ensures that no backlogged invoke() calls register to execute while the Semaphore's permits
+		 * are being released during a flush() call.
+		 */
+		phaser = new Phaser(1) {

Review comment:
That's true; a concurrent set of `Future`s would be simpler. I've seen implementations of similar code in other OSS that rely on a concurrent `TrieMap` to track all the `pendingFutures` before a snapshot. At the snapshot, it iterates the `pendingFutures` calling `get()` to block, which either returns or throws an exception. However, the `Future`s are created in large batches with Spark, so this implementation may have caveats in the streaming nature of Flink. In Java, if we use a set backed by a `ConcurrentHashMap`, since we rely on a semaphore to
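The "1 + N parties" protocol quoted above can be exercised in isolation. Below is a hedged, self-contained sketch of the same Phaser idea, not the connector's actual code; the class and method names are hypothetical:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Phaser;

// Standalone illustration of the "1 + N parties" Phaser pattern: the flusher
// is the one permanent party; each in-flight write registers when started and
// de-registers on completion, so flush() blocks until all writes finish.
class PhaserFlushDemo {
    private final Phaser phaser = new Phaser(1); // party "1": the flusher itself

    void writeAsync(ExecutorService executor, Runnable write) {
        phaser.register(); // one extra party per in-flight write
        executor.execute(() -> {
            try {
                write.run();
            } finally {
                phaser.arriveAndDeregister(); // the write finished
            }
        });
    }

    void flush() {
        // Arrive, then wait for every registered write to de-register.
        phaser.arriveAndAwaitAdvance();
    }
}
```

Because a `Phaser` is reusable across phases, the same instance supports repeated flushes, which matches the per-checkpoint `flush()` in `snapshotState()`.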
[jira] [Assigned] (FLINK-9142) Lower the minimum number of buffers for incoming channels to 1
[ https://issues.apache.org/jira/browse/FLINK-9142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

blues zheng reassigned FLINK-9142:
--
Assignee: blues zheng

> Lower the minimum number of buffers for incoming channels to 1
> --
>
> Key: FLINK-9142
> URL: https://issues.apache.org/jira/browse/FLINK-9142
> Project: Flink
> Issue Type: Sub-task
> Components: Network
> Affects Versions: 1.5.0, 1.6.0
> Reporter: Nico Kruber
> Assignee: blues zheng
> Priority: Major
> Fix For: 1.6.3, 1.7.0
>
> Even if we make the floating buffers optional, we still require
> {{taskmanager.network.memory.buffers-per-channel}} number of (exclusive)
> buffers per incoming channel with credit-based flow control, while without it
> the minimum was 1 and only the maximum number of buffers was influenced by
> this parameter.
> {{taskmanager.network.memory.buffers-per-channel}} is set to {{2}} by default
> with the argument that this way we will have one buffer available for
> netty to process while a worker thread is processing/deserializing the other
> buffer. While this seems reasonable, it does increase our minimum
> requirements. Instead, we could probably live with {{1}} exclusive buffer and
> up to {{gate.getNumberOfInputChannels() * (networkBuffersPerChannel - 1) +
> extraNetworkBuffersPerGate}} floating buffers. That way we will have the same
> memory footprint as before with only slightly changed behaviour.
[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector
[ https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668058#comment-16668058 ]

ASF GitHub Bot commented on FLINK-9083:
---

jparkie commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r229163101

## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java

## @@ -43,70 +48,82 @@
  */
 public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> implements CheckpointedFunction {
 	protected final Logger log = LoggerFactory.getLogger(getClass());
-	protected transient Cluster cluster;
-	protected transient Session session;
-	protected transient volatile Throwable exception;
-	protected transient FutureCallback<V> callback;
+	// Default Configurations
+
+	/**
+	 * The default maximum number of concurrent requests. By default, {@code Integer.MAX_VALUE}.
+	 */
+	public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = Integer.MAX_VALUE;
+
+	/**
+	 * The default timeout duration when acquiring a permit to execute. By default, {@code Long.MAX_VALUE}.
+	 */
+	public static final long DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = Long.MAX_VALUE;
+
+	/**
+	 * The default timeout unit when acquiring a permit to execute. By default, milliseconds.
+	 */
+	public static final TimeUnit DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT = TimeUnit.MILLISECONDS;
+
+	// - Configuration Fields -
+
+	private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS;
+	private long maxConcurrentRequestsTimeout = DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT;
+	private TimeUnit maxConcurrentRequestsTimeoutUnit = DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT;

Review comment:
Yeah. I thought the constructor would explode too much. Furthermore, I was hesitant to introduce a new constructor that conflicted with the existing ones if anyone relied on the package-public one. However, I like your idea of a config class. I can make one constructor accept that, and have all the old constructors carry a `@deprecated` annotation and just delegate to the new one.
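The config-class migration discussed here might look roughly like the following sketch. `CassandraSinkConfig` and `ExampleSink` are hypothetical names used only to show the delegation pattern, not the PR's eventual API:

```java
import java.io.Serializable;
import java.util.concurrent.TimeUnit;

// Hypothetical immutable config holder: one constructor takes it; the old
// constructors are deprecated and delegate to the new one with defaults.
class CassandraSinkConfig implements Serializable {
    final int maxConcurrentRequests;
    final long maxConcurrentRequestsTimeout;
    final TimeUnit maxConcurrentRequestsTimeoutUnit;

    CassandraSinkConfig(int maxConcurrentRequests, long timeout, TimeUnit unit) {
        this.maxConcurrentRequests = maxConcurrentRequests;
        this.maxConcurrentRequestsTimeout = timeout;
        this.maxConcurrentRequestsTimeoutUnit = unit;
    }
}

class ExampleSink {
    private final CassandraSinkConfig config;

    ExampleSink(CassandraSinkConfig config) {
        this.config = config;
    }

    /** @deprecated use {@link #ExampleSink(CassandraSinkConfig)} instead. */
    @Deprecated
    ExampleSink() {
        // Old no-arg constructor delegates to the config-based one with defaults.
        this(new CassandraSinkConfig(Integer.MAX_VALUE, Long.MAX_VALUE, TimeUnit.MILLISECONDS));
    }

    int maxConcurrentRequests() {
        return config.maxConcurrentRequests;
    }
}
```

Keeping the deprecated constructors as thin delegates preserves binary compatibility for callers of the old API while funneling all state through the new config object.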
[GitHub] jparkie commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector
jparkie commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector URL: https://github.com/apache/flink/pull/6782#discussion_r229163101 ## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java ## @@ -43,70 +48,82 @@ */ public abstract class CassandraSinkBase extends RichSinkFunction implements CheckpointedFunction { protected final Logger log = LoggerFactory.getLogger(getClass()); - protected transient Cluster cluster; - protected transient Session session; - protected transient volatile Throwable exception; - protected transient FutureCallback callback; + // Default Configurations + + /** +* The default maximum number of concurrent requests. By default, {@code Integer.MAX_VALUE}. +*/ + public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = Integer.MAX_VALUE; + + /** +* The default timeout duration when acquiring a permit to execute. By default, {@code Long.MAX_VALUE}. +*/ + public static final long DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = Long.MAX_VALUE; + + /** +* The default timeout unit when acquiring a permit to execute. By default, milliseconds. +*/ + public static final TimeUnit DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT = TimeUnit.MILLISECONDS; + + // - Configuration Fields - + + private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS; + private long maxConcurrentRequestsTimeout = DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT; + private TimeUnit maxConcurrentRequestsTimeoutUnit = DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT; Review comment: Yeah. I thought the constructor would explode too much. Furthermore, I was hesitant to introduce a new constructor that conflicted with the existing if anyone relied on the package public one. However, I like your idea of a config class. 
I can make one constructor accept that, and keep all the old constructors with a `@deprecated` annotation so that they just delegate to the new one.
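The migration path described above — a config object accepted by one new constructor, with the old constructors kept but deprecated and delegating — could be sketched roughly as follows. All names here (`SinkConfig`, `SinkSketch`) are hypothetical stand-ins, not the actual Flink classes:

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of the deprecation strategy discussed above. */
public class SinkSketch {

    /** Config object bundling the backpressure settings (field names are illustrative). */
    public static final class SinkConfig {
        final int maxConcurrentRequests;
        final long timeout;
        final TimeUnit unit;

        public SinkConfig(int maxConcurrentRequests, long timeout, TimeUnit unit) {
            this.maxConcurrentRequests = maxConcurrentRequests;
            this.timeout = timeout;
            this.unit = unit;
        }
    }

    final SinkConfig config;

    /** New constructor: accepts the config object directly. */
    public SinkSketch(SinkConfig config) {
        this.config = config;
    }

    /** Old constructor signature, kept for compatibility; it just delegates to the new one. */
    @Deprecated
    public SinkSketch(int maxConcurrentRequests) {
        this(new SinkConfig(maxConcurrentRequests, Long.MAX_VALUE, TimeUnit.MILLISECONDS));
    }

    public int maxConcurrentRequests() {
        return config.maxConcurrentRequests;
    }
}
```

The deprecated constructor keeps old call sites compiling while funnelling all state through the single config-based path.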
[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector
[ https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668054#comment-16668054 ] ASF GitHub Bot commented on FLINK-9083: --- jparkie commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector URL: https://github.com/apache/flink/pull/6782#discussion_r229162559

File path: docs/dev/connectors/cassandra.md

{code}
@@ -72,10 +72,13 @@ The following configuration methods can be used:
 4. _setMapperOptions(MapperOptions options)_
 * Sets the mapper options that are used to configure the DataStax ObjectMapper.
 * Only applies when processing __POJO__ data types.
-5. _enableWriteAheadLog([CheckpointCommitter committer])_
+5. _setMaxConcurrentRequests(int maxConcurrentRequests, long timeout, TimeUnit unit)_
+* Sets the maximum allowed number of concurrent requests with a timeout for acquiring permits to execute.
+* Only applies when __enableWriteAheadLog()__ is not configured.
{code}

Review comment: Yeah. It was just implementation effort. I was thinking of a follow-up ticket to implement it and verify it, since the WAL version is always the tricky one.

> Add async backpressure support to Cassandra Connector
> -
>
> Key: FLINK-9083
> URL: https://issues.apache.org/jira/browse/FLINK-9083
> Project: Flink
> Issue Type: Improvement
> Components: Cassandra Connector
> Reporter: Jacob Park
> Assignee: Jacob Park
> Priority: Minor
> Labels: pull-request-available
>
> As the CassandraSinkBase derivatives utilize async writes, they do not block
> the task to introduce any backpressure.
> I am currently using a semaphore to provide backpressure support by blocking > at a maximum concurrent requests limit like how DataStax's Spark Cassandra > Connector functions: > [https://github.com/datastax/spark-cassandra-connector/blob/v2.0.7/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/AsyncExecutor.scala#L18] > This improvement has greatly improved the fault-tolerance of our Cassandra > Sink Connector implementation on Apache Flink in production. I would like to > contribute this feature back upstream. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
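The semaphore scheme the ticket describes — block the task once a maximum number of requests are in flight, and release a permit when each async write completes — can be sketched with plain JDK types. This is a minimal illustration, not the connector's actual code; the class and method names are invented:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Minimal sketch of semaphore-based backpressure for an async sink (illustrative names). */
public class BackpressureSketch {
    private final Semaphore permits;
    private final long timeout;
    private final TimeUnit unit;

    public BackpressureSketch(int maxConcurrentRequests, long timeout, TimeUnit unit) {
        this.permits = new Semaphore(maxConcurrentRequests);
        this.timeout = timeout;
        this.unit = unit;
    }

    /** Blocks the calling task until a permit is free; this blocking is what creates backpressure. */
    public CompletableFuture<Void> send(Runnable asyncWrite) throws InterruptedException, TimeoutException {
        if (!permits.tryAcquire(timeout, unit)) {
            throw new TimeoutException("Timed out acquiring a permit for an async write");
        }
        // Release the permit whether the write succeeds or fails.
        return CompletableFuture.runAsync(asyncWrite)
                .whenComplete((ignored, error) -> permits.release());
    }

    public int availablePermits() {
        return permits.availablePermits();
    }
}
```

Releasing in `whenComplete` (rather than only on success) is important: a failed write must also give its permit back, or the sink eventually deadlocks.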
[jira] [Commented] (FLINK-10600) Provide End-to-end test cases for modern Kafka connectors
[ https://issues.apache.org/jira/browse/FLINK-10600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668028#comment-16668028 ] ASF GitHub Bot commented on FLINK-10600: yanghua commented on a change in pull request #6924: [FLINK-10600] Provide End-to-end test cases for modern Kafka connectors URL: https://github.com/apache/flink/pull/6924#discussion_r229159258 ## File path: flink-examples/flink-examples-streaming/pom.xml ## @@ -538,6 +550,11 @@ under the License. shade + + + org.apache.flink:flink-connector-kafka_${scala.binary.version} Review comment: yes. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Provide End-to-end test cases for modern Kafka connectors > - > > Key: FLINK-10600 > URL: https://issues.apache.org/jira/browse/FLINK-10600 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector >Reporter: vinoyang >Assignee: vinoyang >Priority: Blocker > Labels: pull-request-available > Fix For: 1.7.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10704) Fix sql client end to end test failure
[ https://issues.apache.org/jira/browse/FLINK-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668025#comment-16668025 ] vinoyang commented on FLINK-10704: -- [~pnowojski] [~twalthr] Yes, I am just curious about why we ruled it out. In fact, Timo's idea is right. It is not good to check for "error" in the shell while ignoring case. I will modify it slightly to have it check for "ERROR" (the log level) without ignoring case. If we later find other specific log keywords that indicate a test failure, we can add them to this list.

> Fix sql client end to end test failure
> --
>
> Key: FLINK-10704
> URL: https://issues.apache.org/jira/browse/FLINK-10704
> Project: Flink
> Issue Type: Bug
> Components: E2E Tests, Kafka Connector
> Reporter: vinoyang
> Assignee: vinoyang
> Priority: Major
>
> The log file contains the following sentence:
> {code:java}
> 2018-10-29 03:27:39,209 WARN org.apache.flink.kafka010.shaded.org.apache.kafka.common.utils.AppInfoParser - Error while loading kafka-version.properties :null
> {code}
> The reason for this log is that we explicitly exclude the version description file of the kafka client when packaging the connector:
> {code:xml}
> <filters>
>   <filter>
>     <artifact>*:*</artifact>
>     <excludes>
>       <exclude>kafka/kafka-version.properties</exclude>
>     </excludes>
>   </filter>
> </filters>
> {code}
> When the shell scans for the "error" keyword with grep, it will hit, so the test will fail.
> {code:java}
> function check_logs_for_errors {
>   error_count=$(grep -rv "GroupCoordinatorNotAvailableException" $FLINK_DIR/log \
>     | grep -v "RetriableCommitFailedException" \
>     | grep -v "NoAvailableBrokersException" \
>     | grep -v "Async Kafka commit failed" \
>     | grep -v "DisconnectException" \
>     | grep -v "AskTimeoutException" \
>     | grep -v "WARN akka.remote.transport.netty.NettyTransport" \
>     | grep -v "WARN org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline" \
>     | grep -v "jvm-exit-on-fatal-error" \
>     | grep -v '^INFO:.*AWSErrorCode=\[400 Bad Request\].*ServiceEndpoint=\[https://.*\.s3\.amazonaws\.com\].*RequestType=\[HeadBucketRequest\]' \
>     | grep -v "RejectedExecutionException" \
>     | grep -v "An exception was thrown by an exception handler" \
>     | grep -v "java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/exceptions/YarnException" \
>     | grep -v "java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration" \
>     | grep -v "org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.FluentPropertyBeanIntrospector - Error when creating PropertyDescriptor for public final void org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.AbstractConfiguration.setProperty(java.lang.String,java.lang.Object)! Ignoring this property." \
>     | grep -ic "error")  # here
>   if [[ ${error_count} -gt 0 ]]; then
>     echo "Found error in log files:"
>     cat $FLINK_DIR/log/*
>     EXIT_CODE=1
>   fi
> }
> {code}
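The fix vinoyang proposes — matching the `ERROR` log level case-sensitively instead of `grep -ic "error"` — can be demonstrated in isolation. The log lines below are fabricated for illustration; only the grep flags reflect the proposed change:

```shell
#!/usr/bin/env bash
# Sketch: a case-sensitive match on the ERROR log level avoids false positives
# from benign messages such as "WARN ... Error while loading kafka-version.properties".
log_dir=$(mktemp -d)
cat > "$log_dir/flink.log" <<'EOF'
2018-10-29 03:27:39,209 WARN  AppInfoParser - Error while loading kafka-version.properties :null
2018-10-29 03:28:01,100 ERROR JobManager - something actually went wrong
EOF

# Old check (case-insensitive): also counts the benign WARN line.
old_count=$(grep -c -i "error" "$log_dir/flink.log")

# Proposed check: match only the ERROR log level, case-sensitively.
new_count=$(grep -c "ERROR" "$log_dir/flink.log")

echo "old=$old_count new=$new_count"
```

With the sample log above, the old check reports two hits while the proposed check reports only the genuine ERROR line.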
[jira] [Assigned] (FLINK-10698) Create CatalogManager class manages all external catalogs and temporary meta objects
[ https://issues.apache.org/jira/browse/FLINK-10698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] winifredtang reassigned FLINK-10698: Assignee: winifredtang > Create CatalogManager class manages all external catalogs and temporary meta > objects > > > Key: FLINK-10698 > URL: https://issues.apache.org/jira/browse/FLINK-10698 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.6.1 >Reporter: Xuefu Zhang >Assignee: winifredtang >Priority: Major > > Currently {{TableEnvironment}} manages a list of registered external catalogs > as well as in-memory meta objects, and interacts with Calcite schema. It > would be cleaner to delegate all those responsibilities to a dedicate class, > especially when Flink's meta objects are also stored in a catalog. > {{CatalogManager}} is responsible to manage all meta objects, including > external catalogs, temporary meta objects, and Calcite schema. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9697) Provide connector for modern Kafka
[ https://issues.apache.org/jira/browse/FLINK-9697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668016#comment-16668016 ] ASF GitHub Bot commented on FLINK-9697: --- yanghua edited a comment on issue #6703: [FLINK-9697] Provide connector for modern Kafka URL: https://github.com/apache/flink/pull/6703#issuecomment-434147725 @pnowojski Not only the modern kafka connector, but also other connectors. Each version of the flink-connector relies on the base module, while the base module explicitly relies on kafka. So, here we need to exclude it from the base module and explicitly introduce the high version dependency of kafka. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Provide connector for modern Kafka > -- > > Key: FLINK-9697 > URL: https://issues.apache.org/jira/browse/FLINK-9697 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Kafka 2.0.0 would be released soon. > Here is vote thread: > [http://search-hadoop.com/m/Kafka/uyzND1vxnEd23QLxb?subj=+VOTE+2+0+0+RC1] > We should provide connector for Kafka 2.0.0 once it is released. > Upgrade to 2.0 documentation : > http://kafka.apache.org/20/documentation.html#upgrade_2_0_0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9697) Provide connector for modern Kafka
[ https://issues.apache.org/jira/browse/FLINK-9697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668015#comment-16668015 ] ASF GitHub Bot commented on FLINK-9697: --- yanghua commented on issue #6703: [FLINK-9697] Provide connector for modern Kafka URL: https://github.com/apache/flink/pull/6703#issuecomment-434147725 @pnowojski Not only the modern kafka connector, but also other connectors. Each version of the flink-connector relies on the base module, while the base module shows dependencies on kafka. So, here we need to exclude it from the base module and explicitly introduce the high version dependency of kafka. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Provide connector for modern Kafka > -- > > Key: FLINK-9697 > URL: https://issues.apache.org/jira/browse/FLINK-9697 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Kafka 2.0.0 would be released soon. > Here is vote thread: > [http://search-hadoop.com/m/Kafka/uyzND1vxnEd23QLxb?subj=+VOTE+2+0+0+RC1] > We should provide connector for Kafka 2.0.0 once it is released. > Upgrade to 2.0 documentation : > http://kafka.apache.org/20/documentation.html#upgrade_2_0_0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10714) java.lang.IndexOutOfBoundsException when creating a heap backend snapshot
[ https://issues.apache.org/jira/browse/FLINK-10714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668009#comment-16668009 ] Xiaogang Shi commented on FLINK-10714: -- [~cmick] I came across a similar problem before. It seems that Kryo cannot properly serialize some collection types. I finally got rid of this problem by registering another serializer (e.g. JavaSerializer) for the problematic collection types in ExecutionConfig. > java.lang.IndexOutOfBoundsException when creating a heap backend snapshot > - > > Key: FLINK-10714 > URL: https://issues.apache.org/jira/browse/FLINK-10714 > Project: Flink > Issue Type: Bug > Components: Type Serialization System >Affects Versions: 1.5.5, 1.6.2 > Environment: Flink 1.6.2, FsStateBackend >Reporter: Michał Ciesielczyk >Priority: Blocker > Fix For: 1.7.0 > > > I'm sometimes getting an error while creating a checkpoint using a filesystem > state backend. This ONLY happens when asynchronous snapshots are enabled > using the FileSystem State Backend. When RocksDB is enabled everything works > fine. > > I'm using a simple KeyedStream.mapWithState function with a ValueState > holding a hashmap (scala.collection.immutable.Map). It's hard to reproduce > the error using a simple code snippet, as the error occurs randomly. > > This issue may be similar to FLINK-7484 and FLINK-8836 (both already fixed), > but I'm still experiencing such behavior. > > Stacktrace: > > {code:java} > java.lang.IndexOutOfBoundsException: Index: 0, Size: 0 > at java.util.ArrayList.rangeCheck(ArrayList.java:657) ~[?:1.8.0_172] > at java.util.ArrayList.set(ArrayList.java:448) ~[?:1.8.0_172] > at > com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:56) > ~[kryo-shaded-4.0.0.jar:?] > at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:875) > ~[kryo-shaded-4.0.0.jar:?] > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:710) > ~[kryo-shaded-4.0.0.jar:?]
> at > org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231) > ~[flink-core-1.6.1.jar:1.6.1] > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > ~[flink-scala_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > ~[flink-scala_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69) > ~[flink-scala_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69) > ~[flink-scala_2.11-1.6.1.jar:1.6.1] > at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) > ~[scala-library-2.11.12.jar:?] > at > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428) > ~[scala-library-2.11.12.jar:?] > at > org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69) > ~[flink-scala_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33) > ~[flink-scala_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > ~[flink-scala_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > ~[flink-scala_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287) > ~[flink-runtime_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311) > ~[flink-runtime_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:73) > ~[flink-runtime_2.11-1.6.1.jar:1.6.1] > at > 
org.apache.flink.streaming.api.operators.StreamGroupedReduce.processElement(StreamGroupedReduce.java:59) > ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) > ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) > ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1] > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > [flink-runtime_2.11-1.6.1.jar:1.6.1] > at
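The workaround Xiaogang Shi describes — registering a fallback serializer for the problematic collection types in `ExecutionConfig` — would look roughly like this configuration fragment. It assumes a Flink streaming job and that `scala.collection.immutable.HashMap` is the affected type; `registerTypeWithKryoSerializer` and Flink's Kryo-compatible `JavaSerializer` are real Flink APIs, but this is a sketch, not a verified fix for this ticket:

{code:java}
// Configuration fragment; requires Flink on the classpath, not self-contained.
// Note: this is Flink's own Kryo-compatible JavaSerializer, not Kryo's.
import org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Route the problematic collection type through Java serialization instead of
// Kryo's default collection handling.
env.getConfig().registerTypeWithKryoSerializer(
        scala.collection.immutable.HashMap.class, JavaSerializer.class);
{code}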
[jira] [Assigned] (FLINK-10718) Use IO executor in RpcService for message serialization
[ https://issues.apache.org/jira/browse/FLINK-10718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-10718: Assignee: vinoyang > Use IO executor in RpcService for message serialization > --- > > Key: FLINK-10718 > URL: https://issues.apache.org/jira/browse/FLINK-10718 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Affects Versions: 1.5.5, 1.6.2, 1.7.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Major > Fix For: 1.7.0 > > > The {{AkkaInvocationHandler}} and once FLINK-10251 has been merged also the > {{AkkaRpcActor}} serialize their remote RPC messages before sending them. > This happens in the calling thread which can be a main thread of another > {{RpcEndpoint}}. Depending on the de-/serialization time, this can be > considered a blocking operation. Thus, I propose to introduce an IO executor > which is being used for the serialization of the messages. This will make the > {{RpcService}} abstraction more scalable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
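The proposal above — handing the potentially blocking serialization to a dedicated IO executor so it never runs on an `RpcEndpoint`'s main thread — can be sketched with JDK classes only. The names below are illustrative; this is not the actual `AkkaInvocationHandler`/`AkkaRpcActor` change:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Sketch: serialize RPC messages on an IO executor instead of the calling thread. */
public class RpcSerializationSketch {
    private final ExecutorService ioExecutor = Executors.newFixedThreadPool(4);

    /** Returns immediately; the potentially slow serialization runs on the IO pool. */
    public CompletableFuture<byte[]> serializeAsync(Serializable message) {
        return CompletableFuture.supplyAsync(() -> {
            try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
                 ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(message);
                oos.flush();
                return bos.toByteArray();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }, ioExecutor);
    }

    public void shutdown() {
        ioExecutor.shutdown();
    }
}
```

The caller's thread only schedules the work and gets a future back, so a slow message cannot stall another endpoint's main thread.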
[GitHub] kbohinski opened a new pull request #6962: change accumulator value to be formatted with digit group separator
kbohinski opened a new pull request #6962: change accumulator value to be formatted with digit group separator URL: https://github.com/apache/flink/pull/6962 ## What is the purpose of the change *This pull request is a small qol change that makes it easier for users to read the values of their accumulator(s).* ## Brief change log - *Filter the value of the accumulator with toLocaleString* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) No - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) No - The serializers: (yes / no / don't know) No - The runtime per-record code paths (performance sensitive): (yes / no / don't know) No - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) No - The S3 file system connector: (yes / no / don't know) No ## Documentation - Does this pull request introduce a new feature? (yes / no) No - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
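The PR applies JavaScript's `toLocaleString` in the web UI. The equivalent digit-group formatting, shown here in Java for consistency with the other sketches in this digest, pins the locale so the separator is deterministic:

```java
import java.text.NumberFormat;
import java.util.Locale;

/** Sketch: formatting accumulator values with digit group separators. */
public class AccumulatorFormatSketch {
    public static String format(long value) {
        // Fix the locale so the group separator is deterministic ("," for Locale.US).
        return NumberFormat.getNumberInstance(Locale.US).format(value);
    }

    public static void main(String[] args) {
        System.out.println(format(1234567)); // prints 1,234,567
    }
}
```

In the browser, `(1234567).toLocaleString()` follows the user's locale instead, which is exactly the behavior the PR wants for display purposes.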
[jira] [Commented] (FLINK-10687) Make flink-formats Scala-free
[ https://issues.apache.org/jira/browse/FLINK-10687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667841#comment-16667841 ] ASF GitHub Bot commented on FLINK-10687: bowenli86 edited a comment on issue #6958: [FLINK-10687] [table] Make flink-formats Scala-free URL: https://github.com/apache/flink/pull/6958#issuecomment-434103016 I only had a detailed look at classes in `flink-format/flink-avro` and `flink-table-common`. Big PR while changes are mostly renaming and migrating without introducing new feature. Look very good to me! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make flink-formats Scala-free > - > > Key: FLINK-10687 > URL: https://issues.apache.org/jira/browse/FLINK-10687 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > {{flink-table}} is the only dependency that pulls in Scala for > {{flink-json}}, {{flink-avro}}. We should aim to make {{flink-formats}} > Scala-free using only a dependency to {{flink-table-common}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10687) Make flink-formats Scala-free
[ https://issues.apache.org/jira/browse/FLINK-10687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667839#comment-16667839 ] ASF GitHub Bot commented on FLINK-10687: bowenli86 commented on issue #6958: [FLINK-10687] [table] Make flink-formats Scala-free URL: https://github.com/apache/flink/pull/6958#issuecomment-434103016 I only had a detailed look at classes in `flink-table-common`. Big PR while changes are mostly renaming and migrating without introducing new feature. Look very good to me! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make flink-formats Scala-free > - > > Key: FLINK-10687 > URL: https://issues.apache.org/jira/browse/FLINK-10687 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > {{flink-table}} is the only dependency that pulls in Scala for > {{flink-json}}, {{flink-avro}}. We should aim to make {{flink-formats}} > Scala-free using only a dependency to {{flink-table-common}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10720) Add stress deployment end-to-end test
[ https://issues.apache.org/jira/browse/FLINK-10720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667809#comment-16667809 ] Chesnay Schepler commented on FLINK-10720: -- Could you check whether the {{flink-high-parallelism-iterations-test}} fulfills this ticket? > Add stress deployment end-to-end test > - > > Key: FLINK-10720 > URL: https://issues.apache.org/jira/browse/FLINK-10720 > Project: Flink > Issue Type: Sub-task > Components: E2E Tests >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Priority: Major > Fix For: 1.7.0 > > > In order to test Flink's scalability, I suggest to add an end-to-end test > which tests the deployment of a job which is very demanding. The job should > have large {{TaskDeploymentDescriptors}} (e.g. a job using union state or > having a high degree of parallelism). That way we can test that the > serialization overhead of the TDDs does not affect the health of the cluster > (e.g. heartbeats are not affected because the serialization does not happen > in the main thread). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10720) Add stress deployment end-to-end test
Till Rohrmann created FLINK-10720: - Summary: Add stress deployment end-to-end test Key: FLINK-10720 URL: https://issues.apache.org/jira/browse/FLINK-10720 Project: Flink Issue Type: Sub-task Components: E2E Tests Affects Versions: 1.7.0 Reporter: Till Rohrmann Fix For: 1.7.0 In order to test Flink's scalability, I suggest to add an end-to-end test which tests the deployment of a job which is very demanding. The job should have large {{TaskDeploymentDescriptors}} (e.g. a job using union state or having a high degree of parallelism). That way we can test that the serialization overhead of the TDDs does not affect the health of the cluster (e.g. heartbeats are not affected because the serialization does not happen in the main thread). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10686) Introduce a flink-table-common module
[ https://issues.apache.org/jira/browse/FLINK-10686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667642#comment-16667642 ] Bowen Li commented on FLINK-10686: -- Hi [~twalthr], I just found in our mail list that 1.7 release was targeted at end of Oct/beginning of Nov. Do you think we can make this task into 1.7? > Introduce a flink-table-common module > - > > Key: FLINK-10686 > URL: https://issues.apache.org/jira/browse/FLINK-10686 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Affects Versions: 1.7.0 >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Fix For: 1.7.0 > > > Because more and more table factories for connectors and formats are added > and external catalog support is also on the horizon, {{flink-table}} becomes > a dependency for many Flink modules. Since {{flink-table}} is implemented in > Scala it requires other modules to be suffixes with Scala prefixes. However, > as we have learned in the past, Scala code is hard to maintain which is why > our long-term goal is to avoid Scala/Scala dependencies. > Therefore we propose a new module {{flink-table-common}} that contains > interfaces between {{flink-table}} and other modules. This module is > implemented in Java and should contain minimal (or better no) external > dependencies. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10719) Add user documentation for Flink-Hive integration
Bowen Li created FLINK-10719: Summary: Add user documentation for Flink-Hive integration Key: FLINK-10719 URL: https://issues.apache.org/jira/browse/FLINK-10719 Project: Flink Issue Type: Sub-task Components: Documentation, SQL Client, Table API SQL Reporter: Bowen Li -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10697) Create an in-memory catalog that stores Flink's meta objects
[ https://issues.apache.org/jira/browse/FLINK-10697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li reassigned FLINK-10697: Assignee: Bowen Li > Create an in-memory catalog that stores Flink's meta objects > > > Key: FLINK-10697 > URL: https://issues.apache.org/jira/browse/FLINK-10697 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.6.1 >Reporter: Xuefu Zhang >Assignee: Bowen Li >Priority: Major > > Currently all Flink meta objects (currently tables only) are stored in memory > as part of the Calcite catalog. Some of those objects are temporary (such as inline > tables), while others are meant to live beyond the user session. As we introduce > catalogs for those objects (tables, views, and UDFs), it makes sense to > organize them neatly. Further, having a catalog implementation that stores > those objects in memory retains the current behavior, which can be > configured by the user. > Please note that this implementation is different from the current > {{InMemoryExternalCatalog}}, which is used mainly for testing and doesn't > reflect what's actually needed for Flink meta objects.
[jira] [Assigned] (FLINK-10696) Add APIs to ExternalCatalog for views and UDFs
[ https://issues.apache.org/jira/browse/FLINK-10696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li reassigned FLINK-10696: Assignee: Bowen Li > Add APIs to ExternalCatalog for views and UDFs > -- > > Key: FLINK-10696 > URL: https://issues.apache.org/jira/browse/FLINK-10696 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.6.1 >Reporter: Xuefu Zhang >Assignee: Bowen Li >Priority: Major > > Currently there are APIs for tables only. However, views and UDFs are also > common objects in a catalog. > This is required when we store Flink tables/views/UDFs in an external > persistent storage. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10686) Introduce a flink-table-common module
[ https://issues.apache.org/jira/browse/FLINK-10686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667608#comment-16667608 ] Bowen Li commented on FLINK-10686: -- I'd vote for this, and hope it can be done ASAP so all the coming Flink-Hive integration work can depend on Java only. > Introduce a flink-table-common module > - > > Key: FLINK-10686 > URL: https://issues.apache.org/jira/browse/FLINK-10686 > Project: Flink > Issue Type: Improvement > Components: Table API SQL >Affects Versions: 1.7.0 >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Fix For: 1.7.0 > > > Because more and more table factories for connectors and formats are added > and external catalog support is also on the horizon, {{flink-table}} becomes > a dependency for many Flink modules. Since {{flink-table}} is implemented in > Scala it requires other modules to be suffixed with Scala version suffixes. However, > as we have learned in the past, Scala code is hard to maintain, which is why > our long-term goal is to avoid Scala dependencies. > Therefore we propose a new module {{flink-table-common}} that contains > interfaces between {{flink-table}} and other modules. This module is > implemented in Java and should contain minimal (or better, no) external > dependencies. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10556) Integration with Apache Hive
[ https://issues.apache.org/jira/browse/FLINK-10556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667607#comment-16667607 ] Bowen Li commented on FLINK-10556: -- FLINK-10686 will create a Java-based flink-table-common module that Flink-Hive integration may depend on. Once FLINK-10686 is done, the work of Flink-Hive integration may be written completely in Java. background: the community has decided to move away from Scala as we've learned Scala is hard to maintain. > Integration with Apache Hive > > > Key: FLINK-10556 > URL: https://issues.apache.org/jira/browse/FLINK-10556 > Project: Flink > Issue Type: New Feature > Components: Batch Connectors and Input/Output Formats, SQL Client, > Table API SQL >Affects Versions: 1.6.0 >Reporter: Xuefu Zhang >Assignee: Xuefu Zhang >Priority: Major > Attachments: Proposal_ Integrate Flink with Hive Ecosystem.pdf > > > This is an umbrella JIRA tracking all enhancement and issues related to > integrating Flink with Hive ecosystem. This is an outcome of a discussion in > the community, and thanks go to everyone that provided feedback and interest. > Specifically, we'd like to see the following features and capabilities > immediately in Flink: > # Metadata interoperability > # Data interoperability > # Data type compatibility > # Hive UDF support > # DDL/DML/Query language compatibility > For a longer term, we'd also like to add or improve: > # Compatible SQL service, client tools, JDBC/ODBC drivers > # Better task failure tolerance and task scheduling > # Support other user customizations in Hive (storage handlers, serdes, etc). > I will provide more details regarding the proposal in a doc shortly. Design > doc, if deemed necessary, will be provided in each related sub tasks under > this JIRA. > Feedback and contributions are greatly welcome! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667569#comment-16667569 ] Ying Xu commented on FLINK-4582: Thanks [~tinder-dthomson] for the detailed comments. Yes, that's exactly why I felt _efficient multi-stream_ support is somewhat lacking :). Actually, we are running Flink 1.5.2 internally. For contributing to upstream, I'm currently adapting the patch to fit the master Flink (1.7-SNAPSHOT). The main difference is that the Flink 1.7 Kinesis connector uses the _listShards API_ to retrieve the shard list. For DynamoDB streams, we must use the _describeStreams API_ to retrieve such information since listShards is not supported. I am currently porting the related logic around _describeStreams_ from Flink 1.5 to my patch. I shall be able to post a meaningful PR in 1-2 days. > Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams > > > Key: FLINK-4582 > URL: https://issues.apache.org/jira/browse/FLINK-4582 > Project: Flink > Issue Type: New Feature > Components: Kinesis Connector, Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai >Assignee: Ying Xu >Priority: Major > > AWS DynamoDB is a NoSQL database service that has a CDC-like (change data > capture) feature called DynamoDB Streams > (http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html), > which is a stream feed of item-level table activities. > The DynamoDB Streams shard abstraction follows that of Kinesis Streams with > only a slight difference in resharding behaviours, so it is possible to build > on the internals of our Flink Kinesis Consumer for an exactly-once DynamoDB > Streams source. 
> I propose an API something like this: > {code} > DataStream dynamoItemsCdc = > FlinkKinesisConsumer.asDynamoDBStream(tableNames, schema, config) > {code} > The feature adds more connectivity to popular AWS services for Flink, and > combining what Flink has for exactly-once semantics, out-of-core state > backends, and queryable state with CDC can have very strong use cases. For > this feature there should only be an extra dependency to the AWS Java SDK for > DynamoDB, which has Apache License 2.0. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9061) Add entropy to s3 path for better scalability
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667474#comment-16667474 ] Jonathan Miles commented on FLINK-9061: --- Agreed it should be harder, but we were able to trigger throttling while testing 1.4.2 around mid-August, after that announcement was made. We had around 10 jobs checkpointing to the same bucket and a different prefix for each job, something like 400 Task Managers. I know there was some work done to reduce the number of S3 requests made and combined with this "prefix entropy" change we haven't seen it happen again. It might be useful to add [your link|https://aws.amazon.com/about-aws/whats-new/2018/07/amazon-s3-announces-increased-request-rate-performance/] to the documentation. > Add entropy to s3 path for better scalability > - > > Key: FLINK-9061 > URL: https://issues.apache.org/jira/browse/FLINK-9061 > Project: Flink > Issue Type: Improvement > Components: FileSystem, State Backends, Checkpointing >Affects Versions: 1.4.2, 1.5.0 >Reporter: Jamie Grier >Assignee: Indrajit Roychoudhury >Priority: Critical > Labels: pull-request-available > Fix For: 1.6.2, 1.7.0 > > > I think we need to modify the way we write checkpoints to S3 for high-scale > jobs (those with many total tasks). The issue is that we are writing all the > checkpoint data under a common key prefix. This is the worst case scenario > for S3 performance since the key is used as a partition key. > > In the worst case checkpoints fail with a 500 status code coming back from S3 > and an internal error type of TooBusyException. > > One possible solution would be to add a hook in the Flink filesystem code > that allows me to "rewrite" paths. 
For example say I have the checkpoint > directory set to: > > s3://bucket/flink/checkpoints > > I would hook that and rewrite that path to: > > s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original > path > > This would distribute the checkpoint write load around the S3 cluster evenly. > > For reference: > https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/ > > Any other people hit this issue? Any other ideas for solutions? This is a > pretty serious problem for people trying to checkpoint to S3. > > -Jamie > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
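The path-rewriting idea from the description can be sketched as below. This is a hedged illustration only, not Flink's actual entropy-injection implementation: the MD5 choice and the 4-hex-character prefix length are arbitrary assumptions, and the hash is injected after the bucket as the comment suggests.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical sketch: rewrite "s3://bucket/key" to "s3://bucket/<hash>/key". */
class EntropyPath {
    public static String addEntropy(String path) {
        int schemeEnd = path.indexOf("://") + 3;
        int bucketEnd = path.indexOf('/', schemeEnd);
        String bucket = path.substring(0, bucketEnd);   // e.g. "s3://bucket"
        String key = path.substring(bucketEnd + 1);     // e.g. "flink/checkpoints"
        byte[] digest;
        try {
            digest = MessageDigest.getInstance("MD5")
                    .digest(path.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException(e); // MD5 is always available in the JDK
        }
        StringBuilder hex = new StringBuilder();
        for (int i = 0; i < 2; i++) {                   // keep 4 hex chars of entropy
            hex.append(String.format("%02x", digest[i]));
        }
        return bucket + "/" + hex + "/" + key;
    }
}
```

Since the hash is derived deterministically from the original path, every task writing under the same configured checkpoint directory agrees on the rewritten location, while different jobs land under different S3 partition-key prefixes.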
[jira] [Commented] (FLINK-9061) Add entropy to s3 path for better scalability
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667437#comment-16667437 ] Elias Levy commented on FLINK-9061: --- Just a note to say that changes AWS made to S3 in July means that it is a lot more difficult to hit S3 performance limits that would require this feature, as now S3 can do up to 3.5K concurrent writes. See [https://aws.amazon.com/about-aws/whats-new/2018/07/amazon-s3-announces-increased-request-rate-performance/]. > Add entropy to s3 path for better scalability > - > > Key: FLINK-9061 > URL: https://issues.apache.org/jira/browse/FLINK-9061 > Project: Flink > Issue Type: Improvement > Components: FileSystem, State Backends, Checkpointing >Affects Versions: 1.4.2, 1.5.0 >Reporter: Jamie Grier >Assignee: Indrajit Roychoudhury >Priority: Critical > Labels: pull-request-available > Fix For: 1.6.2, 1.7.0 > > > I think we need to modify the way we write checkpoints to S3 for high-scale > jobs (those with many total tasks). The issue is that we are writing all the > checkpoint data under a common key prefix. This is the worst case scenario > for S3 performance since the key is used as a partition key. > > In the worst case checkpoints fail with a 500 status code coming back from S3 > and an internal error type of TooBusyException. > > One possible solution would be to add a hook in the Flink filesystem code > that allows me to "rewrite" paths. For example say I have the checkpoint > directory set to: > > s3://bucket/flink/checkpoints > > I would hook that and rewrite that path to: > > s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original > path > > This would distribute the checkpoint write load around the S3 cluster evenly. 
> > For reference: > https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/ > > Any other people hit this issue? Any other ideas for solutions? This is a > pretty serious problem for people trying to checkpoint to S3. > > -Jamie > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10714) java.lang.IndexOutOfBoundsException when creating a heap backend snapshot
[ https://issues.apache.org/jira/browse/FLINK-10714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter updated FLINK-10714: --- Fix Version/s: 1.7.0 > java.lang.IndexOutOfBoundsException when creating a heap backend snapshot > - > > Key: FLINK-10714 > URL: https://issues.apache.org/jira/browse/FLINK-10714 > Project: Flink > Issue Type: Bug > Components: Type Serialization System >Affects Versions: 1.5.5, 1.6.2 > Environment: Flink 1.6.2, FsStateBackend >Reporter: Michał Ciesielczyk >Priority: Blocker > Fix For: 1.7.0 > > > I'm sometimes getting an error while creating a checkpoint using a filesystem > state backend. This ONLY happens when asynchronous snapshots are enabled > using the FileSystem State Backend. When RocksDB is enabled everything works > fine. > > I'm using a simple KeyedStream.mapWithState function with a ValueState > holding a hashmap (scala.collection.immutable.Map). It's hard to reproduce > the error using a simple code snippet, as the error occurs randomly. > > This issue may be similar to FLINK-7484 and FLINK-8836 (both already fixed), > but I'm still experiencing such behavior. > > Stacktrace: > > {code:java} > java.lang.IndexOutOfBoundsException: Index: 0, Size: 0 > at java.util.ArrayList.rangeCheck(ArrayList.java:657) ~[?:1.8.0_172] > at java.util.ArrayList.set(ArrayList.java:448) ~[?:1.8.0_172] > at > com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:56) > ~[kryo-shaded-4.0.0.jar:?] > at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:875) > ~[kryo-shaded-4.0.0.jar:?] > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:710) > ~[kryo-shaded-4.0.0.jar:?] 
> at > org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231) > ~[flink-core-1.6.1.jar:1.6.1] > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > ~[flink-scala_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > ~[flink-scala_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69) > ~[flink-scala_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69) > ~[flink-scala_2.11-1.6.1.jar:1.6.1] > at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) > ~[scala-library-2.11.12.jar:?] > at > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428) > ~[scala-library-2.11.12.jar:?] > at > org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69) > ~[flink-scala_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33) > ~[flink-scala_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > ~[flink-scala_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > ~[flink-scala_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287) > ~[flink-runtime_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311) > ~[flink-runtime_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:73) > ~[flink-runtime_2.11-1.6.1.jar:1.6.1] > at > 
org.apache.flink.streaming.api.operators.StreamGroupedReduce.processElement(StreamGroupedReduce.java:59) > ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) > ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) > ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1] > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > [flink-runtime_2.11-1.6.1.jar:1.6.1] > at java.lang.Thread.run(Thread.java:748) [?:1.8.0_172] > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10714) java.lang.IndexOutOfBoundsException when creating a heap backend snapshot
[ https://issues.apache.org/jira/browse/FLINK-10714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter updated FLINK-10714: --- Priority: Blocker (was: Major) > java.lang.IndexOutOfBoundsException when creating a heap backend snapshot > - > > Key: FLINK-10714 > URL: https://issues.apache.org/jira/browse/FLINK-10714 > Project: Flink > Issue Type: Bug > Components: Type Serialization System >Affects Versions: 1.5.5, 1.6.2 > Environment: Flink 1.6.2, FsStateBackend >Reporter: Michał Ciesielczyk >Priority: Blocker > Fix For: 1.7.0 > > > I'm sometimes getting an error while creating a checkpoint using a filesystem > state backend. This ONLY happens when asynchronous snapshots are enabled > using the FileSystem State Backend. When RocksDB is enabled everything works > fine. > > I'm using a simple KeyedStream.mapWithState function with a ValueState > holding a hashmap (scala.collection.immutable.Map). It's hard to reproduce > the error using a simple code snippet, as the error occurs randomly. > > This issue may be similar to FLINK-7484 and FLINK-8836 (both already fixed), > but I'm still experiencing such behavior. > > Stacktrace: > > {code:java} > java.lang.IndexOutOfBoundsException: Index: 0, Size: 0 > at java.util.ArrayList.rangeCheck(ArrayList.java:657) ~[?:1.8.0_172] > at java.util.ArrayList.set(ArrayList.java:448) ~[?:1.8.0_172] > at > com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:56) > ~[kryo-shaded-4.0.0.jar:?] > at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:875) > ~[kryo-shaded-4.0.0.jar:?] > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:710) > ~[kryo-shaded-4.0.0.jar:?] 
> at > org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231) > ~[flink-core-1.6.1.jar:1.6.1] > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > ~[flink-scala_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > ~[flink-scala_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69) > ~[flink-scala_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69) > ~[flink-scala_2.11-1.6.1.jar:1.6.1] > at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) > ~[scala-library-2.11.12.jar:?] > at > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428) > ~[scala-library-2.11.12.jar:?] > at > org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69) > ~[flink-scala_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33) > ~[flink-scala_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101) > ~[flink-scala_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) > ~[flink-scala_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287) > ~[flink-runtime_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311) > ~[flink-runtime_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:73) > ~[flink-runtime_2.11-1.6.1.jar:1.6.1] > at > 
org.apache.flink.streaming.api.operators.StreamGroupedReduce.processElement(StreamGroupedReduce.java:59) > ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) > ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) > ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1] > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > [flink-runtime_2.11-1.6.1.jar:1.6.1] > at java.lang.Thread.run(Thread.java:748) [?:1.8.0_172] > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10691) StreamSQL E2E test relies on hadoop
[ https://issues.apache.org/jira/browse/FLINK-10691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667370#comment-16667370 ] Hequn Cheng commented on FLINK-10691: - Would it be ok if we replace {{BucketingSink}} with {{StreamingFileSink}}? It doesn't require hadoop, and there is already a BucketingSink E2E test, so we don't have to test the BucketingSink in the StreamSQL E2E test. > StreamSQL E2E test relies on hadoop > --- > > Key: FLINK-10691 > URL: https://issues.apache.org/jira/browse/FLINK-10691 > Project: Flink > Issue Type: Improvement > Components: E2E Tests, Table API SQL >Affects Versions: 1.7.0 >Reporter: Chesnay Schepler >Assignee: Hequn Cheng >Priority: Major > Fix For: 1.7.0 > > > The {{flink-stream-sql-test}} makes use of the {{BucketingSink}} and thus > requires hadoop to be available. This means we can't run this test in > hadoop-free environments, which is a bit unfortunate. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10687) Make flink-formats Scala-free
[ https://issues.apache.org/jira/browse/FLINK-10687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667367#comment-16667367 ] ASF GitHub Bot commented on FLINK-10687: twalthr commented on issue #6958: [FLINK-10687] [table] Make flink-formats Scala-free URL: https://github.com/apache/flink/pull/6958#issuecomment-433960589 Thanks for the review @aljoscha. I added more tests and improvements to the type parser. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make flink-formats Scala-free > - > > Key: FLINK-10687 > URL: https://issues.apache.org/jira/browse/FLINK-10687 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > {{flink-table}} is the only dependency that pulls in Scala for > {{flink-json}}, {{flink-avro}}. We should aim to make {{flink-formats}} > Scala-free using only a dependency to {{flink-table-common}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] twalthr commented on issue #6958: [FLINK-10687] [table] Make flink-formats Scala-free
twalthr commented on issue #6958: [FLINK-10687] [table] Make flink-formats Scala-free URL: https://github.com/apache/flink/pull/6958#issuecomment-433960589 Thanks for the review @aljoscha. I added more tests and improvements to the type parser. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-9635) Local recovery scheduling can cause spread out of tasks
[ https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9635: -- Labels: pull-request-available (was: ) > Local recovery scheduling can cause spread out of tasks > --- > > Key: FLINK-9635 > URL: https://issues.apache.org/jira/browse/FLINK-9635 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.5.0, 1.6.2 >Reporter: Till Rohrmann >Assignee: Stefan Richter >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0 > > > In order to make local recovery work, Flink's scheduling was changed such > that a task tries to be rescheduled to its previous location. In order to not > occupy slots which have state of other tasks cached, the strategy will > request a new slot if the old slot identified by the previous allocation id > is no longer present. This also applies to newly allocated slots because > there is no distinction between new and already used slots. This behaviour can cause > every task to get deployed to its own slot if the {{SlotPool}} has > released all slots in the meantime, for example. The consequence could be > that a job can no longer be executed after a failure because it needs more > slots than before. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks
[ https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667366#comment-16667366 ] ASF GitHub Bot commented on FLINK-9635: --- StefanRRichter opened a new pull request #6961: [FLINK-9635] Fix scheduling for local recovery URL: https://github.com/apache/flink/pull/6961 ## What is the purpose of the change This change fixes the task spread-out problem in scheduling with local recovery. The solution is based on creating a global set of all previous allocation ids as a blacklist to avoid, while all other allocation ids are free to take again. ## Brief change log This PR contains a subset of the changes from #6898 and focuses almost purely on the fix of local recovery. ## Verifying this change The improved `SchedulingITCase` should work now, including previously ignored tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Local recovery scheduling can cause spread out of tasks > --- > > Key: FLINK-9635 > URL: https://issues.apache.org/jira/browse/FLINK-9635 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.5.0, 1.6.2 >Reporter: Till Rohrmann >Assignee: Stefan Richter >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0 > > > In order to make local recovery work, Flink's scheduling was changed such > that a task tries to be rescheduled to its previous location. In order to not > occupy slots which have state of other tasks cached, the strategy will > request a new slot if the old slot identified by the previous allocation id > is no longer present. This also applies to newly allocated slots because > there is no distinction between new and already used slots. This behaviour can cause > every task to get deployed to its own slot if the {{SlotPool}} has > released all slots in the meantime, for example. The consequence could be > that a job can no longer be executed after a failure because it needs more > slots than before. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
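The selection rule the PR describes — prefer the task's own previous slot for local recovery, otherwise take any slot whose allocation id is not in the global blacklist of previous allocation ids — can be sketched as below. The names here are hypothetical; Flink's actual scheduler and `SlotPool` APIs are considerably more involved:

```java
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of blacklist-based slot selection for local recovery. */
class SlotSelection {
    /**
     * Returns the allocation id of the slot to use, or null if no acceptable
     * slot is available and a fresh one must be requested.
     */
    public static String selectSlot(
            String ownPreviousAllocationId,
            List<String> availableAllocationIds,
            Set<String> previousAllocationIdBlacklist) {
        if (availableAllocationIds.contains(ownPreviousAllocationId)) {
            return ownPreviousAllocationId; // local state is cached in this slot
        }
        for (String id : availableAllocationIds) {
            if (!previousAllocationIdBlacklist.contains(id)) {
                return id; // free slot that holds no other task's cached state
            }
        }
        return null; // nothing acceptable: request a new slot
    }
}
```

The blacklist only contains allocation ids from before the failure, so newly allocated slots are always eligible — which is what prevents the one-task-per-slot spread-out described in the issue.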
[GitHub] StefanRRichter opened a new pull request #6961: [FLINK-9635] Fix scheduling for local recovery
StefanRRichter opened a new pull request #6961: [FLINK-9635] Fix scheduling for local recovery URL: https://github.com/apache/flink/pull/6961 ## What is the purpose of the change This change fixes the task spread-out problem in scheduling with local recovery. The solution is based on creating a global set of all previous allocation ids as a blacklist to avoid, while all other allocation ids are free to take again. ## Brief change log This PR contains a subset of the changes from #6898 and focuses almost purely on the fix of local recovery. ## Verifying this change The improved `SchedulingITCase` should work now, including previously ignored tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Assigned] (FLINK-10691) StreamSQL E2E test relies on hadoop
[ https://issues.apache.org/jira/browse/FLINK-10691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hequn Cheng reassigned FLINK-10691: --- Assignee: Hequn Cheng > StreamSQL E2E test relies on hadoop > --- > > Key: FLINK-10691 > URL: https://issues.apache.org/jira/browse/FLINK-10691 > Project: Flink > Issue Type: Improvement > Components: E2E Tests, Table API SQL >Affects Versions: 1.7.0 >Reporter: Chesnay Schepler >Assignee: Hequn Cheng >Priority: Major > Fix For: 1.7.0 > > > The {{flink-stream-sql-test}} makes use of the {{BucketingSink}} and thus > requires hadoop to be available. This means we can't run this test in > hadoop-free environments, which is a bit unfortunate. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667357#comment-16667357 ] ASF GitHub Bot commented on FLINK-10461: azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r228937420 ## File path: flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java ## @@ -115,4 +115,12 @@ .defaultValue(1024) .withDescription("The minimum size of state data files. All state chunks smaller than that are stored" + " inline in the root checkpoint metadata file."); + + /** +* The thread numbers used to download files from DFS in RocksDBStateBackend. +*/ + public static final ConfigOption CHECKPOINT_RESTORE_THREAD_NUM = ConfigOptions + .key("state.checkpoint.restore.thread.num") + .defaultValue(1) + .withDescription("The thread numbers used to download files from DFS in RocksDBStateBackend."); Review comment: also here: `The thread number` without `s`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Speed up download file procedure when restore > -- > > Key: FLINK-10461 > URL: https://issues.apache.org/jira/browse/FLINK-10461 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Congxian Qiu >Assignee: Congxian Qiu >Priority: Major > Labels: pull-request-available > > In the current master branch, the restore will download files from DFS; the > download procedure is single-threaded and could be sped up by using > multiple threads for downloading states from DFS. 
> > In my company, the states can amount to some terabytes, so the restore > procedure becomes a little slow; after a bit of digging, I found that states are > downloaded from DFS using a single thread, and this could use multiple threads for a speed > up. > I tested the time used to download states from DFS with ~2 terabytes of state. > With a single thread it took 640+ s, and 130+ s when using 5 threads for the download. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
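The proposed speed-up can be illustrated with a plain `ExecutorService` fan-out over the state files. This is a sketch only: the DFS copy is replaced by a stub, and the class and method names are hypothetical — the actual PR transfers data through Flink's state handles:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical sketch: download state files in parallel with a fixed pool. */
class ParallelRestore {
    public static List<String> downloadAll(List<String> stateFiles, int numThreads) {
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        try {
            List<Future<String>> pending = new ArrayList<>();
            for (String file : stateFiles) {
                // Stub for the per-file DFS copy; real code would stream bytes to local disk.
                pending.add(pool.submit(() -> "downloaded:" + file));
            }
            List<String> done = new ArrayList<>();
            for (Future<String> f : pending) {
                try {
                    done.add(f.get()); // waits for completion, propagates the first failure
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
            return done;
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Because each file download is independent I/O-bound work, a small pool (the issue reports 5 threads) already cuts the restore time dramatically when there are many files.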
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667360#comment-16667360 ] ASF GitHub Bot commented on FLINK-10461: azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r228970266 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java ## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.StreamStateHandle; + +import java.io.Closeable; +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Data transfer utils for RocksDbKeyedStateBackend. + */ +public class RocksDbStateDataTransfer { + + public static void transferAllStateDataToDirectory( + IncrementalKeyedStateHandle restoreStateHandle, + Path dest, + int resotreThreadNum, + CloseableRegistry closeableRegistry) throws Exception { + + final Map<StateHandleID, StreamStateHandle> sstFiles = + restoreStateHandle.getSharedState(); + final Map<StateHandleID, StreamStateHandle> miscFiles = + restoreStateHandle.getPrivateState(); + + transferAllDataFromStateHandles(sstFiles, dest, resotreThreadNum, closeableRegistry); + transferAllDataFromStateHandles(miscFiles, dest, resotreThreadNum, closeableRegistry); + } + + /** +* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their +* {@link StateHandleID}. 
+*/ + private static void transferAllDataFromStateHandles( + Map<StateHandleID, StreamStateHandle> stateHandleMap, + Path restoreInstancePath, + int resotreThreadNum, + CloseableRegistry closeableRegistry + ) throws Exception { + + final ExecutorService executorService = Executors.newFixedThreadPool(resotreThreadNum); + List<CompletableFuture<Void>> futures = new LinkedList<>(); + List<Closeable> closeables = new LinkedList<>(); + + try { + closeables.add(() -> executorService.shutdownNow()); Review comment: I think the tasks should first be properly cancelled, and only then should `executorService` shut down. `executorService.shutdownNow` also calls `future.cancel`, which interrupts the task thread. At the moment the `while (true)` in `copyStateDataHandleData` does not support explicit cancellation. I suggest creating a `running = new AtomicBoolean(true)` at the beginning; then `while (true)` can become `while (running.get())`. This allows cancelling all tasks at once and shutting down `executorService` in one closeable: `closeables.add(() -> { running.set(false); executorService.shutdownNow(); });` The list of `closeables` is not needed then.
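The cancellation pattern the reviewer sketches can be illustrated with plain JDK classes (no Flink dependencies; `transferAll` and its chunked copy loop are illustrative stand-ins for the actual `copyStateDataHandleData`, not Flink's code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class CancellableTransfer {

    public static int transferAll(int numFiles, int numThreads) throws InterruptedException {
        AtomicBoolean running = new AtomicBoolean(true);
        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
        AtomicInteger transferred = new AtomicInteger();
        List<CompletableFuture<Void>> futures = new ArrayList<>();

        for (int i = 0; i < numFiles; i++) {
            futures.add(CompletableFuture.runAsync(() -> {
                // The copy loop checks the flag between chunks, so cancellation
                // is cooperative rather than relying on interrupts alone.
                int chunksCopied = 0;
                while (running.get() && chunksCopied < 8) {
                    chunksCopied++; // stand-in for copying one chunk of a file
                }
                if (chunksCopied == 8) {
                    transferred.incrementAndGet();
                }
            }, executor));
        }

        // One closeable cancels every task and shuts the pool down, as the
        // reviewer proposes; in real code it would go into a CloseableRegistry.
        Runnable cancelAll = () -> { running.set(false); executor.shutdownNow(); };

        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
        return transferred.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(transferAll(5, 3)); // all 5 simulated files complete
    }
}
```

Because every task polls the same `AtomicBoolean`, flipping it stops the copy loops even if an interrupt is swallowed inside I/O code, which is the gap the reviewer points out in the original `while (true)`.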
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667354#comment-16667354 ] ASF GitHub Bot commented on FLINK-10461: azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r228937223 ## File path: docs/_includes/generated/checkpointing_configuration.html ## @@ -32,6 +32,11 @@ false + +<tr> + <td>state.checkpoint.restore.thread.num</td> + <td>1</td> + <td>The thread numbers used to download files from DFS in RocksDBStateBackend.</td> +</tr> Review comment: I think `The thread number` should be singular without `s`.
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667359#comment-16667359 ] ASF GitHub Bot commented on FLINK-10461: azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r228970609 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java ## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.StreamStateHandle; + +import java.io.Closeable; +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Data transfer utils for RocksDbKeyedStateBackend. + */ +public class RocksDbStateDataTransfer { + + public static void transferAllStateDataToDirectory( + IncrementalKeyedStateHandle restoreStateHandle, + Path dest, + int resotreThreadNum, + CloseableRegistry closeableRegistry) throws Exception { + + final Map<StateHandleID, StreamStateHandle> sstFiles = + restoreStateHandle.getSharedState(); + final Map<StateHandleID, StreamStateHandle> miscFiles = + restoreStateHandle.getPrivateState(); + + transferAllDataFromStateHandles(sstFiles, dest, resotreThreadNum, closeableRegistry); + transferAllDataFromStateHandles(miscFiles, dest, resotreThreadNum, closeableRegistry); + } + + /** +* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their +* {@link StateHandleID}. 
+*/ + private static void transferAllDataFromStateHandles( + Map<StateHandleID, StreamStateHandle> stateHandleMap, + Path restoreInstancePath, + int resotreThreadNum, + CloseableRegistry closeableRegistry + ) throws Exception { + + final ExecutorService executorService = Executors.newFixedThreadPool(resotreThreadNum); + List<CompletableFuture<Void>> futures = new LinkedList<>(); + List<Closeable> closeables = new LinkedList<>(); + + try { + closeables.add(() -> executorService.shutdownNow()); + closeableRegistry.registerCloseable(((LinkedList<Closeable>) closeables).getLast()); + + for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) { + StateHandleID stateHandleID = entry.getKey(); + StreamStateHandle remoteFileHandle = entry.getValue(); + + if (resotreThreadNum > 1) { + CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { + try { + copyStateDataHandleData(new Path(restoreInstancePath, stateHandleID.toString()), remoteFileHandle, closeableRegistry); + } catch (IOException e) { Review comment: the exception handling can go inside `copyStateDataHandleData`. `new Path(restoreInstancePath, stateHandleID.toString())` can be a variable to reduce the line length. I would create a list of `Runnable`'s: ``` Path path = new Path(restoreInstancePath, stateHandleID.toString()); runnables.add(() -> copyStateDataHandleData(path, remoteFileHandle, closeableRegistry)); ``` and then have separate code for `resotreThreadNum > 1` and `resotreThreadNum == 1`. `executorService` is currently created even for `resotreThreadNum == 1`, where it is not used.
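The "build a list of `Runnable`s first, then pick the execution strategy" refactoring the reviewer suggests might look like this JDK-only sketch (class and variable names are illustrative; the per-file copy is simulated by a counter):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class RunnableListTransfer {

    // Build the work list once, then decide how to execute it:
    // inline for a single thread (no pool is ever created), pooled otherwise.
    public static int transfer(List<String> files, int threadNum) {
        AtomicInteger copied = new AtomicInteger();
        List<Runnable> tasks = new ArrayList<>();
        for (String file : files) {
            tasks.add(() -> copied.incrementAndGet()); // stand-in for the per-file copy
        }

        if (threadNum > 1) {
            ExecutorService pool = Executors.newFixedThreadPool(threadNum);
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (Runnable task : tasks) {
                futures.add(CompletableFuture.runAsync(task, pool));
            }
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            pool.shutdown();
        } else {
            // Single-threaded path: run the same tasks directly, no executor.
            for (Runnable task : tasks) {
                task.run();
            }
        }
        return copied.get();
    }

    public static void main(String[] args) {
        System.out.println(transfer(List.of("a.sst", "b.sst", "c.sst"), 1));
        System.out.println(transfer(List.of("a.sst", "b.sst", "c.sst"), 4));
    }
}
```

Separating task construction from execution addresses the reviewer's point directly: the `threadNum == 1` branch never pays for a thread pool it does not use.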
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667355#comment-16667355 ] ASF GitHub Bot commented on FLINK-10461: azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r228976096 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransferTest.java ## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for RocksDbStateDataTransfer. + */ +public class RocksDBStateDataTransferTest { + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + /** +* Test that the exception arose in the thread pool will rethrow to the main thread. 
+*/ + @Test + public void testThreadPoolExceptionRethrow() throws Exception { + IncrementalKeyedStateHandle stateHandle = mock(IncrementalKeyedStateHandle.class); + + SpecifiedException expectedException = new SpecifiedException("throw exception while multi thread restore."); + StreamStateHandle mockStateHandle = mock(StreamStateHandle.class); + when(mockStateHandle.openInputStream()).thenThrow(expectedException); + + Map<StateHandleID, StreamStateHandle> sharedStateHandle = new HashMap<>(1); + sharedStateHandle.put(new StateHandleID("mock"), mockStateHandle); + when(stateHandle.getSharedState()).thenReturn(sharedStateHandle); + + try { + RocksDbStateDataTransfer.transferAllStateDataToDirectory(stateHandle, new Path(temporaryFolder.newFolder().toURI()), 5, new CloseableRegistry()); + fail(); + } catch (Exception e) { + assertEquals(expectedException, e.getCause().getCause()); + } + } + + /** +* Tests that download files with multi-thread correctly. +* @throws Exception +*/ + @Test + public void testMultiThreadRestoreCorrectly() throws Exception { + IncrementalKeyedStateHandle stateHandle = mock(IncrementalKeyedStateHandle.class); + + byte[] content1 = new byte[1]; Review comment: I would create a list of something like 6 test contexts and loop over them for the further actions; the content length can also be random.
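A JDK-only sketch of the reviewer's loop-over-test-contexts idea: six contexts with random content lengths (fixed seed for reproducibility), written concurrently and verified in a loop. The file-writing stand-in replaces Flink's state handles, so names like `MultiThreadCopyTest` are illustrative:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class MultiThreadCopyTest {

    static int run() throws Exception {
        Random random = new Random(42); // fixed seed keeps the test reproducible
        Path dir = Files.createTempDirectory("restore-test");

        // Six test contexts with random content lengths, per the review comment.
        Map<String, byte[]> contexts = new HashMap<>();
        for (int i = 0; i < 6; i++) {
            byte[] content = new byte[1 + random.nextInt(1024)];
            random.nextBytes(content);
            contexts.put("handle-" + i, content);
        }

        // "Download" every handle concurrently with 5 threads.
        ExecutorService pool = Executors.newFixedThreadPool(5);
        List<Future<?>> futures = new ArrayList<>();
        for (Map.Entry<String, byte[]> e : contexts.entrySet()) {
            futures.add(pool.submit(() -> Files.write(dir.resolve(e.getKey()), e.getValue())));
        }
        for (Future<?> f : futures) {
            f.get(); // propagates any write failure to the main thread
        }
        pool.shutdown();

        // Verify all contexts in one loop instead of hand-written pairs.
        int ok = 0;
        for (Map.Entry<String, byte[]> e : contexts.entrySet()) {
            if (Arrays.equals(Files.readAllBytes(dir.resolve(e.getKey())), e.getValue())) {
                ok++;
            }
        }
        return ok;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // prints 6
    }
}
```

Looping over generated contexts scales the test to any handle count and avoids the `content1`, `content2`, ... duplication the reviewer objects to.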
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667356#comment-16667356 ] ASF GitHub Bot commented on FLINK-10461: azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r228964472 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ## @@ -235,9 +238,10 @@ public RocksDBStateBackend(StateBackend checkpointStreamBackend) { * @param checkpointStreamBackend The backend write the checkpoint streams to. * @param enableIncrementalCheckpointing True if incremental checkpointing is enabled. */ - public RocksDBStateBackend(StateBackend checkpointStreamBackend, TernaryBoolean enableIncrementalCheckpointing) { + public RocksDBStateBackend(StateBackend checkpointStreamBackend, TernaryBoolean enableIncrementalCheckpointing, int restoreThreadNum) { Review comment: I would also leave the previous method without restoreThreadNum: ``` public RocksDBStateBackend(StateBackend checkpointStreamBackend, TernaryBoolean enableIncrementalCheckpointing) { this(checkpointStreamBackend, enableIncrementalCheckpointing, -1); } ``` This way we do not break existing code, and modifications like the following (and other similar ones) are no longer needed: ``` public RocksDBStateBackend(StateBackend checkpointStreamBackend) { this(checkpointStreamBackend, TernaryBoolean.UNDEFINED, 1); } ``` I would also rather keep the value undefined here, e.g. `-1`, similar to `TernaryBoolean.UNDEFINED` for `enableIncrementalCheckpointing`. It can then be resolved in `private RocksDBStateBackend(RocksDBStateBackend original, Configuration config)` and `getRestoreThreadNum`.
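The constructor-overloading scheme with a `-1` "undefined" sentinel can be sketched independently of Flink (class, field, and method names here are illustrative, not Flink's actual API):

```java
// Sketch of the pattern the reviewer suggests: keep the old constructor,
// delegate with an "undefined" sentinel, and resolve the real value only
// when the configuration is applied.
public class BackendSettings {
    static final int UNDEFINED_THREAD_NUM = -1;

    private final int restoreThreadNum;

    public BackendSettings() {
        this(UNDEFINED_THREAD_NUM); // existing callers keep compiling unchanged
    }

    public BackendSettings(int restoreThreadNum) {
        this.restoreThreadNum = restoreThreadNum;
    }

    /** Resolves the sentinel against a configured default, mirroring getRestoreThreadNum. */
    public int getRestoreThreadNum(int configuredDefault) {
        return restoreThreadNum == UNDEFINED_THREAD_NUM ? configuredDefault : restoreThreadNum;
    }

    public static void main(String[] args) {
        System.out.println(new BackendSettings().getRestoreThreadNum(4));  // prints 4
        System.out.println(new BackendSettings(8).getRestoreThreadNum(4)); // prints 8
    }
}
```

The sentinel keeps "not set" distinguishable from any explicit value, which is exactly why `TernaryBoolean.UNDEFINED` exists for `enableIncrementalCheckpointing`.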
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667358#comment-16667358 ] ASF GitHub Bot commented on FLINK-10461: azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r228967531 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java ## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.StreamStateHandle; + +import java.io.Closeable; +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Data transfer utils for RocksDbKeyedStateBackend. + */ +public class RocksDbStateDataTransfer { + + public static void transferAllStateDataToDirectory( + IncrementalKeyedStateHandle restoreStateHandle, + Path dest, + int resotreThreadNum, Review comment: typo: `resotreThreadNum` -> `restoredThreadNum`, also in other methods
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.StreamStateHandle; + +import java.io.Closeable; +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Data transfer utils for RocksDbKeyedStateBackend. + */ +public class RocksDbStateDataTransfer { + + public static void transferAllStateDataToDirectory( + IncrementalKeyedStateHandle restoreStateHandle, + Path dest, + int resotreThreadNum, Review comment: typo: `resotreThreadNum` -> `restoredThreadNum`, also in other methods This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r228976096 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransferTest.java ## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for RocksDbStateDataTransfer. + */ +public class RocksDBStateDataTransferTest { + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + /** +* Test that the exception arose in the thread pool will rethrow to the main thread. 
+*/ + @Test + public void testThreadPoolExceptionRethrow() throws Exception { + IncrementalKeyedStateHandle stateHandle = mock(IncrementalKeyedStateHandle.class); + + SpecifiedException expectedException = new SpecifiedException("throw exception while multi thread restore."); + StreamStateHandle mockStateHandle = mock(StreamStateHandle.class); + when(mockStateHandle.openInputStream()).thenThrow(expectedException); + + Map sharedStateHandle = new HashMap<>(1); + sharedStateHandle.put(new StateHandleID("mock"), mockStateHandle); + when(stateHandle.getSharedState()).thenReturn(sharedStateHandle); + + try { + RocksDbStateDataTransfer.transferAllStateDataToDirectory(stateHandle, new Path(temporaryFolder.newFolder().toURI()), 5, new CloseableRegistry()); + fail(); + } catch (Exception e) { + assertEquals(expectedException, e.getCause().getCause()); + } + } + + /** +* Tests that download files with multi-thread correctly. +* @throws Exception +*/ + @Test + public void testMultiThreadRestoreCorrectly() throws Exception { + IncrementalKeyedStateHandle stateHandle = mock(IncrementalKeyedStateHandle.class); + + byte[] content1 = new byte[1]; Review comment: I would create a list of something like 6 test contexts and use loops over them for further actions. content length can be also random. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
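The reviewer's suggestion — generating a list of test contents of random length and looping over them, instead of hand-writing `content1` through `content6` — could look roughly like this self-contained sketch. The clone step is a hypothetical stand-in; the real test would round-trip each content through `RocksDbStateDataTransfer.transferAllStateDataToDirectory` and compare the downloaded files in a second loop:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class LoopedContentsSketch {

    // Build n test contents of random length, as suggested in the review.
    static List<byte[]> randomContents(int n, long seed) {
        Random random = new Random(seed); // fixed seed keeps the test reproducible
        List<byte[]> contents = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            byte[] content = new byte[1 + random.nextInt(1024)];
            random.nextBytes(content);
            contents.add(content);
        }
        return contents;
    }

    public static void main(String[] args) {
        List<byte[]> contents = randomContents(6, 42L);
        // Stand-in for the transfer-and-read-back step.
        for (byte[] expected : contents) {
            byte[] actual = expected.clone();
            if (!Arrays.equals(expected, actual)) {
                throw new AssertionError("content mismatch");
            }
        }
        System.out.println("verified " + contents.size() + " random contents");
    }
}
```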
[jira] [Commented] (FLINK-10704) Fix sql client end to end test failure
[ https://issues.apache.org/jira/browse/FLINK-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667345#comment-16667345 ] Timo Walther commented on FLINK-10704: -- I thought maybe Maven resource transformers could do it, but apparently not. If there is no way of relocating it, I guess we need to add it to the SQL jar, which requires an exclusion in the jar checking logic that we added to the SQL Client end-to-end test. Why are we actually checking for every "error" (case-insensitive) string? Actually, this is only a warning. Shouldn't we check for "ERROR" instead? > Fix sql client end to end test failure > -- > > Key: FLINK-10704 > URL: https://issues.apache.org/jira/browse/FLINK-10704 > Project: Flink > Issue Type: Bug > Components: E2E Tests, Kafka Connector >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > > The log file contains the following sentence: > {code:java} > 2018-10-29 03:27:39,209 WARN > org.apache.flink.kafka010.shaded.org.apache.kafka.common.utils.AppInfoParser > - Error while loading kafka-version.properties :null > {code} > The reason for this log is that we explicitly exclude the version description > file of the kafka client when packaging the connector: > {code:java} > > > *:* > > kafka/kafka-version.properties > > > {code} > When the shell scans for the "error" keyword with grep, it will hit, so the test > will fail. 
> {code:java} > function check_logs_for_errors { > error_count=$(grep -rv "GroupCoordinatorNotAvailableException" > $FLINK_DIR/log \ > | grep -v "RetriableCommitFailedException" \ > | grep -v "NoAvailableBrokersException" \ > | grep -v "Async Kafka commit failed" \ > | grep -v "DisconnectException" \ > | grep -v "AskTimeoutException" \ > | grep -v "WARN akka.remote.transport.netty.NettyTransport" \ > | grep -v "WARN > org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline" \ > | grep -v "jvm-exit-on-fatal-error" \ > | grep -v '^INFO:.*AWSErrorCode=\[400 Bad > Request\].*ServiceEndpoint=\[https://.*\.s3\.amazonaws\.com\].*RequestType=\[HeadBucketRequest\]' > \ > | grep -v "RejectedExecutionException" \ > | grep -v "An exception was thrown by an exception handler" \ > | grep -v "java.lang.NoClassDefFoundError: > org/apache/hadoop/yarn/exceptions/YarnException" \ > | grep -v "java.lang.NoClassDefFoundError: > org/apache/hadoop/conf/Configuration" \ > | grep -v > "org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.FluentPropertyBeanIntrospector > - Error when creating PropertyDescriptor for public final void > org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.AbstractConfiguration.setProperty(java.lang.String,java.lang.Object)! > Ignoring this property." \ > | grep -ic "error")//here > if [[ ${error_count} -gt 0 ]]; then > echo "Found error in log files:" > cat $FLINK_DIR/log/* > EXIT_CODE=1 > fi > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
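Timo's point about case sensitivity can be demonstrated directly on the WARN line quoted in the issue: the script's `grep -ic "error"` counts the lowercase "Error while loading..." warning, while a case-sensitive check on the log level would not.

```shell
log_line='2018-10-29 03:27:39,209 WARN org.apache.flink.kafka010.shaded.org.apache.kafka.common.utils.AppInfoParser - Error while loading kafka-version.properties :null'

# Case-insensitive count, as check_logs_for_errors does: the warning trips it.
printf '%s\n' "$log_line" | grep -ic "error"    # prints 1

# Case-sensitive check for the ERROR log level only: the warning passes.
printf '%s\n' "$log_line" | grep -c "ERROR" || true    # prints 0
```

Note that `grep -c` exits non-zero when nothing matches, so a script running under `set -e` needs the trailing `|| true`.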
[jira] [Commented] (FLINK-9697) Provide connector for modern Kafka
[ https://issues.apache.org/jira/browse/FLINK-9697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667342#comment-16667342 ] ASF GitHub Bot commented on FLINK-9697: --- pnowojski commented on issue #6703: [FLINK-9697] Provide connector for modern Kafka URL: https://github.com/apache/flink/pull/6703#issuecomment-433950958 One more question. Why does the new connector have this exclusion:
```
org.apache.flink flink-connector-kafka-base_${scala.binary.version} ${project.version} org.apache.kafka kafka_${scala.binary.version}
```
Shouldn't the `flink-connector-kafka-base` be Kafka independent? > Provide connector for modern Kafka > -- > > Key: FLINK-9697 > URL: https://issues.apache.org/jira/browse/FLINK-9697 > Project: Flink > Issue Type: Improvement >Reporter: Ted Yu >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Kafka 2.0.0 would be released soon. > Here is vote thread: > [http://search-hadoop.com/m/Kafka/uyzND1vxnEd23QLxb?subj=+VOTE+2+0+0+RC1] > We should provide connector for Kafka 2.0.0 once it is released. > Upgrade to 2.0 documentation : > http://kafka.apache.org/20/documentation.html#upgrade_2_0_0
[jira] [Commented] (FLINK-10704) Fix sql client end to end test failure
[ https://issues.apache.org/jira/browse/FLINK-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667332#comment-16667332 ] Piotr Nowojski commented on FLINK-10704: I asked [~twalthr] about that (he added it), and there is no particular reason, except that it would probably collide in our connectors that pull in multiple Kafka versions (which is an issue on its own). Maybe we can relocate the resource file and rename access patterns to it somehow? I don't know if that's possible. A quick Google search didn't turn up answers. [~twalthr] do you have any ideas? > Fix sql client end to end test failure > -- > > Key: FLINK-10704 > URL: https://issues.apache.org/jira/browse/FLINK-10704 > Project: Flink > Issue Type: Bug > Components: E2E Tests, Kafka Connector >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > > The log file contains the following sentence: > {code:java} > 2018-10-29 03:27:39,209 WARN > org.apache.flink.kafka010.shaded.org.apache.kafka.common.utils.AppInfoParser > - Error while loading kafka-version.properties :null > {code} > The reason for this log is that we explicitly exclude the version description > file of the kafka client when packaging the connector: > {code:java} > > > *:* > > kafka/kafka-version.properties > > > {code} > When the shell scans for the "error" keyword with grep, it will hit, so the test > will fail. 
> {code:java} > function check_logs_for_errors { > error_count=$(grep -rv "GroupCoordinatorNotAvailableException" > $FLINK_DIR/log \ > | grep -v "RetriableCommitFailedException" \ > | grep -v "NoAvailableBrokersException" \ > | grep -v "Async Kafka commit failed" \ > | grep -v "DisconnectException" \ > | grep -v "AskTimeoutException" \ > | grep -v "WARN akka.remote.transport.netty.NettyTransport" \ > | grep -v "WARN > org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline" \ > | grep -v "jvm-exit-on-fatal-error" \ > | grep -v '^INFO:.*AWSErrorCode=\[400 Bad > Request\].*ServiceEndpoint=\[https://.*\.s3\.amazonaws\.com\].*RequestType=\[HeadBucketRequest\]' > \ > | grep -v "RejectedExecutionException" \ > | grep -v "An exception was thrown by an exception handler" \ > | grep -v "java.lang.NoClassDefFoundError: > org/apache/hadoop/yarn/exceptions/YarnException" \ > | grep -v "java.lang.NoClassDefFoundError: > org/apache/hadoop/conf/Configuration" \ > | grep -v > "org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.FluentPropertyBeanIntrospector > - Error when creating PropertyDescriptor for public final void > org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.AbstractConfiguration.setProperty(java.lang.String,java.lang.Object)! > Ignoring this property." \ > | grep -ic "error")//here > if [[ ${error_count} -gt 0 ]]; then > echo "Found error in log files:" > cat $FLINK_DIR/log/* > EXIT_CODE=1 > fi > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9635) Local recovery scheduling can cause spread out of tasks
[ https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-9635: - Affects Version/s: 1.6.2 > Local recovery scheduling can cause spread out of tasks > --- > > Key: FLINK-9635 > URL: https://issues.apache.org/jira/browse/FLINK-9635 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.5.0, 1.6.2 >Reporter: Till Rohrmann >Assignee: Stefan Richter >Priority: Critical > Fix For: 1.7.0 > > > In order to make local recovery work, Flink's scheduling was changed such > that a task tries to be rescheduled to its previous location. In order not to > occupy slots which have cached state of other tasks, the strategy will > request a new slot if the old slot identified by the previous allocation id > is no longer present. This also applies to newly allocated slots because > there is no distinction between new and already used slots. This behaviour can cause > every task to get deployed to its own slot if the {{SlotPool}} has > released all slots in the meantime, for example. The consequence could be > that a job can no longer be executed after a failure because it needs more > slots than before.
[jira] [Assigned] (FLINK-9635) Local recovery scheduling can cause spread out of tasks
[ https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-9635: Assignee: Stefan Richter > Local recovery scheduling can cause spread out of tasks > --- > > Key: FLINK-9635 > URL: https://issues.apache.org/jira/browse/FLINK-9635 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Stefan Richter >Priority: Critical > Fix For: 1.7.0 > > > In order to make local recovery work, Flink's scheduling was changed such > that a task tries to be rescheduled to its previous location. In order not to > occupy slots which have cached state of other tasks, the strategy will > request a new slot if the old slot identified by the previous allocation id > is no longer present. This also applies to newly allocated slots because > there is no distinction between new and already used slots. This behaviour can cause > every task to get deployed to its own slot if the {{SlotPool}} has > released all slots in the meantime, for example. The consequence could be > that a job can no longer be executed after a failure because it needs more > slots than before.
[jira] [Updated] (FLINK-10713) RestartIndividualStrategy does not restore state
[ https://issues.apache.org/jira/browse/FLINK-10713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-10713: -- Fix Version/s: 1.7.0 > RestartIndividualStrategy does not restore state > > > Key: FLINK-10713 > URL: https://issues.apache.org/jira/browse/FLINK-10713 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2, 1.7.0 >Reporter: Stefan Richter >Priority: Critical > Fix For: 1.7.0 > > > RestartIndividualStrategy does not perform any state restore. This is a big > problem because all restored regions will be restarted with empty state. We > need to take checkpoints into account when restoring.
[jira] [Updated] (FLINK-10712) RestartPipelinedRegionStrategy does not restore state
[ https://issues.apache.org/jira/browse/FLINK-10712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-10712: -- Fix Version/s: 1.7.0 > RestartPipelinedRegionStrategy does not restore state > - > > Key: FLINK-10712 > URL: https://issues.apache.org/jira/browse/FLINK-10712 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2, 1.7.0 >Reporter: Stefan Richter >Priority: Critical > Fix For: 1.7.0 > > > RestartPipelinedRegionStrategy does not perform any state restore. This is a > big problem because all restored regions will be restarted with empty state. > We need to take checkpoints into account when restoring.
[jira] [Commented] (FLINK-10715) E2e tests fail with ConcurrentModificationException in MetricRegistryImpl
[ https://issues.apache.org/jira/browse/FLINK-10715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667296#comment-16667296 ] Chesnay Schepler commented on FLINK-10715: -- This is effectively a duplicate of FLINK-10035. > E2e tests fail with ConcurrentModificationException in MetricRegistryImpl > - > > Key: FLINK-10715 > URL: https://issues.apache.org/jira/browse/FLINK-10715 > Project: Flink > Issue Type: Bug > Components: E2E Tests, Metrics >Affects Versions: 1.7.0 >Reporter: Dawid Wysakowicz >Priority: Blocker > > Couple of e2e tests that rely on metrics fail with exception: > {code} > 2018-10-29 11:40:32,781 WARN > org.apache.flink.runtime.metrics.MetricRegistryImpl - Error while > reporting metrics > java.util.ConcurrentModificationException > at java.util.HashMap$HashIterator.nextNode(HashMap.java:1437) > at java.util.HashMap$EntryIterator.next(HashMap.java:1471) > at java.util.HashMap$EntryIterator.next(HashMap.java:1469) > at > org.apache.flink.metrics.slf4j.Slf4jReporter.report(Slf4jReporter.java:101) > at > org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:427) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > {code} > Tests that failed: > * 'Resuming Externalized Checkpoint (file, sync, no parallelism change) > end-to-end test > * 'State TTL Heap backend end-to-end test' -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10718) Use IO executor in RpcService for message serialization
Till Rohrmann created FLINK-10718: - Summary: Use IO executor in RpcService for message serialization Key: FLINK-10718 URL: https://issues.apache.org/jira/browse/FLINK-10718 Project: Flink Issue Type: Improvement Components: Local Runtime Affects Versions: 1.6.2, 1.5.5, 1.7.0 Reporter: Till Rohrmann Fix For: 1.7.0 The {{AkkaInvocationHandler}} and, once FLINK-10251 has been merged, also the {{AkkaRpcActor}} serialize their remote RPC messages before sending them. This happens in the calling thread, which can be the main thread of another {{RpcEndpoint}}. Depending on the de-/serialization time, this can be considered a blocking operation. Thus, I propose to introduce an IO executor which is used for the serialization of the messages. This will make the {{RpcService}} abstraction more scalable.
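A minimal, Flink-independent sketch of the proposed idea — moving serialization off the caller's thread onto a dedicated IO executor — might look like this. Plain JDK serialization stands in for Akka's message serialization, and all names are illustrative:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SerializationOffloadSketch {

    // Dedicated IO pool so serialization never blocks an endpoint's main thread.
    private static final ExecutorService IO_EXECUTOR = Executors.newFixedThreadPool(4);

    // Serialize on the IO executor; the caller gets a future with the bytes.
    static CompletableFuture<byte[]> serializeAsync(Serializable message) {
        return CompletableFuture.supplyAsync(() -> {
            try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
                 ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(message);
                oos.flush();
                return bos.toByteArray();
            } catch (IOException e) {
                throw new CompletionException(e);
            }
        }, IO_EXECUTOR);
    }

    public static void main(String[] args) throws Exception {
        byte[] bytes = serializeAsync("hello rpc").get();
        System.out.println("serialized " + bytes.length + " bytes off the caller thread");
        IO_EXECUTOR.shutdown();
    }
}
```

The caller can compose the returned future with the actual network send, so a slow serialization only occupies an IO thread, not the RPC endpoint's main thread.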
[jira] [Created] (FLINK-10717) Introduce SimpleSerializerSnapshot as replacement for ParameterlessTypeSerializerConfig
Tzu-Li (Gordon) Tai created FLINK-10717: --- Summary: Introduce SimpleSerializerSnapshot as replacement for ParameterlessTypeSerializerConfig Key: FLINK-10717 URL: https://issues.apache.org/jira/browse/FLINK-10717 Project: Flink Issue Type: Sub-task Components: Type Serialization System Reporter: Tzu-Li (Gordon) Tai Assignee: Stephan Ewen -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10600) Provide End-to-end test cases for modern Kafka connectors
[ https://issues.apache.org/jira/browse/FLINK-10600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667290#comment-16667290 ] ASF GitHub Bot commented on FLINK-10600: pnowojski commented on a change in pull request #6924: [FLINK-10600] Provide End-to-end test cases for modern Kafka connectors URL: https://github.com/apache/flink/pull/6924#discussion_r228949560 ## File path: flink-examples/flink-examples-streaming/pom.xml ## @@ -538,6 +550,11 @@ under the License. shade + + + org.apache.flink:flink-connector-kafka_${scala.binary.version} Review comment: Ok, right. We can only exclude the modern one and `0.11`? And we can still exclude everything from "modern" `KafkaExample`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Provide End-to-end test cases for modern Kafka connectors > - > > Key: FLINK-10600 > URL: https://issues.apache.org/jira/browse/FLINK-10600 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector >Reporter: vinoyang >Assignee: vinoyang >Priority: Blocker > Labels: pull-request-available > Fix For: 1.7.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9552) NPE in SpanningRecordSerializer during checkpoint with iterations
[ https://issues.apache.org/jira/browse/FLINK-9552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-9552: - Fix Version/s: 1.7.0 > NPE in SpanningRecordSerializer during checkpoint with iterations > - > > Key: FLINK-9552 > URL: https://issues.apache.org/jira/browse/FLINK-9552 > Project: Flink > Issue Type: Bug > Components: Type Serialization System >Affects Versions: 1.5.0 >Reporter: Truong Duc Kien >Assignee: vinoyang >Priority: Major > Fix For: 1.7.0 > > > We're encountering NPE intermittently inside SpanningRecordSerializer during > checkpoint. > > {code:java} > 2018-06-08 08:31:35,741 [ka.actor.default-dispatcher-83] INFO > o.a.f.r.e.ExecutionGraph IterationSource-22 (44/120) > (c1b94ef849db0e5fb9fb7b85c17073ce) switched from RUNNING to FAILED. > java.lang.RuntimeException > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) > at > org.apache.flink.streaming.runtime.tasks.StreamIterationHead.run(StreamIterationHead.java:91) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.NullPointerException > at > org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:98) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105) > at > org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107) > ... 
5 more > 2018-06-08 08:31:35,746 [ka.actor.default-dispatcher-83] INFO > o.a.f.r.e.ExecutionGraph Job xxx (8a4eaf02c46dc21c7d6f3f70657dbb17) switched > from state RUNNING to FAILING. > java.lang.RuntimeException > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) > at > org.apache.flink.streaming.runtime.tasks.StreamIterationHead.run(StreamIterationHead.java:91) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.NullPointerException > at > org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:98) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105) > at > org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107) > ... 5 more > {code} > This issue is probably concurrency related, because the revelant Flink code > seems to have proper null checking > https://github.com/apache/flink/blob/fa024726bb801fc71cec5cc303cac1d4a03f555e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java#L98 > {code:java} > // Copy from intermediate buffers to current target memory segment > if (targetBuffer != null) { > targetBuffer.append(lengthBuffer); > targetBuffer.append(dataBuffer); > targetBuffer.commit(); > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
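The suspected race in the report above — code with "proper null checking" that still throws NPE — is a classic check-then-act problem: another thread can null the field between the check and the use. A minimal standalone illustration (not Flink's actual code; the buffer and method names are made up):

```java
public class CheckThenActSketch {

    // Shared field that another thread (e.g. a flush/cleanup path) may null
    // out between the check and the use.
    private volatile StringBuilder targetBuffer = new StringBuilder();

    void appendUnsafe(String record) {
        if (targetBuffer != null) {
            // A concurrent clearBuffer() landing here still causes an NPE,
            // despite the null check above.
            targetBuffer.append(record);
        }
    }

    void appendWithLocalCopy(String record) {
        // Reading the field once into a local closes the window: the local
        // reference cannot be nulled by another thread.
        StringBuilder buffer = targetBuffer;
        if (buffer != null) {
            buffer.append(record);
        }
    }

    void clearBuffer() {
        targetBuffer = null;
    }

    public static void main(String[] args) {
        CheckThenActSketch sketch = new CheckThenActSketch();
        sketch.appendWithLocalCopy("record-1");
        sketch.clearBuffer();
        sketch.appendWithLocalCopy("record-2"); // safely skipped, no NPE
        System.out.println("local-copy read survived concurrent clear");
    }
}
```

The local-copy pattern only removes the NPE; if the writer genuinely races with checkpointing, proper synchronization between the two paths is still needed.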
[jira] [Commented] (FLINK-10603) Reduce kafka test duration
[ https://issues.apache.org/jira/browse/FLINK-10603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667287#comment-16667287 ] Piotr Nowojski commented on FLINK-10603: I think the single largest run-time overhead comes from this code: {code:java} // this means that this is either: // (1) the first execution of this application // (2) previous execution has failed before first checkpoint completed // // in case of (2) we have to abort all previous transactions abortTransactions(transactionalIdsGenerator.generateIdsToAbort()); {code} in {{org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011#initializeState}}. Commenting this line out single-handedly speeds up the tests by 30-40%. Of course we cannot do that in production, but maybe we could introduce a private/internal switch {{assumeFirstExecution}} or {{forceNotAbortOldTransactions}} that we could use in all of the tests except the one test that checks for this exact feature, {{FlinkKafkaProducer011ITCase#testScaleDownBeforeFirstCheckpoint}}. But this wouldn't be a healthy "hack"... Besides that, one would have to profile the most time-consuming tests and try to figure out whether we can speed them up. > Reduce kafka test duration > -- > > Key: FLINK-10603 > URL: https://issues.apache.org/jira/browse/FLINK-10603 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector, Tests >Affects Versions: 1.7.0 >Reporter: Chesnay Schepler >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > The tests for the modern kafka connector take more than 10 minutes which is > simply unacceptable.
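The internal switch floated above could be shaped roughly like this standalone sketch. All names are hypothetical, not FlinkKafkaProducer011's real API; the point is only that a package-private flag lets tests skip the expensive abort scan while production always runs it:

```java
public class ProducerInitSketch {

    // Package-private switch: production code never touches it, so the abort
    // scan always runs there; tests that don't exercise transaction recovery
    // can flip it to skip the slow step.
    boolean assumeFirstExecution = false;

    int abortedTransactions = 0;

    void initializeState() {
        if (!assumeFirstExecution) {
            // stand-in for abortTransactions(transactionalIdsGenerator.generateIdsToAbort())
            abortedTransactions = abortStaleTransactions();
        }
    }

    private int abortStaleTransactions() {
        return 42; // placeholder for the slow Kafka round-trips
    }

    public static void main(String[] args) {
        ProducerInitSketch prod = new ProducerInitSketch();
        prod.initializeState();
        if (prod.abortedTransactions == 0) throw new AssertionError("abort scan should run by default");

        ProducerInitSketch testProd = new ProducerInitSketch();
        testProd.assumeFirstExecution = true; // test-only shortcut
        testProd.initializeState();
        if (testProd.abortedTransactions != 0) throw new AssertionError("abort scan should be skipped");
        System.out.println("default path aborts, test path skips");
    }
}
```

As the comment itself notes, this trades test fidelity for speed, so at least one test (the scale-down-before-first-checkpoint case) must keep the flag off.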
[jira] [Commented] (FLINK-10097) More tests to increase StreamingFileSink test coverage
[ https://issues.apache.org/jira/browse/FLINK-10097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667248#comment-16667248 ] ASF GitHub Bot commented on FLINK-10097: kl0u commented on issue #6520: [FLINK-10097][DataStream API] Additional tests for StreamingFileSink URL: https://github.com/apache/flink/pull/6520#issuecomment-433920446 Thanks a lot @azagrebin ! I will integrate your comments and merge. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > More tests to increase StreamingFileSink test coverage > -- > > Key: FLINK-10097 > URL: https://issues.apache.org/jira/browse/FLINK-10097 > Project: Flink > Issue Type: Sub-task > Components: filesystem-connector >Affects Versions: 1.6.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > Fix For: 1.6.3 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10668) Streaming File Sink E2E test fails because not all legitimate exceptions are excluded
[ https://issues.apache.org/jira/browse/FLINK-10668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667238#comment-16667238 ] ASF GitHub Bot commented on FLINK-10668: GJL commented on issue #6950: [FLINK-10668][e2e] Streaming File Sink E2E test fails because not all legitimate exceptions are excluded URL: https://github.com/apache/flink/pull/6950#issuecomment-433917808 I think this is relevant: https://github.com/apache/flink/pull/6959 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Streaming File Sink E2E test fails because not all legitimate exceptions are > excluded > - > > Key: FLINK-10668 > URL: https://issues.apache.org/jira/browse/FLINK-10668 > Project: Flink > Issue Type: Bug > Components: E2E Tests >Affects Versions: 1.6.1, 1.7.0 >Reporter: Gary Yao >Assignee: Hequn Cheng >Priority: Critical > Labels: pull-request-available > Fix For: 1.6.3, 1.7.0 > > > Streaming File Sink E2E test fails because not all legitimate exceptions are > excluded. > The stacktrace below can appear in the logs generated by the test but > {{check_logs_for_exceptions}} does not exclude all expected exceptions. > {noformat} > java.io.IOException: Connecting the channel failed: Connecting to remote task > manager + 'xxx/10.0.x.xx:50849' has failed. This might indicate that the > remote task manager has been lost. 
> at > org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196) > at > org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:133) > at > org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:85) > at > org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:60) > at > org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:166) > at > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:494) > at > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:525) > at > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:508) > at > org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:165) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) > at java.lang.Thread.run(Thread.java:748) > Caused by: > org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: > Connecting to remote task manager + 'xxx/10.0.x.xx:50849' has failed. > This might indicate that the remote task manager has been lost. 
> at > org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:219) > at > org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:133) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327) > at >
[jira] [Commented] (FLINK-10097) More tests to increase StreamingFileSink test coverage
[ https://issues.apache.org/jira/browse/FLINK-10097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667226#comment-16667226 ] ASF GitHub Bot commented on FLINK-10097: azagrebin commented on a change in pull request #6520: [FLINK-10097][DataStream API] Additional tests for StreamingFileSink URL: https://github.com/apache/flink/pull/6520#discussion_r228928607 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java ## @@ -50,233 +43,234 @@ @Test public void testDefaultRollingPolicy() throws Exception { final File outDir = TEMP_FOLDER.newFolder(); + final Path path = new Path(outDir.toURI()); - final RollingPolicy, String> rollingPolicy = DefaultRollingPolicy - .create() - .withMaxPartSize(10L) - .withInactivityInterval(4L) - .withRolloverInterval(11L) - .build(); - - try ( - OneInputStreamOperatorTestHarness, Object> testHarness = TestUtils.createCustomRescalingTestSink( - outDir, - 1, - 0, - 1L, - new TestUtils.TupleToStringBucketer(), - new SimpleStringEncoder<>(), - rollingPolicy, - new DefaultBucketFactoryImpl<>()) - ) { - testHarness.setup(); - testHarness.open(); - - testHarness.setProcessingTime(0L); + final RollingPolicy originalRollingPolicy = + DefaultRollingPolicy + .create() + .withMaxPartSize(10L) + .withInactivityInterval(4L) + .withRolloverInterval(11L) + .build(); - testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 1), 1L)); - TestUtils.checkLocalFs(outDir, 1, 0); + final MethodCallCountingPolicyWrapper rollingPolicy = + new MethodCallCountingPolicyWrapper<>(originalRollingPolicy); - // roll due to size - testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 2), 2L)); - TestUtils.checkLocalFs(outDir, 1, 0); + final Buckets buckets = createBuckets(path, rollingPolicy); - testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 3), 3L)); - TestUtils.checkLocalFs(outDir, 2, 0); + rollingPolicy.verifyCallCounters(0L, 0L, 0L, 
0L, 0L, 0L); - // roll due to inactivity - testHarness.setProcessingTime(7L); + // these two will fill up the first in-progress file and at the third it will roll ... + buckets.onElement("test1", new TestUtils.MockSinkContext(1L, 1L, 1L)); + buckets.onElement("test1", new TestUtils.MockSinkContext(2L, 1L, 2L)); + rollingPolicy.verifyCallCounters(0L, 0L, 1L, 0L, 0L, 0L); - testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 4), 4L)); - TestUtils.checkLocalFs(outDir, 3, 0); + buckets.onElement("test1", new TestUtils.MockSinkContext(3L, 1L, 3L)); + rollingPolicy.verifyCallCounters(0L, 0L, 2L, 1L, 0L, 0L); - // roll due to rollover interval - testHarness.setProcessingTime(20L); + // still no time to roll + buckets.onProcessingTime(5L); + rollingPolicy.verifyCallCounters(0L, 0L, 2L, 1L, 1L, 0L); - testHarness.processElement(new StreamRecord<>(Tuple2.of("test1", 5), 5L)); - TestUtils.checkLocalFs(outDir, 4, 0); + // roll due to inactivity + buckets.onProcessingTime(7L); + rollingPolicy.verifyCallCounters(0L, 0L, 2L, 1L, 2L, 1L); - // we take a checkpoint but we should not roll. - testHarness.snapshot(1L, 1L); + buckets.onElement("test1", new TestUtils.MockSinkContext(3L, 1L, 3L)); - TestUtils.checkLocalFs(outDir, 4, 0); +
[jira] [Commented] (FLINK-10097) More tests to increase StreamingFileSink test coverage
[ https://issues.apache.org/jira/browse/FLINK-10097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667224#comment-16667224 ] ASF GitHub Bot commented on FLINK-10097: azagrebin commented on a change in pull request #6520: [FLINK-10097][DataStream API] Additional tests for StreamingFileSink URL: https://github.com/apache/flink/pull/6520#discussion_r228873834 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java ## @@ -40,54 +45,288 @@ public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder(); @Test - public void testContextPassingNormalExecution() throws Exception { - testCorrectPassingOfContext(1L, 2L, 3L); + public void testSnapshotAndRestore() throws Exception { + final File outDir = TEMP_FOLDER.newFolder(); + final Path path = new Path(outDir.toURI()); + + final RollingPolicy onCheckpointRP = OnCheckpointRollingPolicy.build(); + + final Buckets buckets = createBuckets(path, onCheckpointRP, 0); + + final ListState bucketStateContainer = new MockListState<>(); + final ListState partCounterContainer = new MockListState<>(); + + buckets.onElement("test1", new TestUtils.MockSinkContext(null, 1L, 2L)); + buckets.snapshotState(0L, bucketStateContainer, partCounterContainer); + + buckets.onElement("test2", new TestUtils.MockSinkContext(null, 1L, 2L)); + buckets.snapshotState(1L, bucketStateContainer, partCounterContainer); + + Buckets restoredBuckets = + restoreBuckets(path, onCheckpointRP, 0, bucketStateContainer, partCounterContainer); + + final Map> activeBuckets = restoredBuckets.getActiveBuckets(); + Assert.assertEquals(2L, activeBuckets.size()); + Assert.assertTrue(activeBuckets.keySet().contains("test1")); Review comment: as I understand, the check logic for test1/2 is the same, it could be deduped in helper method This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > More tests to increase StreamingFileSink test coverage > -- > > Key: FLINK-10097 > URL: https://issues.apache.org/jira/browse/FLINK-10097 > Project: Flink > Issue Type: Sub-task > Components: filesystem-connector >Affects Versions: 1.6.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > Fix For: 1.6.3 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
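The deduplication suggested in the review could be a small helper along these lines. The method and parameter names are guesses based on the quoted test snippet, not the actual test code.

```java
// Hypothetical helper for the repeated per-bucket assertions in the snapshot/
// restore test; names are assumptions based on the snippet above.
import java.util.Map;

public class BucketAssertions {
    static void assertActiveBucket(Map<String, ?> activeBuckets, String bucketId) {
        if (!activeBuckets.containsKey(bucketId)) {
            throw new AssertionError("expected active bucket: " + bucketId);
        }
    }

    static void assertActiveBucketCount(Map<String, ?> activeBuckets, int expected) {
        if (activeBuckets.size() != expected) {
            throw new AssertionError(
                "expected " + expected + " active buckets, got " + activeBuckets.size());
        }
    }
}
```

The restore check for test1/test2 would then collapse to one `assertActiveBucketCount` call plus one `assertActiveBucket` call per bucket id.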
[jira] [Commented] (FLINK-10097) More tests to increase StreamingFileSink test coverage
[ https://issues.apache.org/jira/browse/FLINK-10097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667225#comment-16667225 ] ASF GitHub Bot commented on FLINK-10097: azagrebin commented on a change in pull request #6520: [FLINK-10097][DataStream API] Additional tests for StreamingFileSink URL: https://github.com/apache/flink/pull/6520#discussion_r228920095 ## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java ## @@ -484,131 +479,4 @@ public void testScalingDownAndMergingOfStates() throws Exception { Assert.assertEquals(3L, counter); } } - - @Test - public void testMaxCounterUponRecovery() throws Exception { Review comment: Is this test not relevant anymore? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > More tests to increase StreamingFileSink test coverage > -- > > Key: FLINK-10097 > URL: https://issues.apache.org/jira/browse/FLINK-10097 > Project: Flink > Issue Type: Sub-task > Components: filesystem-connector >Affects Versions: 1.6.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > Fix For: 1.6.3 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10716) Upgrade ListSerializer / ArrayListSerializer for state evolution
Tzu-Li (Gordon) Tai created FLINK-10716: --- Summary: Upgrade ListSerializer / ArrayListSerializer for state evolution Key: FLINK-10716 URL: https://issues.apache.org/jira/browse/FLINK-10716 Project: Flink Issue Type: Sub-task Components: Type Serialization System Reporter: Tzu-Li (Gordon) Tai Assignee: Tzu-Li (Gordon) Tai Besides upgrading their snapshots, we should also make sure we have a corresponding {{ListSerializerMigrationTest}} and {{ArrayListSerializerMigrationTest}} to safeguard the upgrade. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10600) Provide End-to-end test cases for modern Kafka connectors
[ https://issues.apache.org/jira/browse/FLINK-10600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667191#comment-16667191 ] ASF GitHub Bot commented on FLINK-10600: yanghua commented on a change in pull request #6924: [FLINK-10600] Provide End-to-end test cases for modern Kafka connectors URL: https://github.com/apache/flink/pull/6924#discussion_r228923208 ## File path: flink-examples/flink-examples-streaming/pom.xml ## @@ -538,6 +550,11 @@ under the License. shade + + + org.apache.flink:flink-connector-kafka_${scala.binary.version} Review comment: We can't exclude these connectors, which is reported in my travis because there is a cascading dependency between the previous connectors. If you exclude it, you will get an error: ``` java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09 at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:763) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) at java.net.URLClassLoader.access$100(URLClassLoader.java:73) at java.net.URLClassLoader$1.run(URLClassLoader.java:368) at java.net.URLClassLoader$1.run(URLClassLoader.java:362) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:361) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at org.apache.flink.streaming.examples.kafka.Kafka010Example.main(Kafka010Example.java:45) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529) 
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813) at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050) at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126) Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 29 more ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Provide End-to-end test cases for modern Kafka connectors > - > > Key: FLINK-10600 > URL: https://issues.apache.org/jira/browse/FLINK-10600 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector >Reporter: vinoyang >Assignee: vinoyang >Priority: Blocker > Labels: pull-request-available > Fix For: 1.7.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
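The cascading dependency described above means a shade configuration that keeps only the modern connector will fail at class-load time with the `NoClassDefFoundError` shown. A hedged sketch of what the shade-plugin inclusion list would have to cover follows; the exact artifact ids are assumptions about the connector module layout at the time, not verified against the pom.

```xml
<!-- Sketch only: when shading the modern Kafka connector, its predecessor
     modules must be shaded too, because classes such as FlinkKafkaConsumer09
     are referenced transitively at class-load time. -->
<artifactSet>
  <includes>
    <include>org.apache.flink:flink-connector-kafka_${scala.binary.version}</include>
    <include>org.apache.flink:flink-connector-kafka-0.10_${scala.binary.version}</include>
    <include>org.apache.flink:flink-connector-kafka-0.9_${scala.binary.version}</include>
    <include>org.apache.flink:flink-connector-kafka-base_${scala.binary.version}</include>
  </includes>
</artifactSet>
```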
[jira] [Commented] (FLINK-10600) Provide End-to-end test cases for modern Kafka connectors
[ https://issues.apache.org/jira/browse/FLINK-10600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667188#comment-16667188 ] ASF GitHub Bot commented on FLINK-10600: yanghua edited a comment on issue #6924: [FLINK-10600] Provide End-to-end test cases for modern Kafka connectors URL: https://github.com/apache/flink/pull/6924#issuecomment-433904230 Let me try to locate it. This code was written by @tzulitai before. It seems the call: ``` $(get_partition_end_offset test-input 1) ``` may return `null`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Provide End-to-end test cases for modern Kafka connectors > - > > Key: FLINK-10600 > URL: https://issues.apache.org/jira/browse/FLINK-10600 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector >Reporter: vinoyang >Assignee: vinoyang >Priority: Blocker > Labels: pull-request-available > Fix For: 1.7.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10600) Provide End-to-end test cases for modern Kafka connectors
[ https://issues.apache.org/jira/browse/FLINK-10600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667183#comment-16667183 ] ASF GitHub Bot commented on FLINK-10600: yanghua commented on issue #6924: [FLINK-10600] Provide End-to-end test cases for modern Kafka connectors URL: https://github.com/apache/flink/pull/6924#issuecomment-433904230 Let me try to locate it. This code was written by @tzulitai before. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Provide End-to-end test cases for modern Kafka connectors > - > > Key: FLINK-10600 > URL: https://issues.apache.org/jira/browse/FLINK-10600 > Project: Flink > Issue Type: Sub-task > Components: Kafka Connector >Reporter: vinoyang >Assignee: vinoyang >Priority: Blocker > Labels: pull-request-available > Fix For: 1.7.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-8953) Resolve unresolved field references in FieldComputer expressions
[ https://issues.apache.org/jira/browse/FLINK-8953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

xueyu reassigned FLINK-8953:
Assignee: xueyu

> Resolve unresolved field references in FieldComputer expressions
>
> Key: FLINK-8953
> URL: https://issues.apache.org/jira/browse/FLINK-8953
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: xueyu
> Priority: Major
>
> When implementing the {{FieldComputer.getExpression}} method, it is not possible to use API classes but only internal expression case classes.
> It would be great to also define timestamp extractors like:
> {code}
> def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression = {
>   // 'x.cast(Types.LONG)
>   // ExpressionParser.parseExpression("x.cast(LONG)")
> }
> {code}
> An even better solution would be to provide different {{getExpression()}} methods that an implementor can override. The general goal should be to define this as naturally as possible. In the future we should also support SQL:
> {code}
> def getJavaExpression(fieldAccesses: Array[ResolvedFieldReference]): String = {
>   "x.cast(LONG)"
> }
> def getSQLExpression(fieldAccesses: Array[ResolvedFieldReference]): String = {
>   "CAST(x AS LONG)"
> }
> {code}
> The final design is still up for discussion. These are just ideas.
[GitHub] dawidwys commented on issue #6955: [hotfix] [docs] Fix typo in ConfigGroups
dawidwys commented on issue #6955: [hotfix] [docs] Fix typo in ConfigGroups
URL: https://github.com/apache/flink/pull/6955#issuecomment-433901467

Hi @linzhaoming, thank you for your contribution. Please remember, though, to fill in the PR description when opening. Merging.
[GitHub] dawidwys closed pull request #6955: [hotfix] [docs] Fix typo in ConfigGroups
dawidwys closed pull request #6955: [hotfix] [docs] Fix typo in ConfigGroups
URL: https://github.com/apache/flink/pull/6955

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

```diff
diff --git a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/ConfigGroups.java b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/ConfigGroups.java
index 53bf856c0a7..5afe9f9ac41 100644
--- a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/ConfigGroups.java
+++ b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/ConfigGroups.java
@@ -26,7 +26,7 @@ import java.lang.annotation.Target;
 
 /**
- * Annotation used on classes containing config optionss that enables the separation of options into different
+ * Annotation used on classes containing config options that enables the separation of options into different
  * tables based on key prefixes. A config option is assigned to a {@link ConfigGroup} if the option key matches
  * the group prefix. If a key matches multiple prefixes the longest matching prefix takes priority. An option is never
  * assigned to multiple groups. Options that don't match any group are implicitly added to a default group.
```
[jira] [Commented] (FLINK-10687) Make flink-formats Scala-free
[ https://issues.apache.org/jira/browse/FLINK-10687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667177#comment-16667177 ]

ASF GitHub Bot commented on FLINK-10687:

aljoscha commented on issue #6958: [FLINK-10687] [table] Make flink-formats Scala-free
URL: https://github.com/apache/flink/pull/6958#issuecomment-433903067

I did review everything except the new parser. Are there any tests that cover more complex cases, like nested types and deep nesting? Otherwise this looks good.

> Make flink-formats Scala-free
>
> Key: FLINK-10687
> URL: https://issues.apache.org/jira/browse/FLINK-10687
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Timo Walther
> Priority: Major
> Labels: pull-request-available
>
> {{flink-table}} is the only dependency that pulls in Scala for {{flink-json}} and {{flink-avro}}. We should aim to make {{flink-formats}} Scala-free, using only a dependency on {{flink-table-common}}.