[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668116#comment-16668116
 ] 

ASF GitHub Bot commented on FLINK-9083:
---

jparkie commented on issue #6782: [FLINK-9083][Cassandra Connector] Add async 
backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#issuecomment-434168230
 
 
   I rebased my PR, and I'll be working on the new changes to address the PR 
comments.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add async backpressure support to Cassandra Connector
> -
>
> Key: FLINK-9083
> URL: https://issues.apache.org/jira/browse/FLINK-9083
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Reporter: Jacob Park
>Assignee: Jacob Park
>Priority: Minor
>  Labels: pull-request-available
>
> Because the CassandraSinkBase derivatives use async writes, they do not block 
> the task and therefore introduce no backpressure.
> I am currently using a semaphore to provide backpressure by blocking at a 
> maximum concurrent requests limit, similar to how DataStax's Spark Cassandra 
> Connector works: 
> [https://github.com/datastax/spark-cassandra-connector/blob/v2.0.7/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/AsyncExecutor.scala#L18]
> This change has greatly improved the fault tolerance of our Cassandra 
> Sink Connector implementation on Apache Flink in production. I would like to 
> contribute this feature back upstream.
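For illustration, here is a minimal sketch of the semaphore-based backpressure idea described above (this is not the code from PR #6782; class and method names are hypothetical): acquire a permit before issuing an async write and release it in the completion callback, so the task thread blocks once the configured number of writes is in flight.

{code:java}
// Illustrative sketch only; not the implementation from PR #6782.
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.concurrent.Semaphore;

abstract class BackpressuredAsyncSink<IN> {

    // Bounds the number of in-flight async writes; acquire() blocks at the limit.
    private final Semaphore permits;

    BackpressuredAsyncSink(int maxConcurrentRequests) {
        this.permits = new Semaphore(maxConcurrentRequests);
    }

    /** Issues the asynchronous write, e.g. session.executeAsync(...). */
    protected abstract ListenableFuture<?> send(IN value);

    public void invoke(IN value) throws InterruptedException {
        permits.acquire(); // backpressure: the calling task thread blocks here
        ListenableFuture<?> result = send(value);
        Futures.addCallback(result, new FutureCallback<Object>() {
            @Override
            public void onSuccess(Object ignored) {
                permits.release();
            }

            @Override
            public void onFailure(Throwable t) {
                permits.release(); // async error propagation omitted for brevity
            }
        }, MoreExecutors.directExecutor());
    }
}
{code}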



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] jparkie commented on issue #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector

2018-10-29 Thread GitBox
jparkie commented on issue #6782: [FLINK-9083][Cassandra Connector] Add async 
backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#issuecomment-434168230
 
 
   I rebased my PR, and I'll be working on the new changes to address the PR 
comments.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668098#comment-16668098
 ] 

ASF GitHub Bot commented on FLINK-9083:
---

jparkie commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r229170161
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 ##
 @@ -127,35 +144,112 @@ public void close() throws Exception {
}
 
@Override
-   public void initializeState(FunctionInitializationContext context) 
throws Exception {
-   }
+   public void initializeState(FunctionInitializationContext context) 
throws Exception {}
 
@Override
-   public void snapshotState(FunctionSnapshotContext ctx) throws Exception 
{
+   public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
checkAsyncErrors();
-   waitForPendingUpdates();
+   flush();
checkAsyncErrors();
}
 
-   private void waitForPendingUpdates() throws InterruptedException {
-   synchronized (updatesPending) {
-   while (updatesPending.get() > 0) {
-   updatesPending.wait();
+   @Override
+   public void invoke(IN value) throws Exception {
+   checkAsyncErrors();
+   tryAcquire();
+   final ListenableFuture<V> result = send(value);
+   Futures.addCallback(result, new FutureCallback<V>() {
+   @Override
+   public void onSuccess(V ignored) {
+   release();
}
+
+   @Override
+   public void onFailure(Throwable currentError) {
+   setAsyncErrors(currentError);
+   release();
+   }
+   });
+   }
+
+   // --- User-Defined Sink Methods 
--
+
+   public abstract ListenableFuture<V> send(IN value);
+
+   // - Configuration Methods 

+
+   /**
+* Sets the maximum allowed number of concurrent requests for this sink.
+*
+* @param maxConcurrentRequests maximum number of concurrent requests 
allowed
+* @param timeout timeout duration when acquiring a permit to execute
+* @param unit timeout unit when acquiring a permit to execute
+*/
+   public void setMaxConcurrentRequests(int maxConcurrentRequests, long 
timeout, TimeUnit unit) {
+   Preconditions.checkArgument(maxConcurrentRequests >= 0, 
"maxConcurrentRequests cannot be negative.");
+   Preconditions.checkArgument(timeout >= 0, "timeout cannot be 
negative.");
+   this.maxConcurrentRequests = maxConcurrentRequests;
+   this.maxConcurrentRequestsTimeout = timeout;
+   this.maxConcurrentRequestsTimeoutUnit = unit;
+   }
+
+   // --- Cassandra Methods 
--
+
+   protected Cluster createCluster() {
 
 Review comment:
   The `CassandraPojoSink` relies on it.
   
   - 
https://github.com/apache/flink/blob/release-1.6.2/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java#L92


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add async backpressure support to Cassandra Connector
> -
>
> Key: FLINK-9083
> URL: https://issues.apache.org/jira/browse/FLINK-9083
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Reporter: Jacob Park
>Assignee: Jacob Park
>Priority: Minor
>  Labels: pull-request-available
>
> Because the CassandraSinkBase derivatives use async writes, they do not block 
> the task and therefore introduce no backpressure.
> I am currently using a semaphore to provide backpressure by blocking at a 
> maximum concurrent requests limit, similar to how DataStax's Spark Cassandra 
> Connector works: 
> [https://github.com/datastax/spark-cassandra-connector/blob/v2.0.7/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/AsyncExecutor.scala#L18]
> This change has greatly improved the 

[GitHub] jparkie commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector

2018-10-29 Thread GitBox
jparkie commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r229170161
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 ##
 @@ -127,35 +144,112 @@ public void close() throws Exception {
}
 
@Override
-   public void initializeState(FunctionInitializationContext context) 
throws Exception {
-   }
+   public void initializeState(FunctionInitializationContext context) 
throws Exception {}
 
@Override
-   public void snapshotState(FunctionSnapshotContext ctx) throws Exception 
{
+   public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
checkAsyncErrors();
-   waitForPendingUpdates();
+   flush();
checkAsyncErrors();
}
 
-   private void waitForPendingUpdates() throws InterruptedException {
-   synchronized (updatesPending) {
-   while (updatesPending.get() > 0) {
-   updatesPending.wait();
+   @Override
+   public void invoke(IN value) throws Exception {
+   checkAsyncErrors();
+   tryAcquire();
+   final ListenableFuture<V> result = send(value);
+   Futures.addCallback(result, new FutureCallback<V>() {
+   @Override
+   public void onSuccess(V ignored) {
+   release();
}
+
+   @Override
+   public void onFailure(Throwable currentError) {
+   setAsyncErrors(currentError);
+   release();
+   }
+   });
+   }
+
+   // --- User-Defined Sink Methods 
--
+
+   public abstract ListenableFuture<V> send(IN value);
+
+   // - Configuration Methods 

+
+   /**
+* Sets the maximum allowed number of concurrent requests for this sink.
+*
+* @param maxConcurrentRequests maximum number of concurrent requests 
allowed
+* @param timeout timeout duration when acquiring a permit to execute
+* @param unit timeout unit when acquiring a permit to execute
+*/
+   public void setMaxConcurrentRequests(int maxConcurrentRequests, long 
timeout, TimeUnit unit) {
+   Preconditions.checkArgument(maxConcurrentRequests >= 0, 
"maxConcurrentRequests cannot be negative.");
+   Preconditions.checkArgument(timeout >= 0, "timeout cannot be 
negative.");
+   this.maxConcurrentRequests = maxConcurrentRequests;
+   this.maxConcurrentRequestsTimeout = timeout;
+   this.maxConcurrentRequestsTimeoutUnit = unit;
+   }
+
+   // --- Cassandra Methods 
--
+
+   protected Cluster createCluster() {
 
 Review comment:
   The `CassandraPojoSink` relies on it.
   
   - 
https://github.com/apache/flink/blob/release-1.6.2/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java#L92


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10231) Add a view SQL DDL

2018-10-29 Thread winifredtang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668093#comment-16668093
 ] 

winifredtang commented on FLINK-10231:
--

[~fhueske] Hello, could you give more details about it? I am trying to implement a 
view SQL DDL. I wonder whether it would be better for me to use 
TableEnvironment.registerTableInternal(name: String, table: AbstractTable) 
instead of TableEnvironment.registerTable(name: String, table: Table). 
Thanks a lot.

> Add a view SQL DDL
> --
>
> Key: FLINK-10231
> URL: https://issues.apache.org/jira/browse/FLINK-10231
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Priority: Major
>
> FLINK-10163 added initial view support for the SQL Client. However, for 
> supporting the [full definition of 
> views|https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-Create/Drop/AlterView]
>  (with schema, comments, etc.) we need native support for views in 
> the Table API.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668091#comment-16668091
 ] 

ASF GitHub Bot commented on FLINK-9083:
---

jparkie commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r229169780
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 ##
 @@ -43,70 +48,82 @@
  */
 public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> 
implements CheckpointedFunction {
protected final Logger log = LoggerFactory.getLogger(getClass());
-   protected transient Cluster cluster;
-   protected transient Session session;
 
-   protected transient volatile Throwable exception;
-   protected transient FutureCallback<V> callback;
+   //  Default Configurations 

+
+   /**
+* The default maximum number of concurrent requests. By default, 
{@code Integer.MAX_VALUE}.
+*/
+   public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = 
Integer.MAX_VALUE;
+
+   /**
+* The default timeout duration when acquiring a permit to execute. By 
default, {@code Long.MAX_VALUE}.
+*/
+   public static final long DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = 
Long.MAX_VALUE;
+
+   /**
+* The default timeout unit when acquiring a permit to execute. By 
default, milliseconds.
+*/
+   public static final TimeUnit 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT = TimeUnit.MILLISECONDS;
+
+   // - Configuration Fields 
-
+
+   private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS;
+   private long maxConcurrentRequestsTimeout = 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT;
+   private TimeUnit maxConcurrentRequestsTimeoutUnit = 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT;
 
 Review comment:
   I'm seeing quite a few existing connectors relying on `Map`
   
   - 
https://github.com/apache/flink/blob/release-1.6.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java#L204
   - 
https://github.com/apache/flink/blob/release-1.6.2/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java#L87
   
   Do you recommend following the same scheme? I can then use `ParameterTool` 
to enforce parameter types.
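For illustration, a small sketch of how ParameterTool can enforce parameter types over a Map-based configuration, as mentioned above; the option keys shown here are hypothetical, not the connector's actual option names.

{code:java}
// Illustrative only; the keys are made up for this example.
import org.apache.flink.api.java.utils.ParameterTool;

import java.util.HashMap;
import java.util.Map;

class ParameterToolExample {
    public static void main(String[] args) {
        Map<String, String> userConfig = new HashMap<>();
        userConfig.put("max-concurrent-requests", "500");
        userConfig.put("max-concurrent-requests-timeout-ms", "60000");

        // ParameterTool parses and type-checks the string values.
        ParameterTool params = ParameterTool.fromMap(userConfig);
        int maxConcurrentRequests = params.getInt("max-concurrent-requests", Integer.MAX_VALUE);
        long timeoutMs = params.getLong("max-concurrent-requests-timeout-ms", Long.MAX_VALUE);

        System.out.println(maxConcurrentRequests + " requests, " + timeoutMs + " ms timeout");
    }
}
{code}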


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add async backpressure support to Cassandra Connector
> -
>
> Key: FLINK-9083
> URL: https://issues.apache.org/jira/browse/FLINK-9083
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Reporter: Jacob Park
>Assignee: Jacob Park
>Priority: Minor
>  Labels: pull-request-available
>
> Because the CassandraSinkBase derivatives use async writes, they do not block 
> the task and therefore introduce no backpressure.
> I am currently using a semaphore to provide backpressure by blocking at a 
> maximum concurrent requests limit, similar to how DataStax's Spark Cassandra 
> Connector works: 
> [https://github.com/datastax/spark-cassandra-connector/blob/v2.0.7/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/AsyncExecutor.scala#L18]
> This change has greatly improved the fault tolerance of our Cassandra 
> Sink Connector implementation on Apache Flink in production. I would like to 
> contribute this feature back upstream.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10231) Add a view SQL DDL

2018-10-29 Thread winifredtang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

winifredtang reassigned FLINK-10231:


Assignee: winifredtang

> Add a view SQL DDL
> --
>
> Key: FLINK-10231
> URL: https://issues.apache.org/jira/browse/FLINK-10231
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: winifredtang
>Priority: Major
>
> FLINK-10163 added initial view support for the SQL Client. However, for 
> supporting the [full definition of 
> views|https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-Create/Drop/AlterView]
>  (with schema, comments, etc.) we need native support for views in 
> the Table API.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] jparkie commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector

2018-10-29 Thread GitBox
jparkie commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r229169780
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 ##
 @@ -43,70 +48,82 @@
  */
 public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> 
implements CheckpointedFunction {
protected final Logger log = LoggerFactory.getLogger(getClass());
-   protected transient Cluster cluster;
-   protected transient Session session;
 
-   protected transient volatile Throwable exception;
-   protected transient FutureCallback<V> callback;
+   //  Default Configurations 

+
+   /**
+* The default maximum number of concurrent requests. By default, 
{@code Integer.MAX_VALUE}.
+*/
+   public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = 
Integer.MAX_VALUE;
+
+   /**
+* The default timeout duration when acquiring a permit to execute. By 
default, {@code Long.MAX_VALUE}.
+*/
+   public static final long DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = 
Long.MAX_VALUE;
+
+   /**
+* The default timeout unit when acquiring a permit to execute. By 
default, milliseconds.
+*/
+   public static final TimeUnit 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT = TimeUnit.MILLISECONDS;
+
+   // - Configuration Fields 
-
+
+   private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS;
+   private long maxConcurrentRequestsTimeout = 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT;
+   private TimeUnit maxConcurrentRequestsTimeoutUnit = 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT;
 
 Review comment:
   I'm seeing quite a few existing connectors relying on `Map`
   
   - 
https://github.com/apache/flink/blob/release-1.6.2/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java#L204
   - 
https://github.com/apache/flink/blob/release-1.6.2/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/AvroKeyValueSinkWriter.java#L87
   
   Do you recommend following the same scheme? I can then use `ParameterTool` 
to enforce parameter types.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668081#comment-16668081
 ] 

ASF GitHub Bot commented on FLINK-9083:
---

jparkie commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r229168212
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 ##
 @@ -43,70 +48,82 @@
  */
 public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> 
implements CheckpointedFunction {
protected final Logger log = LoggerFactory.getLogger(getClass());
-   protected transient Cluster cluster;
-   protected transient Session session;
 
-   protected transient volatile Throwable exception;
-   protected transient FutureCallback<V> callback;
+   //  Default Configurations 

+
+   /**
+* The default maximum number of concurrent requests. By default, 
{@code Integer.MAX_VALUE}.
+*/
+   public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = 
Integer.MAX_VALUE;
+
+   /**
+* The default timeout duration when acquiring a permit to execute. By 
default, {@code Long.MAX_VALUE}.
+*/
+   public static final long DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = 
Long.MAX_VALUE;
+
+   /**
+* The default timeout unit when acquiring a permit to execute. By 
default, milliseconds.
+*/
+   public static final TimeUnit 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT = TimeUnit.MILLISECONDS;
+
+   // - Configuration Fields 
-
+
+   private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS;
+   private long maxConcurrentRequestsTimeout = 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT;
+   private TimeUnit maxConcurrentRequestsTimeoutUnit = 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT;
+
+   // --- Cassandra Fields 
---
 
private final ClusterBuilder builder;
 
-   private final AtomicInteger updatesPending = new AtomicInteger();
+   protected transient Cluster cluster;
+   protected transient Session session;
+
+   //  Synchronization Fields 

+
+   private AtomicReference<Throwable> throwable;
+   private Semaphore semaphore;
+   private Phaser phaser;
 
CassandraSinkBase(ClusterBuilder builder) {
this.builder = builder;
ClosureCleaner.clean(builder, true);
}
 
-   @Override
-   public void open(Configuration configuration) {
-   this.callback = new FutureCallback<V>() {
-   @Override
-   public void onSuccess(V ignored) {
-   int pending = updatesPending.decrementAndGet();
-   if (pending == 0) {
-   synchronized (updatesPending) {
-   updatesPending.notifyAll();
-   }
-   }
-   }
+   // - Sink Methods 
-
 
+   @Override
+   public void open(Configuration parameters) {
+   cluster = createCluster();
+   session = createSession();
+
+   throwable = new AtomicReference<>();
+   semaphore = new Semaphore(maxConcurrentRequests);
+   /*
+* A Phaser is a flexible and reusable synchronization barrier 
similar to CyclicBarrier and CountDownLatch.
+*
+* This Phaser is configured to support "1 + N" parties.
+*   - "1" for the CassandraSinkBase to arrive and to await at 
the Phaser during a flush() call.
+*   - "N" for the varying number of invoke() calls that 
register and de-register with the Phaser.
+*
+* The Phaser awaits the completion of the advancement of a 
phase prior to returning from a register() call.
+* This behavior ensures that no backlogged invoke() calls 
register to execute while the Semaphore's permits
+* are being released during a flush() call.
+*/
+   phaser = new Phaser(1) {
 
 Review comment:
   That's true; a concurrent set of `Future`s would be simpler. I've seen 
similar implementations in other OSS that rely on a concurrent `TrieMap` to 
track all the `pendingFutures` before a snapshot. At the snapshot, it iterates 
over the `pendingFutures`, calling `get()` on each to block until it either 
returns or throws an exception. 
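For illustration, a sketch of the pending-futures alternative discussed above, assuming a set backed by a ConcurrentHashMap rather than a TrieMap; this is not the PR's implementation.

{code:java}
// Illustrative sketch of tracking pending futures and blocking on them at snapshot time.
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;

class PendingFuturesTracker {

    // Set backed by a ConcurrentHashMap, as mentioned in the comment above.
    private final Set<ListenableFuture<?>> pendingFutures = ConcurrentHashMap.newKeySet();

    void track(final ListenableFuture<?> future) {
        pendingFutures.add(future);
        Futures.addCallback(future, new FutureCallback<Object>() {
            @Override
            public void onSuccess(Object ignored) {
                pendingFutures.remove(future);
            }

            @Override
            public void onFailure(Throwable t) {
                pendingFutures.remove(future); // error handling elided
            }
        }, MoreExecutors.directExecutor());
    }

    /** At snapshot time: block on every pending future; get() either returns or throws. */
    void flush() throws InterruptedException, ExecutionException {
        for (ListenableFuture<?> future : pendingFutures) {
            future.get();
        }
    }
}
{code}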

[GitHub] jparkie commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector

2018-10-29 Thread GitBox
jparkie commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r229168212
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 ##
 @@ -43,70 +48,82 @@
  */
 public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> 
implements CheckpointedFunction {
protected final Logger log = LoggerFactory.getLogger(getClass());
-   protected transient Cluster cluster;
-   protected transient Session session;
 
-   protected transient volatile Throwable exception;
-   protected transient FutureCallback<V> callback;
+   //  Default Configurations 

+
+   /**
+* The default maximum number of concurrent requests. By default, 
{@code Integer.MAX_VALUE}.
+*/
+   public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = 
Integer.MAX_VALUE;
+
+   /**
+* The default timeout duration when acquiring a permit to execute. By 
default, {@code Long.MAX_VALUE}.
+*/
+   public static final long DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = 
Long.MAX_VALUE;
+
+   /**
+* The default timeout unit when acquiring a permit to execute. By 
default, milliseconds.
+*/
+   public static final TimeUnit 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT = TimeUnit.MILLISECONDS;
+
+   // - Configuration Fields 
-
+
+   private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS;
+   private long maxConcurrentRequestsTimeout = 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT;
+   private TimeUnit maxConcurrentRequestsTimeoutUnit = 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT;
+
+   // --- Cassandra Fields 
---
 
private final ClusterBuilder builder;
 
-   private final AtomicInteger updatesPending = new AtomicInteger();
+   protected transient Cluster cluster;
+   protected transient Session session;
+
+   //  Synchronization Fields 

+
+   private AtomicReference<Throwable> throwable;
+   private Semaphore semaphore;
+   private Phaser phaser;
 
CassandraSinkBase(ClusterBuilder builder) {
this.builder = builder;
ClosureCleaner.clean(builder, true);
}
 
-   @Override
-   public void open(Configuration configuration) {
-   this.callback = new FutureCallback<V>() {
-   @Override
-   public void onSuccess(V ignored) {
-   int pending = updatesPending.decrementAndGet();
-   if (pending == 0) {
-   synchronized (updatesPending) {
-   updatesPending.notifyAll();
-   }
-   }
-   }
+   // - Sink Methods 
-
 
+   @Override
+   public void open(Configuration parameters) {
+   cluster = createCluster();
+   session = createSession();
+
+   throwable = new AtomicReference<>();
+   semaphore = new Semaphore(maxConcurrentRequests);
+   /*
+* A Phaser is a flexible and reusable synchronization barrier 
similar to CyclicBarrier and CountDownLatch.
+*
+* This Phaser is configured to support "1 + N" parties.
+*   - "1" for the CassandraSinkBase to arrive and to await at 
the Phaser during a flush() call.
+*   - "N" for the varying number of invoke() calls that 
register and de-register with the Phaser.
+*
+* The Phaser awaits the completion of the advancement of a 
phase prior to returning from a register() call.
+* This behavior ensures that no backlogged invoke() calls 
register to execute while the Semaphore's permits
+* are being released during a flush() call.
+*/
+   phaser = new Phaser(1) {
 
 Review comment:
   That's true; a concurrent set of `Future`s would be simpler. I've seen 
similar implementations in other OSS that rely on a concurrent `TrieMap` to 
track all the `pendingFutures` before a snapshot. At the snapshot, it iterates 
over the `pendingFutures`, calling `get()` on each to block until it either 
returns or throws an exception. However, with Spark the `Future`s are created 
in large batches, so this approach may have caveats given the streaming nature 
of Flink.
   
   In Java, if we use a set backed by a `ConcurrentHashMap`, since we rely on a 
semaphore to 

[jira] [Assigned] (FLINK-9142) Lower the minimum number of buffers for incoming channels to 1

2018-10-29 Thread blues zheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

blues zheng reassigned FLINK-9142:
--

Assignee: blues zheng

> Lower the minimum number of buffers for incoming channels to 1
> --
>
> Key: FLINK-9142
> URL: https://issues.apache.org/jira/browse/FLINK-9142
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Nico Kruber
>Assignee: blues zheng
>Priority: Major
> Fix For: 1.6.3, 1.7.0
>
>
> Even if we make the floating buffers optional, we still require 
> {{taskmanager.network.memory.buffers-per-channel}} (exclusive) buffers per 
> incoming channel with credit-based flow control, whereas without it the 
> minimum was 1 and only the maximum number of buffers was influenced by this 
> parameter.
> {{taskmanager.network.memory.buffers-per-channel}} is set to {{2}} by default 
> with the rationale that this way one buffer is available for netty to process 
> while a worker thread is processing/deserializing the other buffer. While 
> this seems reasonable, it does increase our minimum 
> requirements. Instead, we could probably live with {{1}} exclusive buffer and 
> up to {{gate.getNumberOfInputChannels() * (networkBuffersPerChannel - 1) + 
> extraNetworkBuffersPerGate}} floating buffers. That way we will have the same 
> memory footprint as before with only slightly changed behaviour.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668058#comment-16668058
 ] 

ASF GitHub Bot commented on FLINK-9083:
---

jparkie commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r229163101
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 ##
 @@ -43,70 +48,82 @@
  */
 public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> 
implements CheckpointedFunction {
protected final Logger log = LoggerFactory.getLogger(getClass());
-   protected transient Cluster cluster;
-   protected transient Session session;
 
-   protected transient volatile Throwable exception;
-   protected transient FutureCallback<V> callback;
+   //  Default Configurations 

+
+   /**
+* The default maximum number of concurrent requests. By default, 
{@code Integer.MAX_VALUE}.
+*/
+   public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = 
Integer.MAX_VALUE;
+
+   /**
+* The default timeout duration when acquiring a permit to execute. By 
default, {@code Long.MAX_VALUE}.
+*/
+   public static final long DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = 
Long.MAX_VALUE;
+
+   /**
+* The default timeout unit when acquiring a permit to execute. By 
default, milliseconds.
+*/
+   public static final TimeUnit 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT = TimeUnit.MILLISECONDS;
+
+   // - Configuration Fields 
-
+
+   private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS;
+   private long maxConcurrentRequestsTimeout = 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT;
+   private TimeUnit maxConcurrentRequestsTimeoutUnit = 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT;
 
 Review comment:
   Yeah. I thought the constructor would explode too much. Furthermore, I was 
hesitant to introduce a new constructor that conflicted with the existing ones 
in case anyone relied on the package-visible one.
   
   However, I like your idea of a config class. I can make one constructor 
accept it and mark all the old constructors with a `@Deprecated` annotation so 
that they just delegate to the new one.
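For illustration, a sketch of that idea under hypothetical names (not the connector's actual API): one constructor takes a config object, and the old constructors stay but are deprecated and delegate to it.

{code:java}
// Illustrative only; names are hypothetical.
class ExampleSinkConfig {
    final int maxConcurrentRequests;

    ExampleSinkConfig(int maxConcurrentRequests) {
        this.maxConcurrentRequests = maxConcurrentRequests;
    }
}

class ExampleSink {
    private final ExampleSinkConfig config;

    /** New constructor: all tuning knobs live in the config object. */
    ExampleSink(ExampleSinkConfig config) {
        this.config = config;
    }

    /** Old constructor kept for compatibility; it just delegates to the new one. */
    @Deprecated
    ExampleSink() {
        this(new ExampleSinkConfig(Integer.MAX_VALUE));
    }
}
{code}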


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add async backpressure support to Cassandra Connector
> -
>
> Key: FLINK-9083
> URL: https://issues.apache.org/jira/browse/FLINK-9083
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Reporter: Jacob Park
>Assignee: Jacob Park
>Priority: Minor
>  Labels: pull-request-available
>
> Because the CassandraSinkBase derivatives use async writes, they do not block 
> the task and therefore introduce no backpressure.
> I am currently using a semaphore to provide backpressure by blocking at a 
> maximum concurrent requests limit, similar to how DataStax's Spark Cassandra 
> Connector works: 
> [https://github.com/datastax/spark-cassandra-connector/blob/v2.0.7/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/AsyncExecutor.scala#L18]
> This change has greatly improved the fault tolerance of our Cassandra 
> Sink Connector implementation on Apache Flink in production. I would like to 
> contribute this feature back upstream.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] jparkie commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector

2018-10-29 Thread GitBox
jparkie commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r229163101
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 ##
 @@ -43,70 +48,82 @@
  */
 public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> 
implements CheckpointedFunction {
protected final Logger log = LoggerFactory.getLogger(getClass());
-   protected transient Cluster cluster;
-   protected transient Session session;
 
-   protected transient volatile Throwable exception;
-   protected transient FutureCallback<V> callback;
+   //  Default Configurations 

+
+   /**
+* The default maximum number of concurrent requests. By default, 
{@code Integer.MAX_VALUE}.
+*/
+   public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = 
Integer.MAX_VALUE;
+
+   /**
+* The default timeout duration when acquiring a permit to execute. By 
default, {@code Long.MAX_VALUE}.
+*/
+   public static final long DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = 
Long.MAX_VALUE;
+
+   /**
+* The default timeout unit when acquiring a permit to execute. By 
default, milliseconds.
+*/
+   public static final TimeUnit 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT = TimeUnit.MILLISECONDS;
+
+   // - Configuration Fields 
-
+
+   private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS;
+   private long maxConcurrentRequestsTimeout = 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT;
+   private TimeUnit maxConcurrentRequestsTimeoutUnit = 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT;
 
 Review comment:
   Yeah. I thought the constructor would explode too much. Furthermore, I was 
hesitant to introduce a new constructor that conflicted with the existing ones 
in case anyone relied on the package-visible one.
   
   However, I like your idea of a config class. I can make one constructor 
accept it and mark all the old constructors with a `@Deprecated` annotation so 
that they just delegate to the new one.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9083) Add async backpressure support to Cassandra Connector

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668054#comment-16668054
 ] 

ASF GitHub Bot commented on FLINK-9083:
---

jparkie commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r229162559
 
 

 ##
 File path: docs/dev/connectors/cassandra.md
 ##
 @@ -72,10 +72,13 @@ The following configuration methods can be used:
 4. _setMapperOptions(MapperOptions options)_
 * Sets the mapper options that are used to configure the DataStax 
ObjectMapper.
 * Only applies when processing __POJO__ data types.
-5. _enableWriteAheadLog([CheckpointCommitter committer])_
+5. _setMaxConcurrentRequests(int maxConcurrentRequests, long timeout, TimeUnit 
unit)_
+* Sets the maximum allowed number of concurrent requests with a timeout 
for acquiring permits to execute.
+* Only applies when __enableWriteAheadLog()__ is not configured.
 
 Review comment:
   Yeah. It was just implementation effort. I was thinking of a follow-up 
ticket to implement and verify it, since the WAL version is always the 
tricky one.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add async backpressure support to Cassandra Connector
> -
>
> Key: FLINK-9083
> URL: https://issues.apache.org/jira/browse/FLINK-9083
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Reporter: Jacob Park
>Assignee: Jacob Park
>Priority: Minor
>  Labels: pull-request-available
>
> Because the CassandraSinkBase derivatives use async writes, they do not block 
> the task and therefore introduce no backpressure.
> I am currently using a semaphore to provide backpressure by blocking at a 
> maximum concurrent requests limit, similar to how DataStax's Spark Cassandra 
> Connector works: 
> [https://github.com/datastax/spark-cassandra-connector/blob/v2.0.7/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/AsyncExecutor.scala#L18]
> This change has greatly improved the fault tolerance of our Cassandra 
> Sink Connector implementation on Apache Flink in production. I would like to 
> contribute this feature back upstream.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] jparkie commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector

2018-10-29 Thread GitBox
jparkie commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r229162559
 
 

 ##
 File path: docs/dev/connectors/cassandra.md
 ##
 @@ -72,10 +72,13 @@ The following configuration methods can be used:
 4. _setMapperOptions(MapperOptions options)_
 * Sets the mapper options that are used to configure the DataStax 
ObjectMapper.
 * Only applies when processing __POJO__ data types.
-5. _enableWriteAheadLog([CheckpointCommitter committer])_
+5. _setMaxConcurrentRequests(int maxConcurrentRequests, long timeout, TimeUnit 
unit)_
+* Sets the maximum allowed number of concurrent requests with a timeout 
for acquiring permits to execute.
+* Only applies when __enableWriteAheadLog()__ is not configured.
 
 Review comment:
   Yeah. It was just implementation effort. I was thinking of a follow up 
ticket to implement it and verify it, since the WAL version is always the 
tricky one.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10600) Provide End-to-end test cases for modern Kafka connectors

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668028#comment-16668028
 ] 

ASF GitHub Bot commented on FLINK-10600:


yanghua commented on a change in pull request #6924: [FLINK-10600] Provide 
End-to-end test cases for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924#discussion_r229159258
 
 

 ##
 File path: flink-examples/flink-examples-streaming/pom.xml
 ##
 @@ -538,6 +550,11 @@ under the License.
shade


+   
+   
+   
org.apache.flink:flink-connector-kafka_${scala.binary.version}
 
 Review comment:
   yes.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Provide End-to-end test cases for modern Kafka connectors
> -
>
> Key: FLINK-10600
> URL: https://issues.apache.org/jira/browse/FLINK-10600
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on a change in pull request #6924: [FLINK-10600] Provide End-to-end test cases for modern Kafka connectors

2018-10-29 Thread GitBox
yanghua commented on a change in pull request #6924: [FLINK-10600] Provide 
End-to-end test cases for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924#discussion_r229159258
 
 

 ##
 File path: flink-examples/flink-examples-streaming/pom.xml
 ##
 @@ -538,6 +550,11 @@ under the License.
shade


+   
+   
+   
org.apache.flink:flink-connector-kafka_${scala.binary.version}
 
 Review comment:
   yes.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10704) Fix sql client end to end test failure

2018-10-29 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668025#comment-16668025
 ] 

vinoyang commented on FLINK-10704:
--

[~pnowojski] [~twalthr] Yes, I was just curious about why we ruled it out. In 
fact, Timo's idea is right: it is not good to check for "error" in the shell 
while ignoring case. I will modify it slightly so that it checks for "ERROR" 
(the log level) and does not ignore case. Later, whenever we find that a 
specific log keyword indicates a test error, we can add it to this list.

> Fix sql client end to end test failure
> --
>
> Key: FLINK-10704
> URL: https://issues.apache.org/jira/browse/FLINK-10704
> Project: Flink
>  Issue Type: Bug
>  Components: E2E Tests, Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> The log file contains the following line:
> {code:java}
> 2018-10-29 03:27:39,209 WARN 
> org.apache.flink.kafka010.shaded.org.apache.kafka.common.utils.AppInfoParser 
> - Error while loading kafka-version.properties :null
> {code}
> The reason for this log is that we explicitly exclude the version description 
> file of the kafka client when packaging the connector:
> {code:java}
> <filters>
>    <filter>
>       <artifact>*:*</artifact>
>       <excludes>
>          <exclude>kafka/kafka-version.properties</exclude>
>       </excludes>
>    </filter>
> </filters>
> {code}
> When the shell script greps the logs for the "error" keyword, this line 
> matches, so the test fails.
> {code:java}
> function check_logs_for_errors {
>   error_count=$(grep -rv "GroupCoordinatorNotAvailableException" 
> $FLINK_DIR/log \
>   | grep -v "RetriableCommitFailedException" \
>   | grep -v "NoAvailableBrokersException" \
>   | grep -v "Async Kafka commit failed" \
>   | grep -v "DisconnectException" \
>   | grep -v "AskTimeoutException" \
>   | grep -v "WARN  akka.remote.transport.netty.NettyTransport" \
>   | grep -v  "WARN  
> org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline" \
>   | grep -v "jvm-exit-on-fatal-error" \
>   | grep -v '^INFO:.*AWSErrorCode=\[400 Bad 
> Request\].*ServiceEndpoint=\[https://.*\.s3\.amazonaws\.com\].*RequestType=\[HeadBucketRequest\]'
>  \
>   | grep -v "RejectedExecutionException" \
>   | grep -v "An exception was thrown by an exception handler" \
>   | grep -v "java.lang.NoClassDefFoundError: 
> org/apache/hadoop/yarn/exceptions/YarnException" \
>   | grep -v "java.lang.NoClassDefFoundError: 
> org/apache/hadoop/conf/Configuration" \
>   | grep -v 
> "org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.FluentPropertyBeanIntrospector
>   - Error when creating PropertyDescriptor for public final void 
> org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.AbstractConfiguration.setProperty(java.lang.String,java.lang.Object)!
>  Ignoring this property." \
>   | grep -ic "error")//here
>   if [[ ${error_count} -gt 0 ]]; then
> echo "Found error in log files:"
> cat $FLINK_DIR/log/*
> EXIT_CODE=1
>   fi
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10698) Create CatalogManager class manages all external catalogs and temporary meta objects

2018-10-29 Thread winifredtang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

winifredtang reassigned FLINK-10698:


Assignee: winifredtang

> Create CatalogManager class manages all external catalogs and temporary meta 
> objects
> 
>
> Key: FLINK-10698
> URL: https://issues.apache.org/jira/browse/FLINK-10698
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.6.1
>Reporter: Xuefu Zhang
>Assignee: winifredtang
>Priority: Major
>
> Currently {{TableEnvironment}} manages a list of registered external catalogs 
> as well as in-memory meta objects, and interacts with the Calcite schema. It 
> would be cleaner to delegate all those responsibilities to a dedicated class, 
> especially when Flink's meta objects are also stored in a catalog.
> {{CatalogManager}} is responsible for managing all meta objects, including 
> external catalogs, temporary meta objects, and the Calcite schema.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9697) Provide connector for modern Kafka

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668016#comment-16668016
 ] 

ASF GitHub Bot commented on FLINK-9697:
---

yanghua edited a comment on issue #6703: [FLINK-9697] Provide connector for 
modern Kafka
URL: https://github.com/apache/flink/pull/6703#issuecomment-434147725
 
 
   @pnowojski Not only the modern kafka connector, but also the other connectors. 
Each versioned flink-connector relies on the base module, while the base module 
explicitly relies on kafka. So, here we need to exclude kafka from the base 
module and explicitly introduce the higher-version kafka dependency.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Provide connector for modern Kafka
> --
>
> Key: FLINK-9697
> URL: https://issues.apache.org/jira/browse/FLINK-9697
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Kafka 2.0.0 would be released soon.
> Here is vote thread:
> [http://search-hadoop.com/m/Kafka/uyzND1vxnEd23QLxb?subj=+VOTE+2+0+0+RC1]
> We should provide connector for Kafka 2.0.0 once it is released.
> Upgrade to 2.0 documentation : 
> http://kafka.apache.org/20/documentation.html#upgrade_2_0_0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9697) Provide connector for modern Kafka

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668015#comment-16668015
 ] 

ASF GitHub Bot commented on FLINK-9697:
---

yanghua commented on issue #6703: [FLINK-9697] Provide connector for modern 
Kafka
URL: https://github.com/apache/flink/pull/6703#issuecomment-434147725
 
 
   @pnowojski Not only the modern kafka connector, but also the other connectors. 
Each versioned flink-connector relies on the base module, while the base module 
declares a dependency on kafka. So, here we need to exclude kafka from the 
base module and explicitly introduce the higher-version kafka dependency.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Provide connector for modern Kafka
> --
>
> Key: FLINK-9697
> URL: https://issues.apache.org/jira/browse/FLINK-9697
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Kafka 2.0.0 would be released soon.
> Here is vote thread:
> [http://search-hadoop.com/m/Kafka/uyzND1vxnEd23QLxb?subj=+VOTE+2+0+0+RC1]
> We should provide connector for Kafka 2.0.0 once it is released.
> Upgrade to 2.0 documentation : 
> http://kafka.apache.org/20/documentation.html#upgrade_2_0_0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua edited a comment on issue #6703: [FLINK-9697] Provide connector for modern Kafka

2018-10-29 Thread GitBox
yanghua edited a comment on issue #6703: [FLINK-9697] Provide connector for 
modern Kafka
URL: https://github.com/apache/flink/pull/6703#issuecomment-434147725
 
 
   @pnowojski Not only the modern kafka connector, but also the other connectors. 
Each versioned flink-connector relies on the base module, while the base module 
explicitly relies on kafka. So, here we need to exclude kafka from the base 
module and explicitly introduce the higher-version kafka dependency.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on issue #6703: [FLINK-9697] Provide connector for modern Kafka

2018-10-29 Thread GitBox
yanghua commented on issue #6703: [FLINK-9697] Provide connector for modern 
Kafka
URL: https://github.com/apache/flink/pull/6703#issuecomment-434147725
 
 
   @pnowojski Not only the modern kafka connector, but also the other connectors. 
Each versioned flink-connector relies on the base module, while the base module 
declares a dependency on kafka. So, here we need to exclude kafka from the 
base module and explicitly introduce the higher-version kafka dependency.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10714) java.lang.IndexOutOfBoundsException when creating a heap backend snapshot

2018-10-29 Thread Xiaogang Shi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668009#comment-16668009
 ] 

Xiaogang Shi commented on FLINK-10714:
--

[~cmick] I came across a similar problem before. It seems that Kryo cannot 
properly serialize some collection types. I finally got rid of this problem by 
registering another serializer (e.g. JavaSerializer) for the problematic 
collection types in the ExecutionConfig.
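For illustration, a minimal sketch of that workaround; the collection type shown is only an example, and which serializer and type to register depends on the job (Flink's bundled org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer is used here).

{code:java}
// Illustrative sketch of registering a fallback serializer for a type Kryo mishandles.
import org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KryoSerializerWorkaround {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use plain Java serialization for this collection type instead of Kryo's default handling.
        env.getConfig().addDefaultKryoSerializer(
                scala.collection.immutable.HashMap.class, JavaSerializer.class);
    }
}
{code}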

> java.lang.IndexOutOfBoundsException when creating a heap backend snapshot
> -
>
> Key: FLINK-10714
> URL: https://issues.apache.org/jira/browse/FLINK-10714
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Affects Versions: 1.5.5, 1.6.2
> Environment: Flink 1.6.2, FsStateBackend
>Reporter: Michał Ciesielczyk
>Priority: Blocker
> Fix For: 1.7.0
>
>
> I sometimes get an error while creating a checkpoint using a filesystem 
> state backend. This ONLY happens when asynchronous snapshots are enabled 
> using the FileSystem State Backend. When RocksDB is enabled, everything works 
> fine.
>  
> I'm using a simple KeyedStream.mapWithState function with a ValueState 
> holding a hashmap (scala.collection.immutable.Map). It's hard to reproduce 
> the error using a simple code snippet, as the error occurs randomly.
>  
> This issue may be similar to FLINK-7484 and FLINK-8836 (both already fixed), 
> but I'm still experiencing this behavior.
>   
> Stacktrace:
>  
> {code:java}
> java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
>     at java.util.ArrayList.rangeCheck(ArrayList.java:657) ~[?:1.8.0_172]
>     at java.util.ArrayList.set(ArrayList.java:448) ~[?:1.8.0_172]
>     at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:56)
>  ~[kryo-shaded-4.0.0.jar:?]
>     at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:875) 
> ~[kryo-shaded-4.0.0.jar:?]
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:710) 
> ~[kryo-shaded-4.0.0.jar:?]
>     at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
>  ~[flink-core-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>  ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>  ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>  ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>  ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) 
> ~[scala-library-2.11.12.jar:?]
>     at 
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428) 
> ~[scala-library-2.11.12.jar:?]
>     at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
>  ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
>  ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>  ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>  ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
>  ~[flink-runtime_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
>  ~[flink-runtime_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:73)
>  ~[flink-runtime_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.streaming.api.operators.StreamGroupedReduce.processElement(StreamGroupedReduce.java:59)
>  ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>  ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>  ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>  ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) 
> [flink-runtime_2.11-1.6.1.jar:1.6.1]
>     at 

[jira] [Assigned] (FLINK-10718) Use IO executor in RpcService for message serialization

2018-10-29 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-10718:


Assignee: vinoyang

> Use IO executor in RpcService for message serialization
> ---
>
> Key: FLINK-10718
> URL: https://issues.apache.org/jira/browse/FLINK-10718
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Major
> Fix For: 1.7.0
>
>
> The {{AkkaInvocationHandler}}, and once FLINK-10251 has been merged also the 
> {{AkkaRpcActor}}, serialize their remote RPC messages before sending them. 
> This happens in the calling thread, which can be the main thread of another 
> {{RpcEndpoint}}. Depending on the de-/serialization time, this can be 
> considered a blocking operation. Thus, I propose to introduce an IO executor 
> which is used for the serialization of the messages. This will make the 
> {{RpcService}} abstraction more scalable.
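For illustration, a generic sketch of the idea described above (offloading potentially blocking serialization work from the calling thread to a dedicated IO executor); this is not Flink's actual RpcService code.

{code:java}
// Illustrative only; plain Java serialization stands in for the RPC message serialization.
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

class AsyncMessageSerializer {

    private final Executor ioExecutor = Executors.newFixedThreadPool(4);

    /** Serializes the message on the IO executor instead of the caller's thread. */
    CompletableFuture<byte[]> serializeAsync(Serializable message) {
        return CompletableFuture.supplyAsync(() -> {
            try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
                    ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(message);
                oos.flush();
                return bos.toByteArray();
            } catch (IOException e) {
                throw new RuntimeException("Serialization failed", e);
            }
        }, ioExecutor);
    }
}
{code}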



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] kbohinski opened a new pull request #6962: change accumulator value to be formatted with digit group separator

2018-10-29 Thread GitBox
kbohinski opened a new pull request #6962: change accumulator value to be 
formatted with digit group separator
URL: https://github.com/apache/flink/pull/6962
 
 
   ## What is the purpose of the change
   
   *This pull request is a small quality-of-life change that makes it easier for 
users to read the values of their accumulator(s).*
   
   
   ## Brief change log
   
 - *Filter the value of the accumulator with toLocaleString*
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no) No
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no) No
 - The serializers: (yes / no / don't know) No
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know) No
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) No
 - The S3 file system connector: (yes / no / don't know) No
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no) No
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10687) Make flink-formats Scala-free

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667841#comment-16667841
 ] 

ASF GitHub Bot commented on FLINK-10687:


bowenli86 edited a comment on issue #6958: [FLINK-10687] [table] Make 
flink-formats Scala-free
URL: https://github.com/apache/flink/pull/6958#issuecomment-434103016
 
 
   I only had a detailed look at the classes in `flink-format/flink-avro` and 
`flink-table-common`. It's a big PR, but the changes are mostly renaming and 
migration without introducing new features. Looks very good to me!
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make flink-formats Scala-free
> -
>
> Key: FLINK-10687
> URL: https://issues.apache.org/jira/browse/FLINK-10687
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> {{flink-table}} is the only dependency that pulls in Scala for 
> {{flink-json}}, {{flink-avro}}. We should aim to make {{flink-formats}} 
> Scala-free using only a dependency to {{flink-table-common}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] bowenli86 edited a comment on issue #6958: [FLINK-10687] [table] Make flink-formats Scala-free

2018-10-29 Thread GitBox
bowenli86 edited a comment on issue #6958: [FLINK-10687] [table] Make 
flink-formats Scala-free
URL: https://github.com/apache/flink/pull/6958#issuecomment-434103016
 
 
   I only had a detailed look at the classes in `flink-format/flink-avro` and 
`flink-table-common`. It's a big PR, but the changes are mostly renaming and 
migration without introducing new features. Looks very good to me!
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10687) Make flink-formats Scala-free

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667839#comment-16667839
 ] 

ASF GitHub Bot commented on FLINK-10687:


bowenli86 commented on issue #6958: [FLINK-10687] [table] Make flink-formats 
Scala-free
URL: https://github.com/apache/flink/pull/6958#issuecomment-434103016
 
 
   I only had a detailed look at the classes in `flink-table-common`. It's a big 
PR, but the changes are mostly renaming and migration without introducing new 
features. Looks very good to me!
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make flink-formats Scala-free
> -
>
> Key: FLINK-10687
> URL: https://issues.apache.org/jira/browse/FLINK-10687
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> {{flink-table}} is the only dependency that pulls in Scala for 
> {{flink-json}}, {{flink-avro}}. We should aim to make {{flink-formats}} 
> Scala-free using only a dependency to {{flink-table-common}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] bowenli86 commented on issue #6958: [FLINK-10687] [table] Make flink-formats Scala-free

2018-10-29 Thread GitBox
bowenli86 commented on issue #6958: [FLINK-10687] [table] Make flink-formats 
Scala-free
URL: https://github.com/apache/flink/pull/6958#issuecomment-434103016
 
 
   I only had a detailed look at the classes in `flink-table-common`. It's a big 
PR, but the changes are mostly renaming and migration without introducing new 
features. Looks very good to me!
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10720) Add stress deployment end-to-end test

2018-10-29 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667809#comment-16667809
 ] 

Chesnay Schepler commented on FLINK-10720:
--

Could you check whether the {{flink-high-parallelism-iterations-test}} fulfills 
this ticket?

> Add stress deployment end-to-end test
> -
>
> Key: FLINK-10720
> URL: https://issues.apache.org/jira/browse/FLINK-10720
> Project: Flink
>  Issue Type: Sub-task
>  Components: E2E Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Priority: Major
> Fix For: 1.7.0
>
>
> In order to test Flink's scalability, I suggest adding an end-to-end test 
> that tests the deployment of a very demanding job. The job should 
> have large {{TaskDeploymentDescriptors}} (e.g. a job using union state or 
> having a high degree of parallelism). That way we can test that the 
> serialization overhead of the TDDs does not affect the health of the cluster 
> (e.g. heartbeats are not affected because the serialization does not happen 
> in the main thread).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10720) Add stress deployment end-to-end test

2018-10-29 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-10720:
-

 Summary: Add stress deployment end-to-end test
 Key: FLINK-10720
 URL: https://issues.apache.org/jira/browse/FLINK-10720
 Project: Flink
  Issue Type: Sub-task
  Components: E2E Tests
Affects Versions: 1.7.0
Reporter: Till Rohrmann
 Fix For: 1.7.0


In order to test Flink's scalability, I suggest adding an end-to-end test that 
tests the deployment of a very demanding job. The job should have 
large {{TaskDeploymentDescriptors}} (e.g. a job using union state or having a 
high degree of parallelism). That way we can test that the serialization 
overhead of the TDDs does not affect the health of the cluster (e.g. heartbeats 
are not affected because the serialization does not happen in the main thread).
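
As a rough sketch of such a demanding job (illustrative only, not the actual test code): combining a high parallelism with union-redistributed operator state makes every TaskDeploymentDescriptor carry the state entries of all parallel instances on restore.

{code:java}
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch of a deployment-heavy job: very high parallelism plus union operator state.
public class StressDeploymentJob {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(1000); // large parallelism -> many, and large, TDDs
		env.enableCheckpointing(10_000);

		env.fromElements(1L, 2L, 3L)
			.map(new UnionStateMapper())
			.print();

		env.execute("stress-deployment-test");
	}

	private static class UnionStateMapper extends RichMapFunction<Long, Long>
			implements CheckpointedFunction {

		private transient ListState<Long> unionState;

		@Override
		public Long map(Long value) {
			return value;
		}

		@Override
		public void snapshotState(FunctionSnapshotContext context) throws Exception {
			unionState.add(context.getCheckpointId());
		}

		@Override
		public void initializeState(FunctionInitializationContext context) throws Exception {
			// union state: on restore every subtask receives the entries of all subtasks
			unionState = context.getOperatorStateStore()
				.getUnionListState(new ListStateDescriptor<>("union-state", Long.class));
		}
	}
}
{code}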



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10686) Introduce a flink-table-common module

2018-10-29 Thread Bowen Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667642#comment-16667642
 ] 

Bowen Li commented on FLINK-10686:
--

Hi [~twalthr], I just saw on our mailing list that the 1.7 release is targeted for 
end of Oct/beginning of Nov. Do you think we can get this task into 1.7?

> Introduce a flink-table-common module
> -
>
> Key: FLINK-10686
> URL: https://issues.apache.org/jira/browse/FLINK-10686
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Affects Versions: 1.7.0
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
> Fix For: 1.7.0
>
>
> Because more and more table factories for connectors and formats are added 
> and external catalog support is also on the horizon, {{flink-table}} is becoming 
> a dependency of many Flink modules. Since {{flink-table}} is implemented in 
> Scala, it requires other modules to carry Scala version suffixes. However, 
> as we have learned in the past, Scala code is hard to maintain, which is why 
> our long-term goal is to avoid Scala dependencies.
> Therefore we propose a new module {{flink-table-common}} that contains the 
> interfaces between {{flink-table}} and other modules. This module is 
> implemented in Java and should contain minimal (or better, no) external 
> dependencies.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10719) Add user documentation for Flink-Hive integration

2018-10-29 Thread Bowen Li (JIRA)
Bowen Li created FLINK-10719:


 Summary: Add user documentation for Flink-Hive integration
 Key: FLINK-10719
 URL: https://issues.apache.org/jira/browse/FLINK-10719
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation, SQL Client, Table API  SQL
Reporter: Bowen Li






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10697) Create an in-memory catalog that stores Flink's meta objects

2018-10-29 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li reassigned FLINK-10697:


Assignee: Bowen Li

> Create an in-memory catalog that stores Flink's meta objects
> 
>
> Key: FLINK-10697
> URL: https://issues.apache.org/jira/browse/FLINK-10697
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.6.1
>Reporter: Xuefu Zhang
>Assignee: Bowen Li
>Priority: Major
>
> Currently all Flink meta objects (currently tables only) are stored in memory 
> as part of the Calcite catalog. Some of those objects are temporary (such as 
> inline tables), while others are meant to live beyond the user session. As we 
> introduce a catalog for those objects (tables, views, and UDFs), it makes sense 
> to organize them neatly. Further, having a catalog implementation that stores 
> those objects in memory retains the current behavior, which can be 
> configured by the user.
> Please note that this implementation is different from the current 
> {{InMemoryExternalCatalog}}, which is used mainly for testing and doesn't 
> reflect what's actually needed for Flink meta objects.
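
Purely as an illustration of keeping meta objects in memory (hypothetical names, not the proposed Flink API), such a catalog can be little more than a concurrent map keyed by database and object name:

{code:java}
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: an in-memory store for catalog objects (tables, views, UDFs),
// keyed by "database.objectName"; not the actual Flink catalog interface.
public class InMemoryMetaObjectStore<T> {

	private final Map<String, T> objects = new ConcurrentHashMap<>();

	public void register(String database, String name, T metaObject) {
		objects.put(database + "." + name, metaObject);
	}

	public Optional<T> lookup(String database, String name) {
		return Optional.ofNullable(objects.get(database + "." + name));
	}

	public void drop(String database, String name) {
		objects.remove(database + "." + name);
	}
}
{code}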



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10696) Add APIs to ExternalCatalog for views and UDFs

2018-10-29 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li reassigned FLINK-10696:


Assignee: Bowen Li

> Add APIs to ExternalCatalog for views and UDFs
> --
>
> Key: FLINK-10696
> URL: https://issues.apache.org/jira/browse/FLINK-10696
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.6.1
>Reporter: Xuefu Zhang
>Assignee: Bowen Li
>Priority: Major
>
> Currently there are APIs for tables only. However, views and UDFs are also 
> common objects in a catalog.
> This is required when we store Flink tables/views/UDFs in an external 
> persistent storage.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10686) Introduce a flink-table-common module

2018-10-29 Thread Bowen Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667608#comment-16667608
 ] 

Bowen Li commented on FLINK-10686:
--

I'd vote for this, and hope it can be done ASAP so all the coming Flink-Hive 
integration work can depend on Java only. 

> Introduce a flink-table-common module
> -
>
> Key: FLINK-10686
> URL: https://issues.apache.org/jira/browse/FLINK-10686
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Affects Versions: 1.7.0
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
> Fix For: 1.7.0
>
>
> Because more and more table factories for connectors and formats are added 
> and external catalog support is also on the horizon, {{flink-table}} is becoming 
> a dependency of many Flink modules. Since {{flink-table}} is implemented in 
> Scala, it requires other modules to carry Scala version suffixes. However, 
> as we have learned in the past, Scala code is hard to maintain, which is why 
> our long-term goal is to avoid Scala dependencies.
> Therefore we propose a new module {{flink-table-common}} that contains the 
> interfaces between {{flink-table}} and other modules. This module is 
> implemented in Java and should contain minimal (or better, no) external 
> dependencies.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10556) Integration with Apache Hive

2018-10-29 Thread Bowen Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667607#comment-16667607
 ] 

Bowen Li commented on FLINK-10556:
--

FLINK-10686 will create a Java-based flink-table-common module that the Flink-Hive 
integration may depend on. Once FLINK-10686 is done, the Flink-Hive 
integration work can be written completely in Java.

Background: the community has decided to move away from Scala, as we've learned 
that Scala is hard to maintain.

> Integration with Apache Hive
> 
>
> Key: FLINK-10556
> URL: https://issues.apache.org/jira/browse/FLINK-10556
> Project: Flink
>  Issue Type: New Feature
>  Components: Batch Connectors and Input/Output Formats, SQL Client, 
> Table API  SQL
>Affects Versions: 1.6.0
>Reporter: Xuefu Zhang
>Assignee: Xuefu Zhang
>Priority: Major
> Attachments: Proposal_ Integrate Flink with Hive Ecosystem.pdf
>
>
> This is an umbrella JIRA tracking all enhancements and issues related to 
> integrating Flink with the Hive ecosystem. This is an outcome of a discussion in 
> the community, and thanks go to everyone who provided feedback and interest.
> Specifically, we'd like to see the following features and capabilities 
> immediately in Flink:
> # Metadata interoperability
> # Data interoperability
> # Data type compatibility
> # Hive UDF support
> # DDL/DML/Query language compatibility
> For a longer term, we'd also like to add or improve:
> # Compatible SQL service, client tools, JDBC/ODBC drivers
> # Better task failure tolerance and task scheduling
> # Support other user customizations in Hive (storage handlers, serdes, etc).
> I will provide more details regarding the proposal in a doc shortly. A design 
> doc, if deemed necessary, will be provided in each related sub-task under 
> this JIRA.
> Feedback and contributions are greatly welcome!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-4582) Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams

2018-10-29 Thread Ying Xu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667569#comment-16667569
 ] 

Ying Xu commented on FLINK-4582:


Thanks [~tinder-dthomson] for the detailed comments. Yes, that's exactly why I 
felt _efficient multi-stream_ support is somewhat lacking :).

Actually, we are running Flink 1.5.2 internally. For contributing upstream, 
I'm currently adapting the patch to fit master (1.7-SNAPSHOT). The 
main difference is that the Flink 1.7 Kinesis connector uses the _ListShards API_ to 
retrieve the shard list. For DynamoDB Streams, we must use the _DescribeStream 
API_ to retrieve that information, since ListShards is not supported. I am 
currently porting the related _DescribeStream_ logic from Flink 1.5 to 
my patch. I should be able to post a meaningful PR in 1-2 days.
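
For reference, paging through DescribeStream with the AWS Java SDK looks roughly like the sketch below (hypothetical class name, not the connector's actual code):

{code:java}
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClientBuilder;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult;
import com.amazonaws.services.dynamodbv2.model.Shard;

import java.util.ArrayList;
import java.util.List;

// Sketch of paging through DescribeStream to collect the full shard list,
// since DynamoDB Streams does not offer the Kinesis ListShards call.
public class DynamoDbStreamsShardLister {

	public static List<Shard> listAllShards(AmazonDynamoDBStreams client, String streamArn) {
		List<Shard> shards = new ArrayList<>();
		String lastShardId = null;
		do {
			DescribeStreamResult result = client.describeStream(
				new DescribeStreamRequest()
					.withStreamArn(streamArn)
					.withExclusiveStartShardId(lastShardId));
			shards.addAll(result.getStreamDescription().getShards());
			lastShardId = result.getStreamDescription().getLastEvaluatedShardId();
		} while (lastShardId != null); // null means the last page was reached
		return shards;
	}

	public static void main(String[] args) {
		// pass the stream ARN as the first argument
		AmazonDynamoDBStreams client = AmazonDynamoDBStreamsClientBuilder.defaultClient();
		listAllShards(client, args[0]).forEach(s -> System.out.println(s.getShardId()));
	}
}
{code}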

> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
> 
>
> Key: FLINK-4582
> URL: https://issues.apache.org/jira/browse/FLINK-4582
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Ying Xu
>Priority: Major
>
> AWS DynamoDB is a NoSQL database service that has a CDC-like (change data 
> capture) feature called DynamoDB Streams 
> (http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html),
>  which is a stream feed of item-level table activities.
> The DynamoDB Streams shard abstraction follows that of Kinesis Streams with 
> only a slight difference in resharding behaviours, so it is possible to build 
> on the internals of our Flink Kinesis Consumer for an exactly-once DynamoDB 
> Streams source.
> I propose an API something like this:
> {code}
> DataStream dynamoItemsCdc = 
>   FlinkKinesisConsumer.asDynamoDBStream(tableNames, schema, config)
> {code}
> The feature adds more connectivity to popular AWS services for Flink, and 
> combining what Flink has for exactly-once semantics, out-of-core state 
> backends, and queryable state with CDC can have very strong use cases. For 
> this feature there should only be an extra dependency to the AWS Java SDK for 
> DynamoDB, which has Apache License 2.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) Add entropy to s3 path for better scalability

2018-10-29 Thread Jonathan Miles (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667474#comment-16667474
 ] 

Jonathan Miles commented on FLINK-9061:
---

Agreed it should be harder, but we were able to trigger throttling while 
testing 1.4.2 around mid-August, after that announcement was made. We had 
around 10 jobs checkpointing to the same bucket with a different prefix for each 
job, something like 400 Task Managers. I know there was some work done to 
reduce the number of S3 requests made, and combined with this "prefix entropy" 
change we haven't seen it happen again.

It might be useful to add [your 
link|https://aws.amazon.com/about-aws/whats-new/2018/07/amazon-s3-announces-increased-request-rate-performance/]
 to the documentation.

> Add entropy to s3 path for better scalability
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2, 1.5.0
>Reporter: Jamie Grier
>Assignee: Indrajit Roychoudhury
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.2, 1.7.0
>
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  
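
As a minimal illustration of the rewrite described above (assuming a short hash of the original path is injected as the leading key segment; the names are made up for the example):

{code:java}
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

// Illustrative only: prepend a short hash of the original checkpoint path so that
// writes are spread over many S3 key prefixes instead of one hot prefix.
public class EntropyPathExample {

	public static String withEntropy(String bucket, String originalPath) throws Exception {
		byte[] digest = MessageDigest.getInstance("MD5")
			.digest(originalPath.getBytes(StandardCharsets.UTF_8));
		StringBuilder hash = new StringBuilder();
		for (int i = 0; i < 4; i++) { // a few bytes are enough for key distribution
			hash.append(String.format("%02x", digest[i]));
		}
		return "s3://" + bucket + "/" + hash + "/" + originalPath;
	}

	public static void main(String[] args) throws Exception {
		// prints something like s3://bucket/<hash>/flink/checkpoints
		System.out.println(withEntropy("bucket", "flink/checkpoints"));
	}
}
{code}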



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) Add entropy to s3 path for better scalability

2018-10-29 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667437#comment-16667437
 ] 

Elias Levy commented on FLINK-9061:
---

Just a note to say that the changes AWS made to S3 in July mean that it is a lot 
more difficult to hit the S3 performance limits that would require this feature, as 
S3 can now do up to 3.5K concurrent writes. See 
[https://aws.amazon.com/about-aws/whats-new/2018/07/amazon-s3-announces-increased-request-rate-performance/].

> Add entropy to s3 path for better scalability
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2, 1.5.0
>Reporter: Jamie Grier
>Assignee: Indrajit Roychoudhury
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.2, 1.7.0
>
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10714) java.lang.IndexOutOfBoundsException when creating a heap backend snapshot

2018-10-29 Thread Stefan Richter (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-10714:
---
Fix Version/s: 1.7.0

> java.lang.IndexOutOfBoundsException when creating a heap backend snapshot
> -
>
> Key: FLINK-10714
> URL: https://issues.apache.org/jira/browse/FLINK-10714
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Affects Versions: 1.5.5, 1.6.2
> Environment: Flink 1.6.2, FsStateBackend
>Reporter: Michał Ciesielczyk
>Priority: Blocker
> Fix For: 1.7.0
>
>
> I'm sometimes getting an error while creating a checkpoint using a filesystem 
> state backend. This ONLY happens when asynchronous snapshots are enabled 
> using the FileSystem State Backend. When RocksDB is enabled everything works 
> fine.
>  
> I'm using a simple KeyedStream.mapWithState function with a ValueState 
> holding a hashmap (scala.collection.immutable.Map). It's hard to reproduce 
> the error using a simple code snippet, as the error occurs randomly.
>  
> This issue may be similar to FLINK-7484 and FLINK-8836 (both already fixed), 
> but I'm still experiencing such behavior.
>   
> Stacktrace:
>  
> {code:java}
> java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
>     at java.util.ArrayList.rangeCheck(ArrayList.java:657) ~[?:1.8.0_172]
>     at java.util.ArrayList.set(ArrayList.java:448) ~[?:1.8.0_172]
>     at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:56)
>  ~[kryo-shaded-4.0.0.jar:?]
>     at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:875) 
> ~[kryo-shaded-4.0.0.jar:?]
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:710) 
> ~[kryo-shaded-4.0.0.jar:?]
>     at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
>  ~[flink-core-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>  ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>  ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>  ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>  ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) 
> ~[scala-library-2.11.12.jar:?]
>     at 
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428) 
> ~[scala-library-2.11.12.jar:?]
>     at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
>  ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
>  ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>  ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>  ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
>  ~[flink-runtime_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
>  ~[flink-runtime_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:73)
>  ~[flink-runtime_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.streaming.api.operators.StreamGroupedReduce.processElement(StreamGroupedReduce.java:59)
>  ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>  ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>  ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>  ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) 
> [flink-runtime_2.11-1.6.1.jar:1.6.1]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_172]
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10714) java.lang.IndexOutOfBoundsException when creating a heap backend snapshot

2018-10-29 Thread Stefan Richter (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter updated FLINK-10714:
---
Priority: Blocker  (was: Major)

> java.lang.IndexOutOfBoundsException when creating a heap backend snapshot
> -
>
> Key: FLINK-10714
> URL: https://issues.apache.org/jira/browse/FLINK-10714
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Affects Versions: 1.5.5, 1.6.2
> Environment: Flink 1.6.2, FsStateBackend
>Reporter: Michał Ciesielczyk
>Priority: Blocker
> Fix For: 1.7.0
>
>
> I'm sometimes getting an error while creating a checkpoint using a filesystem 
> state backend. This ONLY happens when asynchronous snapshots are enabled 
> using the FileSystem State Backend. When RocksDB is enabled everything works 
> fine.
>  
> I'm using a simple KeyedStream.mapWithState function with a ValueState 
> holding a hashmap (scala.collection.immutable.Map). It's hard to reproduce 
> the error using a simple code snippet, as the error occurs randomly.
>  
> This issue may be similar to FLINK-7484 and FLINK-8836 (both already fixed), 
> but I'm still experiencing such behavior.
>   
> Stacktrace:
>  
> {code:java}
> java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
>     at java.util.ArrayList.rangeCheck(ArrayList.java:657) ~[?:1.8.0_172]
>     at java.util.ArrayList.set(ArrayList.java:448) ~[?:1.8.0_172]
>     at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:56)
>  ~[kryo-shaded-4.0.0.jar:?]
>     at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:875) 
> ~[kryo-shaded-4.0.0.jar:?]
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:710) 
> ~[kryo-shaded-4.0.0.jar:?]
>     at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
>  ~[flink-core-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>  ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>  ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>  ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>  ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) 
> ~[scala-library-2.11.12.jar:?]
>     at 
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428) 
> ~[scala-library-2.11.12.jar:?]
>     at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
>  ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
>  ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>  ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>  ~[flink-scala_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
>  ~[flink-runtime_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
>  ~[flink-runtime_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:73)
>  ~[flink-runtime_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.streaming.api.operators.StreamGroupedReduce.processElement(StreamGroupedReduce.java:59)
>  ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>  ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>  ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>  ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) 
> [flink-runtime_2.11-1.6.1.jar:1.6.1]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_172]
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10691) StreamSQL E2E test relies on hadoop

2018-10-29 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667370#comment-16667370
 ] 

Hequn Cheng commented on FLINK-10691:
-

Would it be ok if we replace {{BucketingSink}} with {{StreamingFileSink}}? 
It doesn't require hadoop, and there is already a BucketingSink E2E test, so we 
don't have to test the BucketingSink in the StreamSQL E2E test.
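
For reference, a minimal sketch of what the replacement could look like (the path and the toy source are made up; part files are finalized on checkpoints):

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// Minimal sketch of writing with StreamingFileSink instead of BucketingSink,
// so the e2e test no longer needs any Hadoop classes on the classpath.
public class StreamingFileSinkExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.enableCheckpointing(5_000); // part files are finalized on checkpoints

		StreamingFileSink<String> sink = StreamingFileSink
			.forRowFormat(new Path("file:///tmp/stream-sql-test-output"),
				new SimpleStringEncoder<String>("UTF-8"))
			.build();

		env.fromElements("a", "b", "c").addSink(sink);
		env.execute("streaming-file-sink-example");
	}
}
{code}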

> StreamSQL E2E test relies on hadoop
> ---
>
> Key: FLINK-10691
> URL: https://issues.apache.org/jira/browse/FLINK-10691
> Project: Flink
>  Issue Type: Improvement
>  Components: E2E Tests, Table API  SQL
>Affects Versions: 1.7.0
>Reporter: Chesnay Schepler
>Assignee: Hequn Cheng
>Priority: Major
> Fix For: 1.7.0
>
>
> The {{flink-stream-sql-test}} makes use of the {{BucketingSink}} and thus 
> requires hadoop to be available. This means we can't run this test in 
> hadoop-free environments, which is a bit unfortunate.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10687) Make flink-formats Scala-free

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667367#comment-16667367
 ] 

ASF GitHub Bot commented on FLINK-10687:


twalthr commented on issue #6958: [FLINK-10687] [table] Make flink-formats 
Scala-free
URL: https://github.com/apache/flink/pull/6958#issuecomment-433960589
 
 
   Thanks for the review @aljoscha. I added more tests and improvements to the 
type parser.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make flink-formats Scala-free
> -
>
> Key: FLINK-10687
> URL: https://issues.apache.org/jira/browse/FLINK-10687
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> {{flink-table}} is the only dependency that pulls in Scala for 
> {{flink-json}}, {{flink-avro}}. We should aim to make {{flink-formats}} 
> Scala-free using only a dependency to {{flink-table-common}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] twalthr commented on issue #6958: [FLINK-10687] [table] Make flink-formats Scala-free

2018-10-29 Thread GitBox
twalthr commented on issue #6958: [FLINK-10687] [table] Make flink-formats 
Scala-free
URL: https://github.com/apache/flink/pull/6958#issuecomment-433960589
 
 
   Thanks for the review @aljoscha. I added more tests and improvements to the 
type parser.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-10-29 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9635:
--
Labels: pull-request-available  (was: )

> Local recovery scheduling can cause spread out of tasks
> ---
>
> Key: FLINK-9635
> URL: https://issues.apache.org/jira/browse/FLINK-9635
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.2
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> In order to make local recovery work, Flink's scheduling was changed such 
> that a task tries to be rescheduled to its previous location. In order not to 
> occupy slots which have the state of other tasks cached, the strategy will 
> request a new slot if the old slot identified by the previous allocation id 
> is no longer present. This also applies to newly allocated slots because 
> there is no distinction between new and already used slots. This behaviour can 
> cause every task to be deployed to its own slot if the {{SlotPool}} has 
> released all slots in the meantime, for example. The consequence could be 
> that a job can no longer be executed after a failure because it needs more 
> slots than before.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667366#comment-16667366
 ] 

ASF GitHub Bot commented on FLINK-9635:
---

StefanRRichter opened a new pull request #6961: [FLINK-9635] Fix scheduling for 
local recovery
URL: https://github.com/apache/flink/pull/6961
 
 
   ## What is the purpose of the change
   
   This change fixes the task spread-out problem in scheduling with local 
recovery. The solution is based on creating a global set of all previous 
allocation ids as a blacklist to avoid; all other allocation ids are now free 
to take again.
   
   ## Brief change log
   
   This PR contains a subset of the changes from #6898 and focuses almost 
purely on the fix of local recovery.
   
   ## Verifying this change
   
   Improved `SchedulingITCase` should work now, including previously ignored 
tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
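
   To make the blacklist approach described above concrete, here is a hedged sketch of the selection rule (generic Java with hypothetical names, not the code in this PR):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;

// Hypothetical sketch: prefer the task's previous slot; otherwise reuse any slot
// whose allocation id is NOT in the global blacklist of previous allocation ids
// (those slots may still cache local state of other tasks).
public class BlacklistSlotSelector<S> {

	public Optional<S> select(
			List<S> availableSlots,
			Function<S, String> allocationIdOf,
			String previousAllocationId,
			Set<String> allPreviousAllocationIds) {

		// 1. exact match with the previous allocation wins (enables local recovery)
		for (S slot : availableSlots) {
			if (allocationIdOf.apply(slot).equals(previousAllocationId)) {
				return Optional.of(slot);
			}
		}
		// 2. otherwise any slot outside the blacklist is free to take again
		for (S slot : availableSlots) {
			if (!allPreviousAllocationIds.contains(allocationIdOf.apply(slot))) {
				return Optional.of(slot);
			}
		}
		return Optional.empty(); // only blacklisted slots left -> request a new slot
	}

	public static void main(String[] args) {
		BlacklistSlotSelector<String> selector = new BlacklistSlotSelector<>();
		Set<String> blacklist = new HashSet<>(Arrays.asList("alloc-1", "alloc-2"));
		// previous slot "alloc-1" is gone, "alloc-2" is blacklisted -> picks "alloc-9"
		System.out.println(selector.select(
			Arrays.asList("alloc-2", "alloc-9"), s -> s, "alloc-1", blacklist));
	}
}
```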
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Local recovery scheduling can cause spread out of tasks
> ---
>
> Key: FLINK-9635
> URL: https://issues.apache.org/jira/browse/FLINK-9635
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.2
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> In order to make local recovery work, Flink's scheduling was changed such 
> that a task tries to be rescheduled to its previous location. In order not to 
> occupy slots which have the state of other tasks cached, the strategy will 
> request a new slot if the old slot identified by the previous allocation id 
> is no longer present. This also applies to newly allocated slots because 
> there is no distinction between new and already used slots. This behaviour can 
> cause every task to be deployed to its own slot if the {{SlotPool}} has 
> released all slots in the meantime, for example. The consequence could be 
> that a job can no longer be executed after a failure because it needs more 
> slots than before.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StefanRRichter opened a new pull request #6961: [FLINK-9635] Fix scheduling for local recovery

2018-10-29 Thread GitBox
StefanRRichter opened a new pull request #6961: [FLINK-9635] Fix scheduling for 
local recovery
URL: https://github.com/apache/flink/pull/6961
 
 
   ## What is the purpose of the change
   
   This change fixes the task spread-out problem in scheduling with local 
recovery. The solution is based on creating a global set of all previous 
allocation ids as a blacklist to avoid; all other allocation ids are now free 
to take again.
   
   ## Brief change log
   
   This PR contains a subset of the changes from #6898 and focuses almost 
purely on the fix of local recovery.
   
   ## Verifying this change
   
   Improved `SchedulingITCase` should work now, including previously ignored 
tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-10691) StreamSQL E2E test relies on hadoop

2018-10-29 Thread Hequn Cheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng reassigned FLINK-10691:
---

Assignee: Hequn Cheng

> StreamSQL E2E test relies on hadoop
> ---
>
> Key: FLINK-10691
> URL: https://issues.apache.org/jira/browse/FLINK-10691
> Project: Flink
>  Issue Type: Improvement
>  Components: E2E Tests, Table API  SQL
>Affects Versions: 1.7.0
>Reporter: Chesnay Schepler
>Assignee: Hequn Cheng
>Priority: Major
> Fix For: 1.7.0
>
>
> The {{flink-stream-sql-test}} makes use of the {{BucketingSink}} and thus 
> requires hadoop to be available. This means we can't run this test in 
> hadoop-free environments, which is a bit unfortunate.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667357#comment-16667357
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r228937420
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
 ##
 @@ -115,4 +115,12 @@
.defaultValue(1024)
.withDescription("The minimum size of state data files. 
All state chunks smaller than that are stored" +
" inline in the root checkpoint metadata 
file.");
+
+   /**
+* The thread numbers used to download files from DFS in 
RocksDBStateBackend.
+*/
+   public static final ConfigOption<Integer> CHECKPOINT_RESTORE_THREAD_NUM 
= ConfigOptions
+   .key("state.checkpoint.restore.thread.num")
+   .defaultValue(1)
+   .withDescription("The thread numbers used to download files 
from DFS in RocksDBStateBackend.");
 
 Review comment:
   also here: `The thread number` without `s`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Speed up download file procedure when restore 
> --
>
> Key: FLINK-10461
> URL: https://issues.apache.org/jira/browse/FLINK-10461
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Congxian Qiu
>Assignee: Congxian Qiu
>Priority: Major
>  Labels: pull-request-available
>
> In the current master branch, the restore will download files from DFS. The 
> download procedure is single-threaded; this could be sped up by using 
> multiple threads for downloading state from DFS.
>  
> In my company, the state can come to some terabytes, so the restore 
> procedure becomes a little slow. After a bit of digging, I found that the 
> states are downloaded from DFS using a single thread; this could use multiple 
> threads for a speed-up.
> I tested the time needed to download ~2 terabytes of state from DFS. 
> With a single thread it took 640+s, and 130+s when using 5 threads for the download.
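
A stripped-down sketch of the multi-threaded download idea (illustrative helper, not the PR code): one copy task per remote state file, all joined before the restore proceeds.

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative only: submit one copy task per remote file to a fixed-size pool,
// then wait for all of them before the restore proceeds.
public class ParallelRestoreSketch {

	private static void copyFile(String remoteFile) {
		// placeholder for the actual DFS -> local copy of one state file
		System.out.println("copied " + remoteFile + " on " + Thread.currentThread().getName());
	}

	public static void main(String[] args) {
		List<String> remoteFiles = Arrays.asList("000001.sst", "000002.sst", "MANIFEST");
		ExecutorService pool = Executors.newFixedThreadPool(5); // e.g. 5 restore threads

		List<CompletableFuture<Void>> futures = new ArrayList<>();
		for (String file : remoteFiles) {
			futures.add(CompletableFuture.runAsync(() -> copyFile(file), pool));
		}
		// join propagates the first failure and waits for all downloads to finish
		CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
		pool.shutdown();
	}
}
{code}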



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667360#comment-16667360
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r228970266
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+public class RocksDbStateDataTransfer {
+
+   public static void transferAllStateDataToDirectory(
+   IncrementalKeyedStateHandle restoreStateHandle,
+   Path dest,
+   int resotreThreadNum,
+   CloseableRegistry closeableRegistry) throws Exception {
+
+   final Map<StateHandleID, StreamStateHandle> sstFiles =
+   restoreStateHandle.getSharedState();
+   final Map<StateHandleID, StreamStateHandle> miscFiles =
+   restoreStateHandle.getPrivateState();
+
+   transferAllDataFromStateHandles(sstFiles, dest, 
resotreThreadNum, closeableRegistry);
+   transferAllDataFromStateHandles(miscFiles, dest, 
resotreThreadNum, closeableRegistry);
+   }
+
+   /**
+* Copies all the files from the given stream state handles to the 
given path, renaming the files w.r.t. their
+* {@link StateHandleID}.
+*/
+   private static void transferAllDataFromStateHandles(
+   Map<StateHandleID, StreamStateHandle> stateHandleMap,
+   Path restoreInstancePath,
+   int resotreThreadNum,
+   CloseableRegistry closeableRegistry
+   ) throws Exception {
+
+   final ExecutorService executorService = 
Executors.newFixedThreadPool(resotreThreadNum);
+   List<CompletableFuture<Void>> futures = new LinkedList<>();
+   List<Closeable> closeables = new LinkedList<>();
+
+   try {
+   closeables.add(() -> executorService.shutdownNow());
 
 Review comment:
   I think the tasks should first be properly cancelled and then the 
`executorService` should be shut down.
   `executorService.shutdownNow` also calls `future.cancel`, which 
interrupts the task thread.
   At the moment the `while (true)` in `copyStateDataHandleData` does not support 
explicit cancellation.
   I suggest creating a `running = new AtomicBoolean(true)` at the beginning; then 
`while (true)` can become `while (running.get())`. This allows cancelling all 
tasks at once and shutting down the `executorService` in one closeable:
   `closeables.add(() -> { running.set(false); executorService.shutdownNow(); });`
   A list of `closeables` is not needed then.
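
   A minimal sketch of the suggested pattern (generic Java, not the PR code): a shared flag lets the copy loop exit cooperatively, while one `Closeable` both flips the flag and interrupts the pool, so a single registration in the `CloseableRegistry` suffices.

```java
import java.io.Closeable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative only: cooperative cancellation of copy tasks plus pool shutdown,
// wrapped in a single Closeable.
public class CancellableDownloadsSketch {

	public static void main(String[] args) throws Exception {
		final AtomicBoolean running = new AtomicBoolean(true);
		final ExecutorService executorService = Executors.newFixedThreadPool(4);

		Closeable canceller = () -> {
			running.set(false);            // lets while (running.get()) loops finish
			executorService.shutdownNow(); // interrupts threads blocked on I/O
		};

		executorService.submit(() -> {
			while (running.get()) {
				// copy one chunk per iteration; the flag is checked between chunks
			}
		});

		canceller.close(); // in the real code this would be driven by the CloseableRegistry
	}
}
```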


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Speed up download file procedure when restore 
> --
>
> Key: FLINK-10461
> URL: 

[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667354#comment-16667354
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r228937223
 
 

 ##
 File path: docs/_includes/generated/checkpointing_configuration.html
 ##
 @@ -32,6 +32,11 @@
 false
 
 
+
+state.checkpoint.restore.thread.num
+1
+The thread numbers used to download files from DFS in 
RocksDBStateBackend.
 
 Review comment:
   I think `The thread number` should be singular without `s`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Speed up download file procedure when restore 
> --
>
> Key: FLINK-10461
> URL: https://issues.apache.org/jira/browse/FLINK-10461
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Congxian Qiu
>Assignee: Congxian Qiu
>Priority: Major
>  Labels: pull-request-available
>
> In the current master branch, the restore will download file from DFS, the 
> download procedure are single-thread, this could speed up by using 
> multi-thread for downloading states from DFS.
>  
> In my company, the states will come to some terabytes, so the restore 
> procedure will become a litter slow, after a bit digging, I find download 
> states from DFS using single thread, this could using multi-thread for speed 
> up.
> I test the time used for download states from DFS with ~2 terabytes states. 
> With single thread it used 640+s, and 130+s when using 5 threads for download.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667359#comment-16667359
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r228970609
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+public class RocksDbStateDataTransfer {
+
+   public static void transferAllStateDataToDirectory(
+   IncrementalKeyedStateHandle restoreStateHandle,
+   Path dest,
+   int resotreThreadNum,
+   CloseableRegistry closeableRegistry) throws Exception {
+
+   final Map<StateHandleID, StreamStateHandle> sstFiles =
+   restoreStateHandle.getSharedState();
+   final Map<StateHandleID, StreamStateHandle> miscFiles =
+   restoreStateHandle.getPrivateState();
+
+   transferAllDataFromStateHandles(sstFiles, dest, 
resotreThreadNum, closeableRegistry);
+   transferAllDataFromStateHandles(miscFiles, dest, 
resotreThreadNum, closeableRegistry);
+   }
+
+   /**
+* Copies all the files from the given stream state handles to the 
given path, renaming the files w.r.t. their
+* {@link StateHandleID}.
+*/
+   private static void transferAllDataFromStateHandles(
+   Map<StateHandleID, StreamStateHandle> stateHandleMap,
+   Path restoreInstancePath,
+   int resotreThreadNum,
+   CloseableRegistry closeableRegistry
+   ) throws Exception {
+
+   final ExecutorService executorService = 
Executors.newFixedThreadPool(resotreThreadNum);
+   List<CompletableFuture<Void>> futures = new LinkedList<>();
+   List<Closeable> closeables = new LinkedList<>();
+
+   try {
+   closeables.add(() -> executorService.shutdownNow());
+   
closeableRegistry.registerCloseable(((LinkedList<Closeable>) 
closeables).getLast());
+
+   for (Map.Entry<StateHandleID, StreamStateHandle> entry 
: stateHandleMap.entrySet()) {
+   StateHandleID stateHandleID = entry.getKey();
+   StreamStateHandle remoteFileHandle = 
entry.getValue();
+
+   if (resotreThreadNum > 1) {
+   CompletableFuture<Void> future = 
CompletableFuture.runAsync(() -> {
+   try {
+   
copyStateDataHandleData(new Path(restoreInstancePath, 
stateHandleID.toString()), remoteFileHandle, closeableRegistry);
+   } catch (IOException e) {
 
 Review comment:
   the exception handling can go inside `copyStateDataHandleData`.
   `new Path(restoreInstancePath, stateHandleID.toString())` can be a variable 
to reduce the line length.
   I would create a list of `Runnable`'s:
   ```
   Path path = new Path(restoreInstancePath, stateHandleID.toString());
   runnables.add( -> 

[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667355#comment-16667355
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r228976096
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransferTest.java
 ##
 @@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for RocksDbStateDataTransfer.
+ */
+public class RocksDBStateDataTransferTest {
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   /**
+* Test that the exception arose in the thread pool will rethrow to the 
main thread.
+*/
+   @Test
+   public void testThreadPoolExceptionRethrow() throws Exception {
+   IncrementalKeyedStateHandle stateHandle = 
mock(IncrementalKeyedStateHandle.class);
+
+   SpecifiedException expectedException = new 
SpecifiedException("throw exception while multi thread restore.");
+   StreamStateHandle mockStateHandle = 
mock(StreamStateHandle.class);
+   
when(mockStateHandle.openInputStream()).thenThrow(expectedException);
+
+   Map<StateHandleID, StreamStateHandle> sharedStateHandle = new HashMap<>(1);
+   sharedStateHandle.put(new StateHandleID("mock"), 
mockStateHandle);
+   
when(stateHandle.getSharedState()).thenReturn(sharedStateHandle);
+
+   try {
+   
RocksDbStateDataTransfer.transferAllStateDataToDirectory(stateHandle, new 
Path(temporaryFolder.newFolder().toURI()), 5, new CloseableRegistry());
+   fail();
+   } catch (Exception e) {
+   assertEquals(expectedException, 
e.getCause().getCause());
+   }
+   }
+
+   /**
+* Tests that download files with multi-thread correctly.
+* @throws Exception
+*/
+   @Test
+   public void testMultiThreadRestoreCorrectly() throws Exception {
+   IncrementalKeyedStateHandle stateHandle = 
mock(IncrementalKeyedStateHandle.class);
+
+   byte[] content1 = new byte[1];
 
 Review comment:
   I would create a list of something like 6 test contexts and use loops over them for further actions.
   The content length can also be random.
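
   For illustration only, a rough sketch of how such a loop could look inside this test class (it assumes the imports already present in the file; the `expected` map and the final comparison step are placeholders, not code from the PR):
   ```java
   // sketch: N contexts with random content lengths instead of hand-written content1, content2, ...
   Random random = new Random();
   int contentNum = 6;
   Map<StateHandleID, StreamStateHandle> sharedStates = new HashMap<>(contentNum);
   Map<StateHandleID, byte[]> expected = new HashMap<>(contentNum);
   for (int i = 0; i < contentNum; i++) {
       byte[] content = new byte[random.nextInt(100_000) + 1];
       random.nextBytes(content);
       StateHandleID handleId = new StateHandleID("sharedState" + i);
       expected.put(handleId, content);
       sharedStates.put(handleId, new ByteStreamStateHandle("sharedState" + i, content));
   }
   // ...mock getSharedState()/getPrivateState() with these maps, call
   // transferAllStateDataToDirectory(...) once, and then loop over `expected`
   // to compare every downloaded file with its generated content.
   ```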


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Speed up download file procedure when restore 
> --
>
> Key: FLINK-10461
> URL: https://issues.apache.org/jira/browse/FLINK-10461
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Congxian Qiu
>Assignee: Congxian Qiu
>Priority: Major
>  Labels: 

[GitHub] azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS

2018-10-29 Thread GitBox
azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r228970609
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+public class RocksDbStateDataTransfer {
+
+   public static void transferAllStateDataToDirectory(
+   IncrementalKeyedStateHandle restoreStateHandle,
+   Path dest,
+   int resotreThreadNum,
+   CloseableRegistry closeableRegistry) throws Exception {
+
+   final Map<StateHandleID, StreamStateHandle> sstFiles =
+   restoreStateHandle.getSharedState();
+   final Map<StateHandleID, StreamStateHandle> miscFiles =
+   restoreStateHandle.getPrivateState();
+
+   transferAllDataFromStateHandles(sstFiles, dest, 
resotreThreadNum, closeableRegistry);
+   transferAllDataFromStateHandles(miscFiles, dest, 
resotreThreadNum, closeableRegistry);
+   }
+
+   /**
+* Copies all the files from the given stream state handles to the 
given path, renaming the files w.r.t. their
+* {@link StateHandleID}.
+*/
+   private static void transferAllDataFromStateHandles(
+   Map<StateHandleID, StreamStateHandle> stateHandleMap,
+   Path restoreInstancePath,
+   int resotreThreadNum,
+   CloseableRegistry closeableRegistry
+   ) throws Exception {
+
+   final ExecutorService executorService = 
Executors.newFixedThreadPool(resotreThreadNum);
+   List<CompletableFuture<Void>> futures = new LinkedList<>();
+   List<Closeable> closeables = new LinkedList<>();
+
+   try {
+   closeables.add(() -> executorService.shutdownNow());
+   closeableRegistry.registerCloseable(((LinkedList<Closeable>) closeables).getLast());
+
+   for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) {
+   StateHandleID stateHandleID = entry.getKey();
+   StreamStateHandle remoteFileHandle = 
entry.getValue();
+
+   if (resotreThreadNum > 1) {
+   CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+   try {
+   
copyStateDataHandleData(new Path(restoreInstancePath, 
stateHandleID.toString()), remoteFileHandle, closeableRegistry);
+   } catch (IOException e) {
 
 Review comment:
   the exception handling can go inside `copyStateDataHandleData`.
   `new Path(restoreInstancePath, stateHandleID.toString())` can be a variable 
to reduce the line length.
   I would create a list of `Runnable`'s:
   ```
   Path path = new Path(restoreInstancePath, stateHandleID.toString());
   runnables.add(() -> copyStateDataHandleData(path, remoteFileHandle, closeableRegistry));
   ```
   and then have separate code for `resotreThreadNum > 1` and `resotreThreadNum 
== 1`.
   `executorService` currently is not used for `resotreThreadNum == 1` but 
created.
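
   A rough sketch of what that could look like (not the PR code; it assumes `copyStateDataHandleData` wraps its `IOException` itself, as suggested above, and uses the corrected `restoreThreadNum` name):
   ```java
   List<Runnable> runnables = new LinkedList<>();
   for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) {
       Path path = new Path(restoreInstancePath, entry.getKey().toString());
       StreamStateHandle remoteFileHandle = entry.getValue();
       runnables.add(() -> copyStateDataHandleData(path, remoteFileHandle, closeableRegistry));
   }

   if (restoreThreadNum > 1) {
       // the thread pool is only created when it is actually used
       ExecutorService executorService = Executors.newFixedThreadPool(restoreThreadNum);
       try {
           List<CompletableFuture<Void>> futures = new LinkedList<>();
           for (Runnable runnable : runnables) {
               futures.add(CompletableFuture.runAsync(runnable, executorService));
           }
           // wait for all downloads and propagate the first failure to the caller
           CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
       } finally {
           executorService.shutdownNow();
       }
   } else {
       for (Runnable runnable : runnables) {
           runnable.run();
       }
   }
   ```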


[GitHub] azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS

2018-10-29 Thread GitBox
azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r228970266
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+public class RocksDbStateDataTransfer {
+
+   public static void transferAllStateDataToDirectory(
+   IncrementalKeyedStateHandle restoreStateHandle,
+   Path dest,
+   int resotreThreadNum,
+   CloseableRegistry closeableRegistry) throws Exception {
+
+   final Map<StateHandleID, StreamStateHandle> sstFiles =
+   restoreStateHandle.getSharedState();
+   final Map<StateHandleID, StreamStateHandle> miscFiles =
+   restoreStateHandle.getPrivateState();
+
+   transferAllDataFromStateHandles(sstFiles, dest, 
resotreThreadNum, closeableRegistry);
+   transferAllDataFromStateHandles(miscFiles, dest, 
resotreThreadNum, closeableRegistry);
+   }
+
+   /**
+* Copies all the files from the given stream state handles to the 
given path, renaming the files w.r.t. their
+* {@link StateHandleID}.
+*/
+   private static void transferAllDataFromStateHandles(
+   Map<StateHandleID, StreamStateHandle> stateHandleMap,
+   Path restoreInstancePath,
+   int resotreThreadNum,
+   CloseableRegistry closeableRegistry
+   ) throws Exception {
+
+   final ExecutorService executorService = 
Executors.newFixedThreadPool(resotreThreadNum);
+   List<CompletableFuture<Void>> futures = new LinkedList<>();
+   List<Closeable> closeables = new LinkedList<>();
+
+   try {
+   closeables.add(() -> executorService.shutdownNow());
 
 Review comment:
   I think the tasks should first be properly cancelled and then `executorService` should be shut down.
   `executorService.shutdownNow` also calls `future.cancel`, which interrupts the task thread.
   At the moment, `while (true)` in `copyStateDataHandleData` does not support explicit cancellation.
   I suggest creating a `running = new AtomicBoolean(true)` at the beginning; then `while (true)` can become `while (running.get())`. This allows cancelling all tasks at once and shutting down `executorService` in one `closeable`:
   `closeables.add(() -> { running.set(false); executorService.shutdownNow(); });`
   The list of `closeables` is not needed then.
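
   A minimal sketch of that flag (illustration only, assuming `java.util.concurrent.atomic.AtomicBoolean` is imported and `copyStateDataHandleData` checks the flag in its copy loop):
   ```java
   AtomicBoolean running = new AtomicBoolean(true);
   ExecutorService executorService = Executors.newFixedThreadPool(restoreThreadNum);

   // a single closeable cancels all copy loops and shuts the pool down
   closeableRegistry.registerCloseable(() -> {
       running.set(false);
       executorService.shutdownNow();
   });

   // inside copyStateDataHandleData the copy loop then becomes:
   // while (running.get()) { ... read a chunk, write it, break on EOF ... }
   ```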


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667356#comment-16667356
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r228964472
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 ##
 @@ -235,9 +238,10 @@ public RocksDBStateBackend(StateBackend 
checkpointStreamBackend) {
 * @param checkpointStreamBackend The backend write the checkpoint 
streams to.
 * @param enableIncrementalCheckpointing True if incremental 
checkpointing is enabled.
 */
-   public RocksDBStateBackend(StateBackend checkpointStreamBackend, 
TernaryBoolean enableIncrementalCheckpointing) {
+   public RocksDBStateBackend(StateBackend checkpointStreamBackend, 
TernaryBoolean enableIncrementalCheckpointing, int restoreThreadNum) {
 
 Review comment:
   I would also leave the previous method without restoreThreadNum:
   ```
   public RocksDBStateBackend(StateBackend checkpointStreamBackend, 
TernaryBoolean enableIncrementalCheckpointing) {
   this(checkpointStreamBackend, enableIncrementalCheckpointing, -1);
   }
   ```
   This way we do not break existing code, including other methods here; this modification (and other similar ones) would then not be needed:
   ```
   public RocksDBStateBackend(StateBackend checkpointStreamBackend) {
this(checkpointStreamBackend, TernaryBoolean.UNDEFINED, 1);
}
   ```
   
   and I would rather keep it undefined here, e.g. `-1`, similar to 
`TernaryBoolean.UNDEFINED` for `enableIncrementalCheckpointing`. It can be 
resolved then in `private RocksDBStateBackend(RocksDBStateBackend original, 
Configuration config)` and `getRestoreThreadNum`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Speed up download file procedure when restore 
> --
>
> Key: FLINK-10461
> URL: https://issues.apache.org/jira/browse/FLINK-10461
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Congxian Qiu
>Assignee: Congxian Qiu
>Priority: Major
>  Labels: pull-request-available
>
> In the current master branch, the restore will download files from DFS; the 
> download procedure is single-threaded, so it could be sped up by using 
> multiple threads to download states from DFS.
>  
> In my company, the states grow to some terabytes, so the restore procedure 
> becomes a little slow. After a bit of digging, I found that downloading 
> states from DFS uses a single thread; this could use multiple threads to 
> speed it up.
> I tested the time needed to download ~2 terabytes of state from DFS. With a 
> single thread it took 640+ s, and 130+ s when using 5 threads.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667358#comment-16667358
 ] 

ASF GitHub Bot commented on FLINK-10461:


azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r228967531
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+public class RocksDbStateDataTransfer {
+
+   public static void transferAllStateDataToDirectory(
+   IncrementalKeyedStateHandle restoreStateHandle,
+   Path dest,
+   int resotreThreadNum,
 
 Review comment:
   typo: `resotreThreadNum` -> `restoredThreadNum`, also in other methods


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Speed up download file procedure when restore 
> --
>
> Key: FLINK-10461
> URL: https://issues.apache.org/jira/browse/FLINK-10461
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Congxian Qiu
>Assignee: Congxian Qiu
>Priority: Major
>  Labels: pull-request-available
>
> In the current master branch, the restore will download files from DFS; the 
> download procedure is single-threaded, so it could be sped up by using 
> multiple threads to download states from DFS.
>  
> In my company, the states grow to some terabytes, so the restore procedure 
> becomes a little slow. After a bit of digging, I found that downloading 
> states from DFS uses a single thread; this could use multiple threads to 
> speed it up.
> I tested the time needed to download ~2 terabytes of state from DFS. With a 
> single thread it took 640+ s, and 130+ s when using 5 threads.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS

2018-10-29 Thread GitBox
azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r228937223
 
 

 ##
 File path: docs/_includes/generated/checkpointing_configuration.html
 ##
 @@ -32,6 +32,11 @@
 false
 
 
+
+state.checkpoint.restore.thread.num
+1
+The thread numbers used to download files from DFS in 
RocksDBStateBackend.
 
 Review comment:
   I think `The thread number` should be singular without `s`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS

2018-10-29 Thread GitBox
azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r228964472
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 ##
 @@ -235,9 +238,10 @@ public RocksDBStateBackend(StateBackend 
checkpointStreamBackend) {
 * @param checkpointStreamBackend The backend write the checkpoint 
streams to.
 * @param enableIncrementalCheckpointing True if incremental 
checkpointing is enabled.
 */
-   public RocksDBStateBackend(StateBackend checkpointStreamBackend, 
TernaryBoolean enableIncrementalCheckpointing) {
+   public RocksDBStateBackend(StateBackend checkpointStreamBackend, 
TernaryBoolean enableIncrementalCheckpointing, int restoreThreadNum) {
 
 Review comment:
   I would also leave the previous method without restoreThreadNum:
   ```
   public RocksDBStateBackend(StateBackend checkpointStreamBackend, 
TernaryBoolean enableIncrementalCheckpointing) {
   this(checkpointStreamBackend, enableIncrementalCheckpointing, -1);
   }
   ```
   This way we do not break existing code, including other methods here; this modification (and other similar ones) would then not be needed:
   ```
   public RocksDBStateBackend(StateBackend checkpointStreamBackend) {
this(checkpointStreamBackend, TernaryBoolean.UNDEFINED, 1);
}
   ```
   
   and I would rather keep it undefined here, e.g. `-1`, similar to 
`TernaryBoolean.UNDEFINED` for `enableIncrementalCheckpointing`. It can be 
resolved then in `private RocksDBStateBackend(RocksDBStateBackend original, 
Configuration config)` and `getRestoreThreadNum`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS

2018-10-29 Thread GitBox
azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r228937420
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
 ##
 @@ -115,4 +115,12 @@
.defaultValue(1024)
.withDescription("The minimum size of state data files. 
All state chunks smaller than that are stored" +
" inline in the root checkpoint metadata 
file.");
+
+   /**
+* The thread numbers used to download files from DFS in 
RocksDBStateBackend.
+*/
+   public static final ConfigOption<Integer> CHECKPOINT_RESTORE_THREAD_NUM = ConfigOptions
+   .key("state.checkpoint.restore.thread.num")
+   .defaultValue(1)
+   .withDescription("The thread numbers used to download files 
from DFS in RocksDBStateBackend.");
 
 Review comment:
   also here: `The thread number` without `s`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS

2018-10-29 Thread GitBox
azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r228967531
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+public class RocksDbStateDataTransfer {
+
+   public static void transferAllStateDataToDirectory(
+   IncrementalKeyedStateHandle restoreStateHandle,
+   Path dest,
+   int resotreThreadNum,
 
 Review comment:
   typo: `resotreThreadNum` -> `restoredThreadNum`, also in other methods


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS

2018-10-29 Thread GitBox
azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r228976096
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransferTest.java
 ##
 @@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for RocksDbStateDataTransfer.
+ */
+public class RocksDBStateDataTransferTest {
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   /**
+* Test that the exception arose in the thread pool will rethrow to the 
main thread.
+*/
+   @Test
+   public void testThreadPoolExceptionRethrow() throws Exception {
+   IncrementalKeyedStateHandle stateHandle = 
mock(IncrementalKeyedStateHandle.class);
+
+   SpecifiedException expectedException = new 
SpecifiedException("throw exception while multi thread restore.");
+   StreamStateHandle mockStateHandle = 
mock(StreamStateHandle.class);
+   
when(mockStateHandle.openInputStream()).thenThrow(expectedException);
+
+   Map<StateHandleID, StreamStateHandle> sharedStateHandle = new HashMap<>(1);
+   sharedStateHandle.put(new StateHandleID("mock"), 
mockStateHandle);
+   
when(stateHandle.getSharedState()).thenReturn(sharedStateHandle);
+
+   try {
+   
RocksDbStateDataTransfer.transferAllStateDataToDirectory(stateHandle, new 
Path(temporaryFolder.newFolder().toURI()), 5, new CloseableRegistry());
+   fail();
+   } catch (Exception e) {
+   assertEquals(expectedException, 
e.getCause().getCause());
+   }
+   }
+
+   /**
+* Tests that download files with multi-thread correctly.
+* @throws Exception
+*/
+   @Test
+   public void testMultiThreadRestoreCorrectly() throws Exception {
+   IncrementalKeyedStateHandle stateHandle = 
mock(IncrementalKeyedStateHandle.class);
+
+   byte[] content1 = new byte[1];
 
 Review comment:
   I would create a list of something like 6 test contexts and use loops over them for further actions.
   The content length can also be random.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10704) Fix sql client end to end test failure

2018-10-29 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667345#comment-16667345
 ] 

Timo Walther commented on FLINK-10704:
--

I thought maybe Maven resource transformers could do it, but apparently not. If 
there is no way of relocating it, I guess we need to add it to the SQL jar, 
which requires an exclusion in the jar checking logic that we added to the SQL 
Client end-to-end test.

Why are we actually checking for every "error" (case insensitive) string? 
Actually, this is only a warning. Shouldn't we check for "ERROR" instead?

> Fix sql client end to end test failure
> --
>
> Key: FLINK-10704
> URL: https://issues.apache.org/jira/browse/FLINK-10704
> Project: Flink
>  Issue Type: Bug
>  Components: E2E Tests, Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> The log file contains the following sentence:
> {code:java}
> 2018-10-29 03:27:39,209 WARN 
> org.apache.flink.kafka010.shaded.org.apache.kafka.common.utils.AppInfoParser 
> - Error while loading kafka-version.properties :null
> {code}
> The reason for this log is that we explicitly exclude the version description 
> file of the kafka client when packaging the connector:
> {code:java}
> 
>
>   *:*
>   
>  kafka/kafka-version.properties
>   
>
> {code}
> When the shell scans for the "error" keyword with grep, it will hit, so the test 
> will fail.
> {code:java}
> function check_logs_for_errors {
>   error_count=$(grep -rv "GroupCoordinatorNotAvailableException" 
> $FLINK_DIR/log \
>   | grep -v "RetriableCommitFailedException" \
>   | grep -v "NoAvailableBrokersException" \
>   | grep -v "Async Kafka commit failed" \
>   | grep -v "DisconnectException" \
>   | grep -v "AskTimeoutException" \
>   | grep -v "WARN  akka.remote.transport.netty.NettyTransport" \
>   | grep -v  "WARN  
> org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline" \
>   | grep -v "jvm-exit-on-fatal-error" \
>   | grep -v '^INFO:.*AWSErrorCode=\[400 Bad 
> Request\].*ServiceEndpoint=\[https://.*\.s3\.amazonaws\.com\].*RequestType=\[HeadBucketRequest\]'
>  \
>   | grep -v "RejectedExecutionException" \
>   | grep -v "An exception was thrown by an exception handler" \
>   | grep -v "java.lang.NoClassDefFoundError: 
> org/apache/hadoop/yarn/exceptions/YarnException" \
>   | grep -v "java.lang.NoClassDefFoundError: 
> org/apache/hadoop/conf/Configuration" \
>   | grep -v 
> "org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.FluentPropertyBeanIntrospector
>   - Error when creating PropertyDescriptor for public final void 
> org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.AbstractConfiguration.setProperty(java.lang.String,java.lang.Object)!
>  Ignoring this property." \
>   | grep -ic "error")//here
>   if [[ ${error_count} -gt 0 ]]; then
> echo "Found error in log files:"
> cat $FLINK_DIR/log/*
> EXIT_CODE=1
>   fi
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9697) Provide connector for modern Kafka

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667342#comment-16667342
 ] 

ASF GitHub Bot commented on FLINK-9697:
---

pnowojski commented on issue #6703: [FLINK-9697] Provide connector for modern 
Kafka
URL: https://github.com/apache/flink/pull/6703#issuecomment-433950958
 
 
   One more question: why does the new connector have this exclusion:
   ```
   <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-connector-kafka-base_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
       <exclusions>
           <exclusion>
               <groupId>org.apache.kafka</groupId>
               <artifactId>kafka_${scala.binary.version}</artifactId>
           </exclusion>
       </exclusions>
   </dependency>
   ```
   Shouldn't the `flink-connector-kafka-base` be Kafka independent?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Provide connector for modern Kafka
> --
>
> Key: FLINK-9697
> URL: https://issues.apache.org/jira/browse/FLINK-9697
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Kafka 2.0.0 would be released soon.
> Here is vote thread:
> [http://search-hadoop.com/m/Kafka/uyzND1vxnEd23QLxb?subj=+VOTE+2+0+0+RC1]
> We should provide connector for Kafka 2.0.0 once it is released.
> Upgrade to 2.0 documentation : 
> http://kafka.apache.org/20/documentation.html#upgrade_2_0_0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski commented on issue #6703: [FLINK-9697] Provide connector for modern Kafka

2018-10-29 Thread GitBox
pnowojski commented on issue #6703: [FLINK-9697] Provide connector for modern 
Kafka
URL: https://github.com/apache/flink/pull/6703#issuecomment-433950958
 
 
   One more question: why does the new connector have this exclusion:
   ```
   <dependency>
       <groupId>org.apache.flink</groupId>
       <artifactId>flink-connector-kafka-base_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
       <exclusions>
           <exclusion>
               <groupId>org.apache.kafka</groupId>
               <artifactId>kafka_${scala.binary.version}</artifactId>
           </exclusion>
       </exclusions>
   </dependency>
   ```
   Shouldn't the `flink-connector-kafka-base` be Kafka independent?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10704) Fix sql client end to end test failure

2018-10-29 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667332#comment-16667332
 ] 

Piotr Nowojski commented on FLINK-10704:


I asked [~twalthr] about that (he added it), and there is no particular reason, 
except that it would probably collide in our connectors that pull in multiple 
Kafka versions (which is an issue on its own).

Maybe we can relocate the resource file and rename the access patterns to it 
somehow? I don't know if that's possible. A quick Google search didn't bring up 
any answers. [~twalthr], do you have any ideas?

> Fix sql client end to end test failure
> --
>
> Key: FLINK-10704
> URL: https://issues.apache.org/jira/browse/FLINK-10704
> Project: Flink
>  Issue Type: Bug
>  Components: E2E Tests, Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> The log file contains the following sentence:
> {code:java}
> 2018-10-29 03:27:39,209 WARN 
> org.apache.flink.kafka010.shaded.org.apache.kafka.common.utils.AppInfoParser 
> - Error while loading kafka-version.properties :null
> {code}
> The reason for this log is that we explicitly exclude the version description 
> file of the kafka client when packaging the connector:
> {code:java}
> 
>
>   *:*
>   
>  kafka/kafka-version.properties
>   
>
> {code}
> When the shell scans for the "error" keyword with grep, it will hit, so the test 
> will fail.
> {code:java}
> function check_logs_for_errors {
>   error_count=$(grep -rv "GroupCoordinatorNotAvailableException" 
> $FLINK_DIR/log \
>   | grep -v "RetriableCommitFailedException" \
>   | grep -v "NoAvailableBrokersException" \
>   | grep -v "Async Kafka commit failed" \
>   | grep -v "DisconnectException" \
>   | grep -v "AskTimeoutException" \
>   | grep -v "WARN  akka.remote.transport.netty.NettyTransport" \
>   | grep -v  "WARN  
> org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline" \
>   | grep -v "jvm-exit-on-fatal-error" \
>   | grep -v '^INFO:.*AWSErrorCode=\[400 Bad 
> Request\].*ServiceEndpoint=\[https://.*\.s3\.amazonaws\.com\].*RequestType=\[HeadBucketRequest\]'
>  \
>   | grep -v "RejectedExecutionException" \
>   | grep -v "An exception was thrown by an exception handler" \
>   | grep -v "java.lang.NoClassDefFoundError: 
> org/apache/hadoop/yarn/exceptions/YarnException" \
>   | grep -v "java.lang.NoClassDefFoundError: 
> org/apache/hadoop/conf/Configuration" \
>   | grep -v 
> "org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.FluentPropertyBeanIntrospector
>   - Error when creating PropertyDescriptor for public final void 
> org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.AbstractConfiguration.setProperty(java.lang.String,java.lang.Object)!
>  Ignoring this property." \
>   | grep -ic "error")//here
>   if [[ ${error_count} -gt 0 ]]; then
> echo "Found error in log files:"
> cat $FLINK_DIR/log/*
> EXIT_CODE=1
>   fi
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-10-29 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-9635:
-
Affects Version/s: 1.6.2

> Local recovery scheduling can cause spread out of tasks
> ---
>
> Key: FLINK-9635
> URL: https://issues.apache.org/jira/browse/FLINK-9635
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.2
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Critical
> Fix For: 1.7.0
>
>
> In order to make local recovery work, Flink's scheduling was changed such 
> that a task tries to be rescheduled to its previous location. In order to not 
> occupy slots which have state of other tasks cached, the strategy will 
> request a new slot if the old slot identified by the previous allocation id 
> is no longer present. This also applies to newly allocated slots because 
> there is no distinction between new and already used slots. This behaviour 
> can cause every task to get deployed to its own slot if the {{SlotPool}} has 
> released all slots in the meantime, for example. The consequence could be 
> that a job can no longer be executed after a failure because it needs more 
> slots than before.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9635) Local recovery scheduling can cause spread out of tasks

2018-10-29 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-9635:


Assignee: Stefan Richter

> Local recovery scheduling can cause spread out of tasks
> ---
>
> Key: FLINK-9635
> URL: https://issues.apache.org/jira/browse/FLINK-9635
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Stefan Richter
>Priority: Critical
> Fix For: 1.7.0
>
>
> In order to make local recovery work, Flink's scheduling was changed such 
> that a task tries to be rescheduled to its previous location. In order to not 
> occupy slots which have state of other tasks cached, the strategy will 
> request a new slot if the old slot identified by the previous allocation id 
> is no longer present. This also applies to newly allocated slots because 
> there is no distinction between new and already used slots. This behaviour 
> can cause every task to get deployed to its own slot if the {{SlotPool}} has 
> released all slots in the meantime, for example. The consequence could be 
> that a job can no longer be executed after a failure because it needs more 
> slots than before.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10713) RestartIndividualStrategy does not restore state

2018-10-29 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-10713:
--
Fix Version/s: 1.7.0

> RestartIndividualStrategy does not restore state
> 
>
> Key: FLINK-10713
> URL: https://issues.apache.org/jira/browse/FLINK-10713
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2, 1.7.0
>Reporter: Stefan Richter
>Priority: Critical
> Fix For: 1.7.0
>
>
> RestartIndividualStrategy does not perform any state restore. This is a big 
> problem because all restored regions will be restarted with empty state. We 
> need to take checkpoints into account when restoring.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10712) RestartPipelinedRegionStrategy does not restore state

2018-10-29 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-10712:
--
Fix Version/s: 1.7.0

> RestartPipelinedRegionStrategy does not restore state
> -
>
> Key: FLINK-10712
> URL: https://issues.apache.org/jira/browse/FLINK-10712
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2, 1.7.0
>Reporter: Stefan Richter
>Priority: Critical
> Fix For: 1.7.0
>
>
> RestartPipelinedRegionStrategy does not perform any state restore. This is a 
> big problem because all restored regions will be restarted with empty state. 
> We need to take checkpoints into account when restoring.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10715) E2e tests fail with ConcurrentModificationException in MetricRegistryImpl

2018-10-29 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667296#comment-16667296
 ] 

Chesnay Schepler commented on FLINK-10715:
--

This is effectively a duplicate of FLINK-10035.

> E2e tests fail with ConcurrentModificationException in MetricRegistryImpl
> -
>
> Key: FLINK-10715
> URL: https://issues.apache.org/jira/browse/FLINK-10715
> Project: Flink
>  Issue Type: Bug
>  Components: E2E Tests, Metrics
>Affects Versions: 1.7.0
>Reporter: Dawid Wysakowicz
>Priority: Blocker
>
> A couple of e2e tests that rely on metrics fail with an exception:
> {code}
> 2018-10-29 11:40:32,781 WARN  
> org.apache.flink.runtime.metrics.MetricRegistryImpl   - Error while 
> reporting metrics
> java.util.ConcurrentModificationException
>   at java.util.HashMap$HashIterator.nextNode(HashMap.java:1437)
>   at java.util.HashMap$EntryIterator.next(HashMap.java:1471)
>   at java.util.HashMap$EntryIterator.next(HashMap.java:1469)
>   at 
> org.apache.flink.metrics.slf4j.Slf4jReporter.report(Slf4jReporter.java:101)
>   at 
> org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:427)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> {code}
> Tests that failed:
> *  'Resuming Externalized Checkpoint (file, sync, no parallelism change) 
> end-to-end test
> * 'State TTL Heap backend end-to-end test'



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10718) Use IO executor in RpcService for message serialization

2018-10-29 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-10718:
-

 Summary: Use IO executor in RpcService for message serialization
 Key: FLINK-10718
 URL: https://issues.apache.org/jira/browse/FLINK-10718
 Project: Flink
  Issue Type: Improvement
  Components: Local Runtime
Affects Versions: 1.6.2, 1.5.5, 1.7.0
Reporter: Till Rohrmann
 Fix For: 1.7.0


The {{AkkaInvocationHandler}} and, once FLINK-10251 has been merged, also the 
{{AkkaRpcActor}} serialize their remote RPC messages before sending them. This 
happens in the calling thread, which can be the main thread of another 
{{RpcEndpoint}}. Depending on the de-/serialization time, this can be 
considered a blocking operation. Thus, I propose to introduce an IO executor 
which is used for serializing the messages. This will make the 
{{RpcService}} abstraction more scalable.
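
A rough sketch of the idea (the names {{ioExecutor}}, {{serializeToBytes}} and {{tellRemote}} are placeholders for illustration, not the actual {{AkkaInvocationHandler}} API):
{code:java}
private Executor ioExecutor; // e.g. a fixed-size pool owned by the RpcService (placeholder)

CompletableFuture<Void> sendSerialized(Object rpcMessage) {
	return CompletableFuture
		// move the potentially blocking serialization off the caller's main thread
		.supplyAsync(() -> serializeToBytes(rpcMessage), ioExecutor)
		// hand the already-serialized payload to the remote actor
		.thenAccept(bytes -> tellRemote(bytes));
}
{code}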



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10717) Introduce SimpleSerializerSnapshot as replacement for ParameterlessTypeSerializerConfig

2018-10-29 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-10717:
---

 Summary: Introduce SimpleSerializerSnapshot as replacement for 
ParameterlessTypeSerializerConfig
 Key: FLINK-10717
 URL: https://issues.apache.org/jira/browse/FLINK-10717
 Project: Flink
  Issue Type: Sub-task
  Components: Type Serialization System
Reporter: Tzu-Li (Gordon) Tai
Assignee: Stephan Ewen






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10600) Provide End-to-end test cases for modern Kafka connectors

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667290#comment-16667290
 ] 

ASF GitHub Bot commented on FLINK-10600:


pnowojski commented on a change in pull request #6924: [FLINK-10600] Provide 
End-to-end test cases for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924#discussion_r228949560
 
 

 ##
 File path: flink-examples/flink-examples-streaming/pom.xml
 ##
 @@ -538,6 +550,11 @@ under the License.
shade


+   
+   
+   
org.apache.flink:flink-connector-kafka_${scala.binary.version}
 
 Review comment:
   Ok, right. We can only exclude the modern one and `0.11`? And we can still 
exclude everything from "modern" `KafkaExample`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Provide End-to-end test cases for modern Kafka connectors
> -
>
> Key: FLINK-10600
> URL: https://issues.apache.org/jira/browse/FLINK-10600
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski commented on a change in pull request #6924: [FLINK-10600] Provide End-to-end test cases for modern Kafka connectors

2018-10-29 Thread GitBox
pnowojski commented on a change in pull request #6924: [FLINK-10600] Provide 
End-to-end test cases for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924#discussion_r228949560
 
 

 ##
 File path: flink-examples/flink-examples-streaming/pom.xml
 ##
 @@ -538,6 +550,11 @@ under the License.
shade


+   
+   
+   
org.apache.flink:flink-connector-kafka_${scala.binary.version}
 
 Review comment:
   Ok, right. We can only exclude the modern one and `0.11`? And we can still 
exclude everything from "modern" `KafkaExample`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-9552) NPE in SpanningRecordSerializer during checkpoint with iterations

2018-10-29 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-9552:
-
Fix Version/s: 1.7.0

> NPE in SpanningRecordSerializer during checkpoint with iterations
> -
>
> Key: FLINK-9552
> URL: https://issues.apache.org/jira/browse/FLINK-9552
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Affects Versions: 1.5.0
>Reporter: Truong Duc Kien
>Assignee: vinoyang
>Priority: Major
> Fix For: 1.7.0
>
>
> We're encountering NPE intermittently inside SpanningRecordSerializer during 
> checkpoint.
>  
> {code:java}
> 2018-06-08 08:31:35,741 [ka.actor.default-dispatcher-83] INFO  
> o.a.f.r.e.ExecutionGraph IterationSource-22 (44/120) 
> (c1b94ef849db0e5fb9fb7b85c17073ce) switched from RUNNING to FAILED.
> java.lang.RuntimeException
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamIterationHead.run(StreamIterationHead.java:91)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:98)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
>   at 
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>   ... 5 more
> 2018-06-08 08:31:35,746 [ka.actor.default-dispatcher-83] INFO  
> o.a.f.r.e.ExecutionGraph Job xxx (8a4eaf02c46dc21c7d6f3f70657dbb17) switched 
> from state RUNNING to FAILING.
> java.lang.RuntimeException
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamIterationHead.run(StreamIterationHead.java:91)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:98)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
>   at 
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
>   ... 5 more
> {code}
> This issue is probably concurrency-related, because the relevant Flink code 
> seems to have proper null checking 
> https://github.com/apache/flink/blob/fa024726bb801fc71cec5cc303cac1d4a03f555e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java#L98
> {code:java}
> // Copy from intermediate buffers to current target memory segment
> if (targetBuffer != null) {
> targetBuffer.append(lengthBuffer);
> targetBuffer.append(dataBuffer);
> targetBuffer.commit();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10603) Reduce kafka test duration

2018-10-29 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667287#comment-16667287
 ] 

Piotr Nowojski commented on FLINK-10603:


I think the single largest runtime overhead comes from this code:
{code:java}
// this means that this is either:
// (1) the first execution of this application
// (2) previous execution has failed before first checkpoint completed
//
// in case of (2) we have to abort all previous transactions
abortTransactions(transactionalIdsGenerator.generateIdsToAbort());
{code}

in 
{{org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011#initializeState}}
 

Commenting this line out single-handedly speeds up the tests by 30-40%. Of 
course we cannot do that in production, but maybe we could introduce a 
private/internal switch {{assumeFirstExecution}} or 
{{forceNotAbortOldTransactions}} that we could use in all of the tests except 
the one test that checks this exact feature, 
{{FlinkKafkaProducer011ITCase#testScaleDownBeforeFirstCheckpoint}}.

But this wouldn't be a healthy "hack"...
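
A minimal sketch of what such an internal switch could look like (the flag 
name, its placement, and the surrounding method body are assumptions for 
illustration only, not the actual {{FlinkKafkaProducer011}} code):

{code:java}
// Hypothetical test-only flag; name and visibility are assumptions.
boolean forceNotAbortOldTransactions = false;

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    // ... existing restore logic ...
    if (!forceNotAbortOldTransactions) {
        // (1) first execution, or (2) previous execution failed before the
        // first checkpoint completed -> abort stale transactions
        abortTransactions(transactionalIdsGenerator.generateIdsToAbort());
    }
}
{code}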

Besides that, one would have to profile the most time-consuming tests further 
and try to figure out whether we can speed them up.

> Reduce kafka test duration
> --
>
> Key: FLINK-10603
> URL: https://issues.apache.org/jira/browse/FLINK-10603
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Tests
>Affects Versions: 1.7.0
>Reporter: Chesnay Schepler
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> The tests for the modern kafka connector take more than 10 minutes which is 
> simply unacceptable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10097) More tests to increase StreamingFileSink test coverage

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667248#comment-16667248
 ] 

ASF GitHub Bot commented on FLINK-10097:


kl0u commented on issue #6520: [FLINK-10097][DataStream API] Additional tests 
for StreamingFileSink
URL: https://github.com/apache/flink/pull/6520#issuecomment-433920446
 
 
   Thanks a lot, @azagrebin! I will integrate your comments and merge.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> More tests to increase StreamingFileSink test coverage
> --
>
> Key: FLINK-10097
> URL: https://issues.apache.org/jira/browse/FLINK-10097
> Project: Flink
>  Issue Type: Sub-task
>  Components: filesystem-connector
>Affects Versions: 1.6.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.3
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] kl0u commented on issue #6520: [FLINK-10097][DataStream API] Additional tests for StreamingFileSink

2018-10-29 Thread GitBox
kl0u commented on issue #6520: [FLINK-10097][DataStream API] Additional tests 
for StreamingFileSink
URL: https://github.com/apache/flink/pull/6520#issuecomment-433920446
 
 
   Thanks a lot, @azagrebin! I will integrate your comments and merge.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10668) Streaming File Sink E2E test fails because not all legitimate exceptions are excluded

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667238#comment-16667238
 ] 

ASF GitHub Bot commented on FLINK-10668:


GJL commented on issue #6950: [FLINK-10668][e2e] Streaming File Sink E2E test 
fails because not all legitimate exceptions are excluded
URL: https://github.com/apache/flink/pull/6950#issuecomment-433917808
 
 
   I think this is relevant: https://github.com/apache/flink/pull/6959


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Streaming File Sink E2E test fails because not all legitimate exceptions are 
> excluded
> -
>
> Key: FLINK-10668
> URL: https://issues.apache.org/jira/browse/FLINK-10668
> Project: Flink
>  Issue Type: Bug
>  Components: E2E Tests
>Affects Versions: 1.6.1, 1.7.0
>Reporter: Gary Yao
>Assignee: Hequn Cheng
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.3, 1.7.0
>
>
> Streaming File Sink E2E test fails because not all legitimate exceptions are 
> excluded.
> The stacktrace below can appear in the logs generated by the test but 
> {{check_logs_for_exceptions}} does not exclude all expected exceptions.
> {noformat}
> java.io.IOException: Connecting the channel failed: Connecting to remote task 
> manager + 'xxx/10.0.x.xx:50849' has failed. This might indicate that the 
> remote task manager has been lost.
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:133)
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:85)
>   at 
> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:60)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:166)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:494)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:525)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:508)
>   at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:165)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: 
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
> Connecting to remote task manager + 'xxx/10.0.x.xx:50849' has failed. 
> This might indicate that the remote task manager has been lost.
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:219)
>   at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:133)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327)
>   at 
> 

[GitHub] GJL commented on issue #6950: [FLINK-10668][e2e] Streaming File Sink E2E test fails because not all legitimate exceptions are excluded

2018-10-29 Thread GitBox
GJL commented on issue #6950: [FLINK-10668][e2e] Streaming File Sink E2E test 
fails because not all legitimate exceptions are excluded
URL: https://github.com/apache/flink/pull/6950#issuecomment-433917808
 
 
   I think this is relevant: https://github.com/apache/flink/pull/6959


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10097) More tests to increase StreamingFileSink test coverage

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667226#comment-16667226
 ] 

ASF GitHub Bot commented on FLINK-10097:


azagrebin commented on a change in pull request #6520: [FLINK-10097][DataStream 
API] Additional tests for StreamingFileSink
URL: https://github.com/apache/flink/pull/6520#discussion_r228928607
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
 ##
 @@ -50,233 +43,234 @@
@Test
public void testDefaultRollingPolicy() throws Exception {
final File outDir = TEMP_FOLDER.newFolder();
+   final Path path = new Path(outDir.toURI());
 
-   final RollingPolicy, String> 
rollingPolicy = DefaultRollingPolicy
-   .create()
-   .withMaxPartSize(10L)
-   .withInactivityInterval(4L)
-   .withRolloverInterval(11L)
-   .build();
-
-   try (
-   
OneInputStreamOperatorTestHarness, Object> testHarness 
= TestUtils.createCustomRescalingTestSink(
-   outDir,
-   1,
-   0,
-   1L,
-   new 
TestUtils.TupleToStringBucketer(),
-   new SimpleStringEncoder<>(),
-   rollingPolicy,
-   new 
DefaultBucketFactoryImpl<>())
-   ) {
-   testHarness.setup();
-   testHarness.open();
-
-   testHarness.setProcessingTime(0L);
+   final RollingPolicy originalRollingPolicy =
+   DefaultRollingPolicy
+   .create()
+   .withMaxPartSize(10L)
+   .withInactivityInterval(4L)
+   .withRolloverInterval(11L)
+   .build();
 
-   testHarness.processElement(new 
StreamRecord<>(Tuple2.of("test1", 1), 1L));
-   TestUtils.checkLocalFs(outDir, 1, 0);
+   final MethodCallCountingPolicyWrapper 
rollingPolicy =
+   new 
MethodCallCountingPolicyWrapper<>(originalRollingPolicy);
 
-   // roll due to size
-   testHarness.processElement(new 
StreamRecord<>(Tuple2.of("test1", 2), 2L));
-   TestUtils.checkLocalFs(outDir, 1, 0);
+   final Buckets buckets = createBuckets(path, 
rollingPolicy);
 
-   testHarness.processElement(new 
StreamRecord<>(Tuple2.of("test1", 3), 3L));
-   TestUtils.checkLocalFs(outDir, 2, 0);
+   rollingPolicy.verifyCallCounters(0L, 0L, 0L, 0L, 0L, 0L);
 
-   // roll due to inactivity
-   testHarness.setProcessingTime(7L);
+   // these two will fill up the first in-progress file and at the 
third it will roll ...
+   buckets.onElement("test1", new TestUtils.MockSinkContext(1L, 
1L, 1L));
+   buckets.onElement("test1", new TestUtils.MockSinkContext(2L, 
1L, 2L));
+   rollingPolicy.verifyCallCounters(0L, 0L, 1L, 0L, 0L, 0L);
 
-   testHarness.processElement(new 
StreamRecord<>(Tuple2.of("test1", 4), 4L));
-   TestUtils.checkLocalFs(outDir, 3, 0);
+   buckets.onElement("test1", new TestUtils.MockSinkContext(3L, 
1L, 3L));
+   rollingPolicy.verifyCallCounters(0L, 0L, 2L, 1L, 0L, 0L);
 
-   // roll due to rollover interval
-   testHarness.setProcessingTime(20L);
+   // still no time to roll
+   buckets.onProcessingTime(5L);
+   rollingPolicy.verifyCallCounters(0L, 0L, 2L, 1L, 1L, 0L);
 
-   testHarness.processElement(new 
StreamRecord<>(Tuple2.of("test1", 5), 5L));
-   TestUtils.checkLocalFs(outDir, 4, 0);
+   // roll due to inactivity
+   buckets.onProcessingTime(7L);
+   rollingPolicy.verifyCallCounters(0L, 0L, 2L, 1L, 2L, 1L);
 
-   // we take a checkpoint but we should not roll.
-   testHarness.snapshot(1L, 1L);
+   buckets.onElement("test1", new TestUtils.MockSinkContext(3L, 
1L, 3L));
 
-   TestUtils.checkLocalFs(outDir, 4, 0);
+   

[jira] [Commented] (FLINK-10097) More tests to increase StreamingFileSink test coverage

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667224#comment-16667224
 ] 

ASF GitHub Bot commented on FLINK-10097:


azagrebin commented on a change in pull request #6520: [FLINK-10097][DataStream 
API] Additional tests for StreamingFileSink
URL: https://github.com/apache/flink/pull/6520#discussion_r228873834
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
 ##
 @@ -40,54 +45,288 @@
public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
 
@Test
-   public void testContextPassingNormalExecution() throws Exception {
-   testCorrectPassingOfContext(1L, 2L, 3L);
+   public void testSnapshotAndRestore() throws Exception {
+   final File outDir = TEMP_FOLDER.newFolder();
+   final Path path = new Path(outDir.toURI());
+
+   final RollingPolicy onCheckpointRP = 
OnCheckpointRollingPolicy.build();
+
+   final Buckets buckets = createBuckets(path, 
onCheckpointRP, 0);
+
+   final ListState bucketStateContainer = new 
MockListState<>();
+   final ListState partCounterContainer = new 
MockListState<>();
+
+   buckets.onElement("test1", new TestUtils.MockSinkContext(null, 
1L, 2L));
+   buckets.snapshotState(0L, bucketStateContainer, 
partCounterContainer);
+
+   buckets.onElement("test2", new TestUtils.MockSinkContext(null, 
1L, 2L));
+   buckets.snapshotState(1L, bucketStateContainer, 
partCounterContainer);
+
+   Buckets restoredBuckets =
+   restoreBuckets(path, onCheckpointRP, 0, 
bucketStateContainer, partCounterContainer);
+
+   final Map> activeBuckets = 
restoredBuckets.getActiveBuckets();
+   Assert.assertEquals(2L, activeBuckets.size());
+   Assert.assertTrue(activeBuckets.keySet().contains("test1"));
 
 Review comment:
   as I understand, the check logic for test1/2 is the same; it could be 
deduplicated into a helper method
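
   For illustration, a possible shape of such a helper (hypothetical; the 
   name, signature, and assertions are assumptions, not code from this PR):

   ```java
   // Hypothetical helper; names and assertions are illustrative only.
   private static void assertBucketActive(Map<String, ?> activeBuckets, String bucketId) {
       Assert.assertTrue(activeBuckets.containsKey(bucketId));
       // shared assertions for the restored "test1" / "test2" buckets would go here
   }
   ```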


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> More tests to increase StreamingFileSink test coverage
> --
>
> Key: FLINK-10097
> URL: https://issues.apache.org/jira/browse/FLINK-10097
> Project: Flink
>  Issue Type: Sub-task
>  Components: filesystem-connector
>Affects Versions: 1.6.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.3
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10097) More tests to increase StreamingFileSink test coverage

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667225#comment-16667225
 ] 

ASF GitHub Bot commented on FLINK-10097:


azagrebin commented on a change in pull request #6520: [FLINK-10097][DataStream 
API] Additional tests for StreamingFileSink
URL: https://github.com/apache/flink/pull/6520#discussion_r228920095
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
 ##
 @@ -484,131 +479,4 @@ public void testScalingDownAndMergingOfStates() throws 
Exception {
Assert.assertEquals(3L, counter);
}
}
-
-   @Test
-   public void testMaxCounterUponRecovery() throws Exception {
 
 Review comment:
   Is this test not relevant anymore?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> More tests to increase StreamingFileSink test coverage
> --
>
> Key: FLINK-10097
> URL: https://issues.apache.org/jira/browse/FLINK-10097
> Project: Flink
>  Issue Type: Sub-task
>  Components: filesystem-connector
>Affects Versions: 1.6.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.3
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] azagrebin commented on a change in pull request #6520: [FLINK-10097][DataStream API] Additional tests for StreamingFileSink

2018-10-29 Thread GitBox
azagrebin commented on a change in pull request #6520: [FLINK-10097][DataStream 
API] Additional tests for StreamingFileSink
URL: https://github.com/apache/flink/pull/6520#discussion_r228873834
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
 ##
 @@ -40,54 +45,288 @@
public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
 
@Test
-   public void testContextPassingNormalExecution() throws Exception {
-   testCorrectPassingOfContext(1L, 2L, 3L);
+   public void testSnapshotAndRestore() throws Exception {
+   final File outDir = TEMP_FOLDER.newFolder();
+   final Path path = new Path(outDir.toURI());
+
+   final RollingPolicy onCheckpointRP = 
OnCheckpointRollingPolicy.build();
+
+   final Buckets buckets = createBuckets(path, 
onCheckpointRP, 0);
+
+   final ListState bucketStateContainer = new 
MockListState<>();
+   final ListState partCounterContainer = new 
MockListState<>();
+
+   buckets.onElement("test1", new TestUtils.MockSinkContext(null, 
1L, 2L));
+   buckets.snapshotState(0L, bucketStateContainer, 
partCounterContainer);
+
+   buckets.onElement("test2", new TestUtils.MockSinkContext(null, 
1L, 2L));
+   buckets.snapshotState(1L, bucketStateContainer, 
partCounterContainer);
+
+   Buckets restoredBuckets =
+   restoreBuckets(path, onCheckpointRP, 0, 
bucketStateContainer, partCounterContainer);
+
+   final Map> activeBuckets = 
restoredBuckets.getActiveBuckets();
+   Assert.assertEquals(2L, activeBuckets.size());
+   Assert.assertTrue(activeBuckets.keySet().contains("test1"));
 
 Review comment:
   as I understand, the check logic for test1/2 is the same; it could be 
deduplicated into a helper method


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #6520: [FLINK-10097][DataStream API] Additional tests for StreamingFileSink

2018-10-29 Thread GitBox
azagrebin commented on a change in pull request #6520: [FLINK-10097][DataStream 
API] Additional tests for StreamingFileSink
URL: https://github.com/apache/flink/pull/6520#discussion_r228928607
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
 ##
 @@ -50,233 +43,234 @@
@Test
public void testDefaultRollingPolicy() throws Exception {
final File outDir = TEMP_FOLDER.newFolder();
+   final Path path = new Path(outDir.toURI());
 
-   final RollingPolicy, String> 
rollingPolicy = DefaultRollingPolicy
-   .create()
-   .withMaxPartSize(10L)
-   .withInactivityInterval(4L)
-   .withRolloverInterval(11L)
-   .build();
-
-   try (
-   
OneInputStreamOperatorTestHarness, Object> testHarness 
= TestUtils.createCustomRescalingTestSink(
-   outDir,
-   1,
-   0,
-   1L,
-   new 
TestUtils.TupleToStringBucketer(),
-   new SimpleStringEncoder<>(),
-   rollingPolicy,
-   new 
DefaultBucketFactoryImpl<>())
-   ) {
-   testHarness.setup();
-   testHarness.open();
-
-   testHarness.setProcessingTime(0L);
+   final RollingPolicy originalRollingPolicy =
+   DefaultRollingPolicy
+   .create()
+   .withMaxPartSize(10L)
+   .withInactivityInterval(4L)
+   .withRolloverInterval(11L)
+   .build();
 
-   testHarness.processElement(new 
StreamRecord<>(Tuple2.of("test1", 1), 1L));
-   TestUtils.checkLocalFs(outDir, 1, 0);
+   final MethodCallCountingPolicyWrapper 
rollingPolicy =
+   new 
MethodCallCountingPolicyWrapper<>(originalRollingPolicy);
 
-   // roll due to size
-   testHarness.processElement(new 
StreamRecord<>(Tuple2.of("test1", 2), 2L));
-   TestUtils.checkLocalFs(outDir, 1, 0);
+   final Buckets buckets = createBuckets(path, 
rollingPolicy);
 
-   testHarness.processElement(new 
StreamRecord<>(Tuple2.of("test1", 3), 3L));
-   TestUtils.checkLocalFs(outDir, 2, 0);
+   rollingPolicy.verifyCallCounters(0L, 0L, 0L, 0L, 0L, 0L);
 
-   // roll due to inactivity
-   testHarness.setProcessingTime(7L);
+   // these two will fill up the first in-progress file and at the 
third it will roll ...
+   buckets.onElement("test1", new TestUtils.MockSinkContext(1L, 
1L, 1L));
+   buckets.onElement("test1", new TestUtils.MockSinkContext(2L, 
1L, 2L));
+   rollingPolicy.verifyCallCounters(0L, 0L, 1L, 0L, 0L, 0L);
 
-   testHarness.processElement(new 
StreamRecord<>(Tuple2.of("test1", 4), 4L));
-   TestUtils.checkLocalFs(outDir, 3, 0);
+   buckets.onElement("test1", new TestUtils.MockSinkContext(3L, 
1L, 3L));
+   rollingPolicy.verifyCallCounters(0L, 0L, 2L, 1L, 0L, 0L);
 
-   // roll due to rollover interval
-   testHarness.setProcessingTime(20L);
+   // still no time to roll
+   buckets.onProcessingTime(5L);
+   rollingPolicy.verifyCallCounters(0L, 0L, 2L, 1L, 1L, 0L);
 
-   testHarness.processElement(new 
StreamRecord<>(Tuple2.of("test1", 5), 5L));
-   TestUtils.checkLocalFs(outDir, 4, 0);
+   // roll due to inactivity
+   buckets.onProcessingTime(7L);
+   rollingPolicy.verifyCallCounters(0L, 0L, 2L, 1L, 2L, 1L);
 
-   // we take a checkpoint but we should not roll.
-   testHarness.snapshot(1L, 1L);
+   buckets.onElement("test1", new TestUtils.MockSinkContext(3L, 
1L, 3L));
 
-   TestUtils.checkLocalFs(outDir, 4, 0);
+   // roll due to rollover interval
+   buckets.onProcessingTime(20L);
+   rollingPolicy.verifyCallCounters(0L, 0L, 2L, 1L, 3L, 2L);
 
-   // acknowledge the checkpoint, so publish the 3 closed 

[GitHub] azagrebin commented on a change in pull request #6520: [FLINK-10097][DataStream API] Additional tests for StreamingFileSink

2018-10-29 Thread GitBox
azagrebin commented on a change in pull request #6520: [FLINK-10097][DataStream 
API] Additional tests for StreamingFileSink
URL: https://github.com/apache/flink/pull/6520#discussion_r228920095
 
 

 ##
 File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/LocalStreamingFileSinkTest.java
 ##
 @@ -484,131 +479,4 @@ public void testScalingDownAndMergingOfStates() throws 
Exception {
Assert.assertEquals(3L, counter);
}
}
-
-   @Test
-   public void testMaxCounterUponRecovery() throws Exception {
 
 Review comment:
   Is this test not relevant anymore?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-10716) Upgrade ListSerializer / ArrayListSerializer for state evolution

2018-10-29 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-10716:
---

 Summary: Upgrade ListSerializer / ArrayListSerializer for state 
evolution
 Key: FLINK-10716
 URL: https://issues.apache.org/jira/browse/FLINK-10716
 Project: Flink
  Issue Type: Sub-task
  Components: Type Serialization System
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai


Besides upgrading their snapshots, we should also make sure we have a 
corresponding {{ListSerializerMigrationTest}} and 
{{ArrayListSerializerMigrationTest}} to safeguard the upgrade.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10600) Provide End-to-end test cases for modern Kafka connectors

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667191#comment-16667191
 ] 

ASF GitHub Bot commented on FLINK-10600:


yanghua commented on a change in pull request #6924: [FLINK-10600] Provide 
End-to-end test cases for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924#discussion_r228923208
 
 

 ##
 File path: flink-examples/flink-examples-streaming/pom.xml
 ##
 @@ -538,6 +550,11 @@ under the License.
shade


+   
+   
+   
org.apache.flink:flink-connector-kafka_${scala.binary.version}
 
 Review comment:
   We can't exclude these connectors; my Travis build reports a failure 
because there is a cascading dependency on the previous connectors. If you 
exclude them, you will get an error: 
   
   ```
   java.lang.NoClassDefFoundError: 
org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at 
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at 
org.apache.flink.streaming.examples.kafka.Kafka010Example.main(Kafka010Example.java:45)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
   Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 29 more
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Provide End-to-end test cases for modern Kafka connectors
> -
>
> Key: FLINK-10600
> URL: https://issues.apache.org/jira/browse/FLINK-10600
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on a change in pull request #6924: [FLINK-10600] Provide End-to-end test cases for modern Kafka connectors

2018-10-29 Thread GitBox
yanghua commented on a change in pull request #6924: [FLINK-10600] Provide 
End-to-end test cases for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924#discussion_r228923208
 
 

 ##
 File path: flink-examples/flink-examples-streaming/pom.xml
 ##
 @@ -538,6 +550,11 @@ under the License.
shade


+   
+   
+   
org.apache.flink:flink-connector-kafka_${scala.binary.version}
 
 Review comment:
   We can't exclude these connectors; my Travis build reports a failure 
because there is a cascading dependency on the previous connectors. If you 
exclude them, you will get an error: 
   
   ```
   java.lang.NoClassDefFoundError: 
org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at 
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at 
org.apache.flink.streaming.examples.kafka.Kafka010Example.main(Kafka010Example.java:45)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
   Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 29 more
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10600) Provide End-to-end test cases for modern Kafka connectors

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667188#comment-16667188
 ] 

ASF GitHub Bot commented on FLINK-10600:


yanghua edited a comment on issue #6924: [FLINK-10600] Provide End-to-end test 
cases for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924#issuecomment-433904230
 
 
   Let me try to locate it. This code was written by @tzulitai  before.
   
   It seems the call: 
   
   ```
   $(get_partition_end_offset test-input 1)
   ```
   may return `null`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Provide End-to-end test cases for modern Kafka connectors
> -
>
> Key: FLINK-10600
> URL: https://issues.apache.org/jira/browse/FLINK-10600
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua edited a comment on issue #6924: [FLINK-10600] Provide End-to-end test cases for modern Kafka connectors

2018-10-29 Thread GitBox
yanghua edited a comment on issue #6924: [FLINK-10600] Provide End-to-end test 
cases for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924#issuecomment-433904230
 
 
   Let me try to locate it. This code was written by @tzulitai  before.
   
   It seems the call: 
   
   ```
   $(get_partition_end_offset test-input 1)
   ```
   may return `null`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10600) Provide End-to-end test cases for modern Kafka connectors

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667183#comment-16667183
 ] 

ASF GitHub Bot commented on FLINK-10600:


yanghua commented on issue #6924: [FLINK-10600] Provide End-to-end test cases 
for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924#issuecomment-433904230
 
 
   Let me try to locate it. This code was written by @tzulitai  before.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Provide End-to-end test cases for modern Kafka connectors
> -
>
> Key: FLINK-10600
> URL: https://issues.apache.org/jira/browse/FLINK-10600
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-8953) Resolve unresolved field references in FieldComputer expressions

2018-10-29 Thread xueyu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xueyu reassigned FLINK-8953:


Assignee: xueyu

> Resolve unresolved field references in FieldComputer expressions
> 
>
> Key: FLINK-8953
> URL: https://issues.apache.org/jira/browse/FLINK-8953
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: xueyu
>Priority: Major
>
> When implementing the {{FieldComputer.getExpression}} method, it is not 
> possible to use API classes but only internal expression case classes.
> It would be great to also define timestamp extractors like:
> {code}
>   def getExpression(fieldAccesses: Array[ResolvedFieldReference]): Expression 
> = {
> // 'x.cast(Types.LONG)
> // ExpressionParser.parseExpression("x.cast(LONG)")
>   }
> {code}
> An even better solution would be to provide different `getExpression()` 
> methods that an implementor can override. The general goal should be to 
> define this as natural as possible. In the future we should also support SQL:
> {code}
>   def getJavaExpression(fieldAccesses: Array[ResolvedFieldReference]): String 
> = {
> "x.cast(LONG)"
>   }
>   def getSQLExpression(fieldAccesses: Array[ResolvedFieldReference]): String 
> = {
> "CAST(x AS LONG)"
>   }
> {code}
> The final design is still up for discussion. These are just ideas.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on issue #6924: [FLINK-10600] Provide End-to-end test cases for modern Kafka connectors

2018-10-29 Thread GitBox
yanghua commented on issue #6924: [FLINK-10600] Provide End-to-end test cases 
for modern Kafka connectors
URL: https://github.com/apache/flink/pull/6924#issuecomment-433904230
 
 
   Let me try to locate it. This code was written by @tzulitai  before.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] dawidwys commented on issue #6955: [hotfix] [docs] Fix typo in ConfigGroups

2018-10-29 Thread GitBox
dawidwys commented on issue #6955: [hotfix] [docs] Fix typo in ConfigGroups
URL: https://github.com/apache/flink/pull/6955#issuecomment-433901467
 
 
   Hi @linzhaoming, thank you for your contribution. Please remember, though, 
to update the PR description when opening a PR.
   
   Merging


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] dawidwys closed pull request #6955: [hotfix] [docs] Fix typo in ConfigGroups

2018-10-29 Thread GitBox
dawidwys closed pull request #6955: [hotfix] [docs] Fix typo in ConfigGroups
URL: https://github.com/apache/flink/pull/6955
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/ConfigGroups.java
 
b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/ConfigGroups.java
index 53bf856c0a7..5afe9f9ac41 100644
--- 
a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/ConfigGroups.java
+++ 
b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/ConfigGroups.java
@@ -26,7 +26,7 @@
 import java.lang.annotation.Target;
 
 /**
- * Annotation used on classes containing config optionss that enables the 
separation of options into different
+ * Annotation used on classes containing config options that enables the 
separation of options into different
  * tables based on key prefixes. A config option is assigned to a {@link 
ConfigGroup} if the option key matches
  * the group prefix. If a key matches multiple prefixes the longest matching 
prefix takes priority. An option is never
  * assigned to multiple groups. Options that don't match any group are 
implicitly added to a default group.


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10687) Make flink-formats Scala-free

2018-10-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667177#comment-16667177
 ] 

ASF GitHub Bot commented on FLINK-10687:


aljoscha commented on issue #6958: [FLINK-10687] [table] Make flink-formats 
Scala-free
URL: https://github.com/apache/flink/pull/6958#issuecomment-433903067
 
 
   I reviewed everything except the new parser. Are there any tests that cover 
more complex things, like nested types, deep nesting, and so on?
   
   Otherwise this looks good.  


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make flink-formats Scala-free
> -
>
> Key: FLINK-10687
> URL: https://issues.apache.org/jira/browse/FLINK-10687
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> {{flink-table}} is the only dependency that pulls in Scala for 
> {{flink-json}} and {{flink-avro}}. We should aim to make {{flink-formats}} 
> Scala-free, using only a dependency on {{flink-table-common}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

